Patrzyłem na nową rx java 2 i nie jestem do końca pewien, czy rozumiem już ideę backpressure
...
Zdaję sobie sprawę, że my Observable
nie mamy backpressure
wsparcia i Flowable
to je ma.
Więc na podstawie przykładu powiedzmy, że mam flowable
z interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
To się zawiesza po około 128 wartościach i jest to całkiem oczywiste, że konsumuję wolniej niż zdobywanie przedmiotów.
Ale mamy to samo z Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
To wcale się nie zawiesi, nawet jeśli opóźnię spożycie, nadal działa. Aby Flowable
pracować, powiedzmy, że wstawię onBackpressureDrop
operator, awaria zniknęła, ale nie wszystkie wartości są również emitowane.
Więc podstawowe pytanie, na które nie mogę znaleźć odpowiedzi w mojej głowie, brzmi: dlaczego mam się przejmować, backpressure
kiedy mogę używać zwykłego Observable
nadal odbierać wszystkie wartości bez zarządzania buffer
? A może z drugiej strony, jakie korzyści backpressure
dają mi korzyści z zarządzania i obsługi konsumpcji?
Odpowiedzi:
To, co objawia się przeciwciśnienie w praktyce, to ograniczone bufory,
Flowable.observeOn
ma bufor złożony ze 128 elementów, który jest opróżniany tak szybko, jak może to przyjąć strumień zstępujący. Możesz zwiększyć ten rozmiar bufora indywidualnie, aby obsłużyć źródło wybuchowe, a wszystkie praktyki zarządzania ciśnieniem wstecznym nadal mają zastosowanie od 1.x.Observable.observeOn
ma nieograniczony bufor, który nieustannie zbiera elementy i aplikacji może zabraknąć pamięci.Możesz użyć
Observable
na przykład:Możesz użyć
Flowable
na przykład:źródło
Maybe
,Single
aCompletable
może zawsze być używane zamiastFlowable
gdy są semantycznie właściwe?Maybe
,Single
iCompletable
to zdecydowanie zbyt mała, aby mieć jakąkolwiek potrzebę pojęcia zwrotnym. Nie ma szans, by producent wyemitował przedmioty szybciej, niż mogą zostać zużyte, ponieważ 0–1 przedmiotów zostanie kiedykolwiek wyprodukowanych lub zużytych.Przeciwciśnienie występuje wtedy, gdy Twój obserwowalny (wydawca) tworzy więcej zdarzeń, niż może obsłużyć Twój subskrybent. Możesz więc uzyskać brakujące wydarzenia dla subskrybentów lub możesz uzyskać ogromną kolejkę wydarzeń, która ostatecznie prowadzi do braku pamięci.
Flowable
uwzględnia przeciwciśnienie.Observable
nie. Otóż to.przypomina mi lejek, który, gdy ma zbyt dużo płynu, przelewa się. Flowable może pomóc w uniknięciu tego:
z ogromnym przeciwciśnieniem:
ale przy użyciu płynnego przeciwciśnienia jest znacznie mniejsze:
Rxjava2 ma kilka strategii przeciwprężnych, których możesz użyć w zależności od zastosowania. przez strategię rozumiem, że Rxjava2 zapewnia sposób obsługi obiektów, których nie można przetworzyć z powodu przepełnienia (przeciwciśnienia).
oto strategie. Nie omówię ich wszystkich, ale na przykład, jeśli nie chcesz się martwić o przepełnione przedmioty, możesz użyć takiej strategii upuszczania:
observable.toFlowable (BackpressureStrategy.DROP)
O ile wiem, kolejka powinna mieć limit 128 pozycji, po czym może nastąpić przepełnienie (przeciwciśnienie). Nawet jeśli nie jest to 128, jest blisko tej liczby. Mam nadzieję, że to komuś pomoże.
jeśli chcesz zmienić rozmiar bufora z 128, wygląda na to, że można to zrobić w ten sposób (ale uważaj na ograniczenia pamięci:
w rozwoju oprogramowania zwykle strategia przeciwciśnienia oznacza, że mówisz emitentowi, aby nieco zwolnił, ponieważ konsument nie może poradzić sobie z prędkością, z jaką emitujesz zdarzenia.
źródło
Fakt, że
Flowable
rozbił się po wyemitowaniu 128 wartości bez obsługi przeciwciśnienia, nie oznacza, że zawsze będzie się zawieszał po dokładnie 128 wartościach: czasami zawiesza się po 10, a czasami w ogóle się nie zawiesza. Wydaje mi się, że właśnie tak się stało, gdy wypróbowałeś ten przykładObservable
- zdarzyło się, że nie było przeciwciśnienia, więc twój kod działał normalnie, następnym razem może nie. Różnica w RxJava 2 polega na tym, że nie ma już koncepcji przeciwciśnienia wObservable
s i nie ma sposobu, aby sobie z nim poradzić. Jeśli projektujesz sekwencję reaktywną, która prawdopodobnie będzie wymagać wyraźnej obsługi przeciwciśnienia - wtedyFlowable
jest to najlepszy wybór.źródło
interval
bez,backpressure
czy spodziewałbym się dziwnego zachowania lub problemów?