RxJS: Jak „ręcznie” zaktualizować Observable?

139

Myślę, że muszę źle rozumieć coś fundamentalnego, ponieważ moim zdaniem to powinien być najbardziej podstawowy przypadek dla obserwowalnego, ale dla mojego życia nie mogę dowiedzieć się, jak to zrobić z dokumentacji.

Zasadniczo chcę móc to zrobić:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

Ale nie udało mi się znaleźć takiej metody push. Używam tego do obsługi kliknięć i wiem, że mają Observable.fromEventdo tego, ale próbuję go używać z Reactem i wolałbym móc po prostu zaktualizować strumień danych w wywołaniu zwrotnym, zamiast używać zupełnie innego system obsługi zdarzeń. Zasadniczo chcę tego:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

Najbliższy, jaki uzyskałem, był używany observer.onNext('foo'), ale to nie wydawało się działać i to jest wezwane przez obserwatora, co nie wydaje się właściwe. Obserwator powinien reagować na strumień danych, a nie go zmieniać, prawda?

Czy po prostu nie rozumiem relacji obserwator / obserwowalny?

LiamD
źródło
Przyjrzyj się temu, aby wyjaśnić swój pomysł (brakowało Ci wprowadzenia do programowania reaktywnego): gist.github.com/staltz/868e7e9bc2a7b8c1f754 . Tutaj również jest kilka zasobów, z których możesz poprawić swoje zrozumienie: github.com/Reactive-Extensions/RxJS#resources
user3743222
Sprawdziłem pierwszy, wydaje się, że jest to solidny zasób. Druga to świetna lista, na której znalazłem aaronstacy.com/writings/reactive-programming-and-mvc, co pomogło mi odkryć Rx.Subject, który rozwiązuje mój problem. Więc dziękuję! Kiedy napiszę trochę więcej aplikacji, opublikuję moje rozwiązanie, po prostu chcę trochę przetestować.
LiamD,
Hehe, bardzo dziękuję za zadanie tego pytania, właśnie miałem zadać to samo pytanie, mając na uwadze ten sam przykład kodu :-)
arturh

Odpowiedzi:

154

W RX, Observer i Observable to odrębne byty. Obserwator subskrybuje Observable. Observable emituje pozycje do swoich obserwatorów, wywołując metody obserwatorów. Jeśli potrzebujesz wywołać metody obserwatora poza Observable.create()twoim zakresem , możesz użyć Subject, który jest proxy, który działa jednocześnie jako obserwator i Observable.

Możesz zrobić tak:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

Więcej informacji na temat tematów można znaleźć tutaj:

luisgabriel
źródło
1
To jest dokładnie to, co zrobiłem! Pracowałem nad tym, aby zobaczyć, czy mógłbym znaleźć lepszy sposób na zrobienie tego, co potrzebowałem, ale jest to zdecydowanie wykonalne rozwiązanie. Po raz pierwszy zobaczyłem to w tym artykule: aaronstacy.com/writings/reactive-programming-and-mvc .
LiamD
34

Uważam, Observable.create()że nie przyjmuje obserwatora jako parametru wywołania zwrotnego, ale emiter. Więc jeśli chcesz dodać nową wartość do swojego Observable, spróbuj zamiast tego:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Tak Temat ułatwia, zapewniając Observable i Observer w tym samym obiekcie, ale nie jest to dokładnie to samo, ponieważ Subject umożliwia przypisanie wielu obserwatorów do tego samego obserwowalnego, gdy obserwowalny wysyła dane tylko do ostatniego subskrybowanego obserwatora, więc używaj go świadomie . Oto JsBin, jeśli chcesz przy nim majstrować.

theFreedomBanana
źródło
czy możliwość nadpisania właściwości emitera jest udokumentowana gdzieś w instrukcjach RxJS?
Tomas
W tym przypadku emittertylko next()nowe wartości dla obserwatora, który subskrybował ostatni. Lepszym podejściem byłoby zebranie wszystkich emitters w tablicy i iterowanie po nich wszystkich oraz nextwartości na każdym z nich
Eric Gopak