Powiedz, że mam Observable
tak:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
Następnie mam sekundę Observable
:
var two = someOtherObservable.take(1);
Teraz chcę subscribe()
się two
, ale chcę, aby upewnić się, że one
został zakończony przed two
abonent zostaje zwolniony.
Jakiej metody buforowania mogę użyć, two
aby druga z nich czekała na zakończenie pierwszej?
Przypuszczam, że chcę wstrzymać, two
aż one
się skończy.
javascript
observable
rxjs
Stephen
źródło
źródło
Odpowiedzi:
Przychodzi mi do głowy kilka sposobów
import {take, publish} from 'rxjs/operators' import {concat} from 'rxjs' //Method one var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1)); concat(one, two).subscribe(function() {/*do something */}); //Method two, if they need to be separate for some reason var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1), publish()); two.subscribe(function(){/*do something */}); one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
źródło
pause
iresume
zamiastpublish
iconnect
, ale przykład drugi jest zasadniczo drogą, którą wybrałem.one
) rozwiąże się przed drugą (two
) wewnątrz funkcji subscribe ()?Observable.forkJoin()
? Zobacz ten link learnrxjs.io/operators/combination/forkjoin.htmlforkJoin
subskrybuje jednocześnie.Jeśli chcesz mieć pewność, że kolejność wykonywania zostanie zachowana, możesz użyć flatMap jako poniższego przykładu
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i)); const second = Rx.Observable.of(11).delay(500).do(i => console.log(i)); const third = Rx.Observable.of(111).do(i => console.log(i)); first .flatMap(() => second) .flatMap(() => third) .subscribe(()=> console.log('finished'));
Wynik byłby taki:
"1" "11" "111" "finished"
źródło
skipUntil () z last ()
skipUntil: ignoruj emitowane elementy, dopóki nie wyemituje inny obserwowalny
last: emituje ostatnią wartość z sekwencji (tj. poczekaj, aż się zakończy, a następnie wyemituj)
Zauważ, że cokolwiek wyemitowane z tego, co obserwowalne przekazano do
skipUntil
, anuluje pomijanie, dlatego musimy dodaćlast()
- aby poczekać na zakończenie strumienia.Oficjalny: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
Możliwy problem: Zauważ, że
last()
sam z siebie wystąpi błąd, jeśli nic nie zostanie wyemitowane.last()
Operator madefault
parametr, ale tylko w połączeniu z predykatu. Myślę, że jeśli ta sytuacja jest dla Ciebie problemem (jeślisequence2$
może zakończyć się bez emisji), to jeden z nich powinien działać (obecnie niesprawdzony):main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last())) main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
Zauważ, że
undefined
jest to prawidłowy element do wyemitowania, ale w rzeczywistości może to być dowolna wartość. Należy również pamiętać, że jest to rura przymocowana do rury,sequence2$
a niemain$
rura.źródło
Oto kolejna możliwość skorzystania z selektora wyników switchMap
var one$ = someObservable.take(1); var two$ = someOtherObservable.take(1); two$.switchMap( /** Wait for first Observable */ () => one$, /** Only return the value we're actually interested in */ (value2, value1) => value2 ) .subscribe((value2) => { /* do something */ });
Ponieważ selektor wyników switchMap został zdeprecjonowany, oto zaktualizowana wersja
const one$ = someObservable.pipe(take(1)); const two$ = someOtherObservable.pipe( take(1), switchMap(value2 => one$.map(_ => value2)) ); two$.subscribe(value2 => { /* do something */ });
źródło
Oto sposób wielokrotnego użytku (jest to maszynopis, ale możesz go dostosować do js):
export function waitFor<T>(signal: Observable<any>) { return (source: Observable<T>) => new Observable<T>(observer => signal.pipe(first()) .subscribe(_ => source.subscribe(observer) ) ); }
i możesz go używać jak każdego operatora:
var two = someOtherObservable.pipe(waitFor(one), take(1));
Zasadniczo jest to operator, który odracza subskrypcję na obserwowalnym źródle, dopóki obserwowalny sygnał nie wyemituje pierwszego zdarzenia.
źródło
Jeśli druga obserwowalna jest gorąca , istnieje inny sposób na wstrzymanie / wznowienie :
var pauser = new Rx.Subject(); var source1 = Rx.Observable.interval(1000).take(1); /* create source and pause */ var source2 = Rx.Observable.interval(1000).pausable(pauser); source1.doOnCompleted(function () { /* resume paused source2 */ pauser.onNext(true); }).subscribe(function(){ // do something }); source2.subscribe(function(){ // start to recieve data });
Możesz również użyć wersji buforowanej pausableBuffered, aby zachować dane podczas wstrzymania.
źródło
Oto jeszcze jedno, ale czuję się prostsze i bardziej intuicyjne (lub przynajmniej naturalne, jeśli jesteś przyzwyczajony do obietnic). Zasadniczo tworzysz Observable używając
Observable.create()
do zawijaniaone
itwo
jako pojedynczy Observable. Jest to bardzo podobne do tego, jakPromise.all()
może działać.var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ // observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); });
Więc co się tutaj dzieje? Najpierw tworzymy nowy Observable. Funkcja przekazana do
Observable.create()
, trafnie nazwanaonSubscription
, jest przekazywana obserwatorowi (zbudowanemu z parametrów, do których przekazujeszsubscribe()
), który jest podobnyresolve
ireject
łączy się w pojedynczy obiekt podczas tworzenia nowej Obietnicy. W ten sposób sprawiamy, że magia działa.W programie
onSubscription
subskrybujemy pierwszy Observable (w powyższym przykładzie nazywało się toone
). To, jak sobie z tym poradzimynext
ierror
zależy od Ciebie, ale ustawienie domyślne podane w mojej próbce powinno być ogólnie odpowiednie. Jednak kiedy otrzymamycomplete
zdarzenie, które oznacza, żeone
jest już zrobione, możemy zapisać się do następnego Observable; strzelając w ten sposób z drugiego Observable po zakończeniu pierwszego.Przykładowy obserwator podany dla drugiego Obserwowalnego jest dość prosty. Zasadniczo,
second
teraz zachowuje się tak, jak można by się spodziewaćtwo
w PO. Dokładniej,second
wyemituje pierwszą i jedyną pierwszą wartość wyemitowaną przezsomeOtherObservable
(z powodutake(1)
), a następnie zakończoną, zakładając, że nie ma błędu.Przykład
Oto pełny, działający przykład, który możesz skopiować / wkleić, jeśli chcesz zobaczyć, jak mój przykład działa w prawdziwym życiu:
var someObservable = Observable.from([1, 2, 3, 4, 5]); var someOtherObservable = Observable.from([6, 7, 8, 9]); var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); }).subscribe( function onNext(value) { console.log(value); }, function onError(error) { console.error(error); }, function onComplete() { console.log("Done!"); } );
Jeśli oglądasz konsolę, powyższy przykład wyświetli:
źródło
Oto niestandardowy operator napisany w TypeScript, który czeka na sygnał przed wyemitowaniem wyników:
export function waitFor<T>( signal$: Observable<any> ) { return (source$: Observable<T>) => new Observable<T>(observer => { // combineLatest emits the first value only when // both source and signal emitted at least once combineLatest([ source$, signal$.pipe( first(), ), ]) .subscribe(([v]) => observer.next(v)); }); }
Możesz go używać w ten sposób:
two.pipe(waitFor(one)) .subscribe(value => ...);
źródło
cóż, wiem, że to jest dość stare, ale myślę, że możesz potrzebować:
var one = someObservable.take(1); var two = someOtherObservable.pipe( concatMap((twoRes) => one.pipe(mapTo(twoRes))), take(1) ).subscribe((twoRes) => { // one is completed and we get two's subscription. })
źródło
Możesz użyć wyniku emitowanego z poprzedniego Observable dzięki operatorowi mergeMap (lub jego aliasowi flatMap ) w następujący sposób:
const one = Observable.of('https://api.github.com/users'); const two = (c) => ajax(c);//ajax from Rxjs/dom library one.mergeMap(two).subscribe(c => console.log(c))
źródło