Jak mogę „czekać” na Rx Observable?

111

Chciałbym móc czekać na obserwowalny, np

const source = Rx.Observable.create(/* ... */)
//...
await source;

Naiwna próba skutkuje natychmiastowym rozstrzygnięciem oczekiwania i nie blokowaniem wykonania

Edycja: pseudokod dla mojego pełnego zamierzonego przypadku użycia to:

if (condition) {
  await observable;
}
// a bunch of other code

Rozumiem, że mogę przenieść drugi kod do innej oddzielnej funkcji i przekazać go do wywołania zwrotnego subskrypcji, ale mam nadzieję, że uda mi się tego uniknąć.

CheapSteaks
źródło
Czy nie możesz przenieść pozostałego kodu (który chcesz poczekać na źródło) do .subscribe()wywołania metody?
StriplingWarrior

Odpowiedzi:

133

Musisz przekazać obietnicę await. Zamień następne zdarzenie obserwowalnego na obietnicę i czekaj na to.

if (condition) {
  await observable.first().toPromise();
}

Uwaga dotycząca edycji: ta odpowiedź pierwotnie używała .take (1), ale została zmieniona na .first (), co pozwala uniknąć problemu z obietnicą, która nigdy nie została rozwiązana, jeśli strumień kończy się przed pojawieniem się wartości.

Macil
źródło
3
Zamiast take (1) mógłbyś użyć await observable.first().toPromise();?
apricity
15
@apricity Jeśli po zakończeniu nie ma żadnych wartości, first()spowoduje odrzucenie i take(1)oczekującą obietnicę.
Estus Flask
6
@apricity @AgentME Właściwie NIE powinieneś używać ani take(1)ani first()w takich przypadkach. Ponieważ spodziewasz się dokładnie JEDNEGO zdarzenia, powinieneś użyć, single()który zgłosi wyjątek, jeśli jest więcej niż 1, nie rzucając wyjątku, gdy go nie ma. Jeśli jest więcej niż jeden, prawdopodobnie coś jest nie tak w Twoim kodzie / modelu danych itp. Jeśli nie użyjesz pojedynczego, w końcu arbitralnie wybierzesz pierwszy element, który powróci bez ostrzeżenia, że ​​jest ich więcej. Musisz uważać w swoim predykacie na nadrzędnym źródle danych, aby zawsze zachować tę samą kolejność.
ntziolis
3
Nie zapomnij importować:import 'rxjs/add/operator/first';
Stephanie,
7
Teraz, gdy metoda toPromise () jest przestarzała, jak powinniśmy to zrobić?
Jus10
26

Prawdopodobnie tak musi być

await observable.first().toPromise();

Jak zauważono we wcześniejszych komentarzach, istnieje zasadnicza różnica między operatorami take(1)a first(), gdy występuje pusta wypełniona obserwowalna.

Observable.empty().first().toPromise()spowoduje odrzucenie, przy EmptyErrorczym można to odpowiednio potraktować, ponieważ tak naprawdę nie było wartości.

I Observable.empty().take(1).toPromise()spowoduje rozdzielczość z undefinedwartością.

Estus Flask
źródło
Faktycznie take(1)będzie nie uzyskując oczekującego obietnicy. Przyniesie obietnicę rozwiązaną undefined.
Johan t Hart
Dzięki, że zauważyłeś, zgadza się. Nie jestem pewien, dlaczego post się różnił, być może zachowanie zmieniło się w pewnym momencie.
Estus Flask
8

Będziesz potrzebował awaitobietnicy, więc będziesz chciał użyć toPromise(). Zobacz to, aby uzyskać więcej informacji toPromise().

Josh Durham
źródło
5

Jeśli toPromisejest przestarzały, możesz go użyć, .pipe(take(1)).toPromiseale jak widać tutaj, nie jest on wycofany.

Więc proszę, po toPromiseprostu użyj (RxJs 6), jak powiedziano:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

przykład async / await:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = await sample('First Example').toPromise()
// output: 'First Example'
console.log('From Promise:', result);

Przeczytaj więcej tutaj .

I proszę usuń to błędne twierdzenie, że toPromisejest przestarzałe.

Emerica
źródło
1

Użyj nowego firstValueFrom()lub lastValueFrom()zamiast toPromise(), które, jak wskazano tutaj, jest przestarzałe począwszy od RxJS 7 i zostanie usunięte w RxJS 8.

import { firstValueFrom} from 'rxjs';
import { lastValueFrom } from 'rxjs';

this.myProp = await firstValueFrom(myObservable$);
this.myProp = await lastValueFrom(myObservable$);

Jest to dostępne w RxJS 7+

Zobacz: https://indepth.dev/rxjs-heads-up-topromise-is-being-deprecated/

TetraDev
źródło