Jak sprawić, by jedna obserwowalna sekwencja czekała na zakończenie innej przed wyemitowaniem?

86

Powiedz, że mam Observabletak:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

Następnie mam sekundę Observable:

var two = someOtherObservable.take(1);

Teraz chcę subscribe()się two, ale chcę, aby upewnić się, że onezostał zakończony przed twoabonent zostaje zwolniony.

Jakiej metody buforowania mogę użyć, twoaby druga z nich czekała na zakończenie pierwszej?

Przypuszczam, że chcę wstrzymać, twoonesię skończy.

Stephen
źródło
1
Uważam, że odpowiedzią na to jest metoda .exhaustMap (), jednak nie udawałbym, że wiem, jak ją zaimplementować - pełny opis tutaj: blog.angular-university.io/rxjs-higher-order-mapping
Peter Nixey

Odpowiedzi:

53

Przychodzi mi do głowy kilka sposobów

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
paulpdaniels
źródło
1
Skończyło się na używaniu pausei resumezamiast publishi connect, ale przykład drugi jest zasadniczo drogą, którą wybrałem.
Stephen
1
Czy ta metoda zawsze sprawi, że pierwsza obserowalna ( one) rozwiąże się przed drugą ( two) wewnątrz funkcji subscribe ()?
John
Dlaczego nie używać Observable.forkJoin()? Zobacz ten link learnrxjs.io/operators/combination/forkjoin.html
mspasiuk
16
@mspasiuk zgodnie z wymaganiami PO, chcieli, aby druga osoba zapisała się dopiero po zakończeniu pierwszej. forkJoinsubskrybuje jednocześnie.
paulpdaniels
17

Jeśli chcesz mieć pewność, że kolejność wykonywania zostanie zachowana, możesz użyć flatMap jako poniższego przykładu

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

Wynik byłby taki:

"1"
"11"
"111"
"finished"
Nikos Tsokos
źródło
15

skipUntil () z last ()

skipUntil: ignoruj ​​emitowane elementy, dopóki nie wyemituje inny obserwowalny

last: emituje ostatnią wartość z sekwencji (tj. poczekaj, aż się zakończy, a następnie wyemituj)

Zauważ, że cokolwiek wyemitowane z tego, co obserwowalne przekazano do skipUntil, anuluje pomijanie, dlatego musimy dodać last()- aby poczekać na zakończenie strumienia.

main$.skipUntil(sequence2$.pipe(last()))

Oficjalny: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


Możliwy problem: Zauważ, że last()sam z siebie wystąpi błąd, jeśli nic nie zostanie wyemitowane. last()Operator ma defaultparametr, ale tylko w połączeniu z predykatu. Myślę, że jeśli ta sytuacja jest dla Ciebie problemem (jeśli sequence2$może zakończyć się bez emisji), to jeden z nich powinien działać (obecnie niesprawdzony):

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

Zauważ, że undefinedjest to prawidłowy element do wyemitowania, ale w rzeczywistości może to być dowolna wartość. Należy również pamiętać, że jest to rura przymocowana do rury, sequence2$a nie main$rura.

Simon_Weaver
źródło
Bardzo niezdarne demo: angular-vgznak.stackblitz.io Musisz kliknąć, aby otworzyć tacę konsoli
Simon_Weaver
Twoja składnia jest nieprawidłowa. skipUntil nie może być bezpośrednio dołączone do obserwowalnego, w przeciwnym razie pojawi się następujący błąd: „Właściwość„ skipUntil ”nie istnieje w typie„ Observable <any> ”. Musisz najpierw uruchomić go przez .pipe ()
London804,
Tak, to stara odpowiedź, zanim potrzebna była potok. Dzięki, że o tym wspomniałeś. Zaktualizowałbym go teraz, ale rozmawiam przez telefon. Zapraszam do edycji odpowiedzi.
Simon_Weaver
13

Oto kolejna możliwość skorzystania z selektora wyników switchMap

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });

Ponieważ selektor wyników switchMap został zdeprecjonowany, oto zaktualizowana wersja

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
  /* do something */ 
});
Joe King
źródło
8

Oto sposób wielokrotnego użytku (jest to maszynopis, ale możesz go dostosować do js):

export function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) =>
        new Observable<T>(observer =>
            signal.pipe(first())
                .subscribe(_ =>
                    source.subscribe(observer)
                )
        );
}

i możesz go używać jak każdego operatora:

var two = someOtherObservable.pipe(waitFor(one), take(1));

Zasadniczo jest to operator, który odracza subskrypcję na obserwowalnym źródle, dopóki obserwowalny sygnał nie wyemituje pierwszego zdarzenia.

Andrei Tătar
źródło
6

Jeśli druga obserwowalna jest gorąca , istnieje inny sposób na wstrzymanie / wznowienie :

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () { 
  /* resume paused source2 */ 
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to recieve data 
});

Możesz również użyć wersji buforowanej pausableBuffered, aby zachować dane podczas wstrzymania.

Anton
źródło
2

Oto jeszcze jedno, ale czuję się prostsze i bardziej intuicyjne (lub przynajmniej naturalne, jeśli jesteś przyzwyczajony do obietnic). Zasadniczo tworzysz Observable używając Observable.create()do zawijania onei twojako pojedynczy Observable. Jest to bardzo podobne do tego, jak Promise.all()może działać.

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      // observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
});

Więc co się tutaj dzieje? Najpierw tworzymy nowy Observable. Funkcja przekazana do Observable.create(), trafnie nazwana onSubscription, jest przekazywana obserwatorowi (zbudowanemu z parametrów, do których przekazujesz subscribe()), który jest podobny resolvei rejectłączy się w pojedynczy obiekt podczas tworzenia nowej Obietnicy. W ten sposób sprawiamy, że magia działa.

W programie onSubscriptionsubskrybujemy pierwszy Observable (w powyższym przykładzie nazywało się to one). To, jak sobie z tym poradzimy nexti errorzależy od Ciebie, ale ustawienie domyślne podane w mojej próbce powinno być ogólnie odpowiednie. Jednak kiedy otrzymamy completezdarzenie, które oznacza, że onejest już zrobione, możemy zapisać się do następnego Observable; strzelając w ten sposób z drugiego Observable po zakończeniu pierwszego.

Przykładowy obserwator podany dla drugiego Obserwowalnego jest dość prosty. Zasadniczo, secondteraz zachowuje się tak, jak można by się spodziewać twow PO. Dokładniej, secondwyemituje pierwszą i jedyną pierwszą wartość wyemitowaną przez someOtherObservable(z powodu take(1)), a następnie zakończoną, zakładając, że nie ma błędu.

Przykład

Oto pełny, działający przykład, który możesz skopiować / wkleić, jeśli chcesz zobaczyć, jak mój przykład działa w prawdziwym życiu:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
}).subscribe(
  function onNext(value) {
    console.log(value);
  },
  function onError(error) {
    console.error(error);
  },
  function onComplete() {
    console.log("Done!");
  }
);

Jeśli oglądasz konsolę, powyższy przykład wyświetli:

1

6

Gotowe!

c1moore
źródło
To był przełom, którego potrzebowałem, aby stworzyć własny niestandardowy operator `` klastra (T, X, D) '', który przetwarza tylko pierwsze emisje X w przedziale czasowym T ze źródła i emituje wyniki z opóźnieniem D. Dziękuję Ci!
wonkim00
Cieszę się, że pomogło, to też było bardzo pouczające, kiedy to zrozumiałem.
c1moore
2

Oto niestandardowy operator napisany w TypeScript, który czeka na sygnał przed wyemitowaniem wyników:

export function waitFor<T>(
    signal$: Observable<any>
) {
    return (source$: Observable<T>) =>
        new Observable<T>(observer => {
            // combineLatest emits the first value only when
            // both source and signal emitted at least once
            combineLatest([
                source$,
                signal$.pipe(
                    first(),
                ),
            ])
                .subscribe(([v]) => observer.next(v));
        });
}

Możesz go używać w ten sposób:

two.pipe(waitFor(one))
   .subscribe(value => ...);
Sergiu
źródło
1
ładny wzór! Możesz nawet zrobić trzy rury (waitFor (one), waitFor (two), take (1))
David Rinck
1

cóż, wiem, że to jest dość stare, ale myślę, że możesz potrzebować:

var one = someObservable.take(1);

var two = someOtherObservable.pipe(
  concatMap((twoRes) => one.pipe(mapTo(twoRes))),
  take(1)
).subscribe((twoRes) => {
   // one is completed and we get two's subscription.
})
itay oded
źródło
0

Możesz użyć wyniku emitowanego z poprzedniego Observable dzięki operatorowi mergeMap (lub jego aliasowi flatMap ) w następujący sposób:

 const one = Observable.of('https://api.github.com/users');
 const two = (c) => ajax(c);//ajax from Rxjs/dom library

 one.mergeMap(two).subscribe(c => console.log(c))
Tktorza
źródło
stąd: learnrxjs.io/learn-rxjs/operators/transformation/mergemap - "Jeśli kolejność emisji i subskrypcji wewnętrznych obserwabli jest ważna, wypróbuj concatMap!"
gsziszi