Rxjs: Observable.combineLatest vs Observable.forkJoin

86

Zastanawiam się tylko, jakie są różnice między Observable.combineLatesti Observable.forkJoin? O ile widzę, jedyną różnicą jest to, forkJoinże oczekuje się, że Observables zostaną zakończone, a combineLatestzwrócą najnowsze wartości.

Tuong Le
źródło
4
Uwaga: W rxjs6 + są to teraz tylko combineLatest()i forkJoin()funkcje, które tworzą obserwowalne. Robią to samo, ale składnia jest inna. Nie należy mylić, combineLatestod rxjs/operatorsktórego jest operatorem „potokowalnym”. Jeśli zaimportujesz niewłaściwy, otrzymasz błędy.
Simon_Weaver,

Odpowiedzi:

129

Nie tylko forkJoinwymaga uzupełnienia wszystkich obserwowalnych wejściowych, ale także zwraca obserwowalną, która generuje pojedynczą wartość, która jest tablicą ostatnich wartości utworzonych przez obserwable wejściowe. Innymi słowy, czeka, aż ostatnie obserwowalne dane wejściowe zostaną zakończone, a następnie wygeneruje pojedynczą wartość i zakończy.

W przeciwieństwie do tego combineLatestzwraca Observable, który tworzy nową wartość za każdym razem, gdy robią to obserwable wejściowe, gdy wszystkie obserwable wejściowe dały co najmniej jedną wartość. Oznacza to, że może mieć nieskończone wartości i może się nie kończyć. Oznacza to również, że obserwable wejściowe nie muszą kończyć się przed utworzeniem wartości.

GregL
źródło
@GregL Czy istnieje funkcja, która działa jak forkJoin, ale działałaby również z nieudanymi wywołaniami HTTP?
Tukkan,
2
@Tukkan, połączyłbym operatora, który odzyskuje z błędów, do każdego obserwowalnego http, tak abyś zdefiniował wartość (y) do użycia dla każdego żądania w przypadku błędu, zamiast szukać operatora, który połączy wiele obserwacji, które mogą powodować błąd (ja nie jestem pewien, czy taki operator istnieje). Operatory, które odzyskują po błędach, obejmują .catch(), .onErrorResumeNext()i prawdopodobnie .retry()(jeśli wywołanie HTTP może sporadycznie kończyć się niepowodzeniem).
GregL
1
Żeby wyjaśnić, obaj tworzą tablice.
Simon_Weaver,
@Simon_Weaver Niekoniecznie. W przypadku programu combineLatest()można podać funkcję projekcji, aby określić, w jaki sposób utworzyć wartość wyjściową na podstawie najnowszych wartości z danych wejściowych. Domyślnie, jeśli nie określisz żadnego, otrzymasz tablicę ostatnich wyemitowanych wartości.
GregL,
Każda emisja z connectLatest jest tablicą zebranych wartości. Chciałem to tylko dodać, ponieważ wspomniałeś tylko o tablicy dla forkjoin. W ten sposób są takie same. I tak, w RxJS zawsze są subtelności, więc jeśli coś przegapiłem, czy możesz wyjaśnić, co masz na myśli.
Simon_Weaver,
14

Również:

connectLatest (...) uruchamia obserable po kolei, jeden po drugim

combineLatestużywa wewnętrznie concat, co oznacza, że ​​wartość musi zostać uzyskana dla każdego obserwowalnego w tablicy przed przejściem do następnej.

// partial source of static combineLatest (uses the rxjs/operators combineLatest internally):
// If you're using typescript then the output array will be strongly typed based on type inference
return function (source) { return source.lift.call(from_1.from([source].concat(observables)), 
                           new combineLatest_1.CombineLatestOperator(project)); };

forkJoin (...) uruchamia obserwable równolegle, w tym samym czasie

Jeśli masz 3 obserwowalne źródła, a każde z nich trwa 5 sekund, uruchomienie zajmie 15 sekund combineLatest. Podczas forkJoingdy działają równolegle, więc zajmie to 5 sekund.

forkJoinDziała więc trochę bardziej jak Promise.all(...)tam, gdzie nakaz nie jest egzekwowany.

Uwagi dotyczące obsługi błędów:

Jeśli którykolwiek z obserwabli się nie powiedzie - combineLatestkolejne nie zostaną wykonane, ale forkJoinwszystkie działają. combineLatestMoże więc być przydatne do wykonania „sekwencji” obserwabli i zebrania wszystkich wyników.


Uwaga dla zaawansowanych: Jeśli obserwable źródłowe są już „uruchomione” (subskrybowane przez coś innego) i używasz sharena nich - nie zobaczysz tego zachowania.

Jeszcze bardziej zaawansowana uwaga: CombineLatest zawsze podaje najnowsze dane z każdego źródła, więc jeśli jeden z obserwowalnych źródeł emituje wiele wartości, otrzymasz najnowsze. Nie tylko pobiera jedną wartość dla każdego źródła i przechodzi do następnego. Jeśli chcesz mieć pewność, że otrzymujesz tylko „następny dostępny element” dla każdego obserwowalnego źródła, możesz dodać .pipe(take(1))go do źródła obserwowalnego, gdy dodajesz je do tablicy wejściowej.

Simon_Weaver
źródło
Dziękujemy za uwagi dotyczące obsługi błędów. Obsługa błędów jest dość trudna do zrozumienia w przypadku wielu obserwacji.
D Deshmane
2
Uważam, że źle zinterpretowałeś, co robi concat()w combineLatest()kodzie. Wydaje mi się, że to Array.prototype.concatmetoda, a nie concatmetoda RxJS . Zakładając, że mam rację, odpowiedź ta jest myląca i niepoprawna, ponieważ combineLatest()nie wykonuje obserwacji po kolei. Wykonuje je równolegle, tak samo jak forkJoin(). Różnica polega na tym, ile wartości jest generowanych i czy obserwable źródłowe muszą się zakończyć, czy nie.
GregL
@GregL Array.concat jest bez znaczenia w przypadku obserwabli. Całkowicie polegam na sekwencyjnej naturze concat - kluczową rzeczą do zrozumienia jest to, że nie otrzymujesz żadnych wartości wyjściowych, dopóki wszystkie nie zostaną wykonane.
Simon_Weaver
Odnosiłem się do tego kodu w przykładowym kodzie źródłowym, który opublikowałeś: [source].concat(observables)zakładając, że właśnie to miałeś na myśli, mówiąc, że „ combineLatestużywa wewnętrznie concat”. Wydaje mi się, że mylisz konkatowanie tablic z konkatacją RxJS. Ta ostatnia rzeczywiście wykonuje obserwowalne wejściowe po kolei, czekając, aż każda z nich zostanie ukończona. Ale to nie jest używane przez combineLatest(), to jest całkowicie oddzielny operator.
GregL
3
Wierzę combineLatest()i forkJoin()obie działają równolegle. Uruchomienie poniższego kodu daje około 5000 dla obu. const start = new Date().getTime(); combineLatest([of(null).pipe(delay(5000)), of(null).pipe(delay(5000)), of(null).pipe(delay(5000))]).subscribe(() => console.log(new Date().getTime() - start)); forkJoin([of(null).pipe(delay(5000)), of(null).pipe(delay(5000)), of(null).pipe(delay(5000))]).subscribe(() => console.log(new Date().getTime() - start));
Jeremy
13

forkJoin - Po zakończeniu wszystkich obserwabli, wyemituj ostatnią wyemitowaną wartość z każdego.

connectLatest - gdy jakikolwiek obserwowalny emituje wartość, emituje najnowszą wartość z każdego.

Sposób użycia jest dość podobny, ale nie należy zapominać o wypisaniu się z funkcji CombineLatest w przeciwieństwie do forkJoin .

Dmitry Grinko
źródło
1
jeśli jedno, jeśli żądanie nie powiedzie się, wszystkie zagnieżdżone żądania zostaną automatycznie anulowane, jak mogę to rozwiązać?
Sunil Garg
1
@SunilGarg Powinieneś użyć forkJoin lub connectLatest, jeśli chcesz uzyskać tylko wszystkie wyniki. Jeśli nie dbasz o wszystkie wyniki, powinieneś osobno korzystać z subskrypcji.
Dmitry Grinko
czy możesz odpowiedzieć na ten stackoverflow.com/questions/55558277/…
Sunil Garg
czy możesz wyjaśnić, dlaczego rezygnacja z subskrypcji jest wymagana za pomocą przykładu kodu
Sunil Garg