Observable vs Flowable rxJava2

128

Patrzyłem na nową rx java 2 i nie jestem do końca pewien, czy rozumiem już ideę backpressure...

Zdaję sobie sprawę, że my Observablenie mamy backpressurewsparcia i Flowableto je ma.

Więc na podstawie przykładu powiedzmy, że mam flowablez interval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

To się zawiesza po około 128 wartościach i jest to całkiem oczywiste, że konsumuję wolniej niż zdobywanie przedmiotów.

Ale mamy to samo z Observable

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

To wcale się nie zawiesi, nawet jeśli opóźnię spożycie, nadal działa. Aby Flowablepracować, powiedzmy, że wstawię onBackpressureDropoperator, awaria zniknęła, ale nie wszystkie wartości są również emitowane.

Więc podstawowe pytanie, na które nie mogę znaleźć odpowiedzi w mojej głowie, brzmi: dlaczego mam się przejmować, backpressurekiedy mogę używać zwykłego Observablenadal odbierać wszystkie wartości bez zarządzania buffer? A może z drugiej strony, jakie korzyści backpressuredają mi korzyści z zarządzania i obsługi konsumpcji?

user2141889
źródło

Odpowiedzi:

123

To, co objawia się przeciwciśnienie w praktyce, to ograniczone bufory, Flowable.observeOnma bufor złożony ze 128 elementów, który jest opróżniany tak szybko, jak może to przyjąć strumień zstępujący. Możesz zwiększyć ten rozmiar bufora indywidualnie, aby obsłużyć źródło wybuchowe, a wszystkie praktyki zarządzania ciśnieniem wstecznym nadal mają zastosowanie od 1.x. Observable.observeOnma nieograniczony bufor, który nieustannie zbiera elementy i aplikacji może zabraknąć pamięci.

Możesz użyć Observablena przykład:

  • obsługa zdarzeń GUI
  • praca z krótkimi sekwencjami (łącznie mniej niż 1000 elementów)

Możesz użyć Flowablena przykład:

  • źródła zimne i bezczasowe
  • źródła podobne do generatora
  • sieci dostępu i bazy danych
akarnokd
źródło
Od tego ma pochodzić z innego pytanie - czy to prawda, że bardziej ograniczone typy jak Maybe, Singlea Completablemoże zawsze być używane zamiast Flowablegdy są semantycznie właściwe?
david.mihola
1
Tak Maybe, Singlei Completableto zdecydowanie zbyt mała, aby mieć jakąkolwiek potrzebę pojęcia zwrotnym. Nie ma szans, by producent wyemitował przedmioty szybciej, niż mogą zostać zużyte, ponieważ 0–1 przedmiotów zostanie kiedykolwiek wyprodukowanych lub zużytych.
AndrewF
Może nie mam racji, ale dla mnie Przykłady Flowable i Observable powinny zostać zamienione.
Yura Galavay
Myślę, że w pytaniu brakuje mu strategii przeciwciśnienia, którą musimy dostarczyć do Flowable, co wyjaśnia, dlaczego zgłoszony jest brakujący wyjątek przeciwciśnienia, a także wyjaśnia, dlaczego ten wyjątek znika po zastosowaniu .onBackpressureDrop (). A dla Observable, ponieważ nie ma tej strategii i nie można jej zapewnić, po prostu zawiedzie później z powodu OOM
Haomin
111

Przeciwciśnienie występuje wtedy, gdy Twój obserwowalny (wydawca) tworzy więcej zdarzeń, niż może obsłużyć Twój subskrybent. Możesz więc uzyskać brakujące wydarzenia dla subskrybentów lub możesz uzyskać ogromną kolejkę wydarzeń, która ostatecznie prowadzi do braku pamięci. Flowableuwzględnia przeciwciśnienie. Observablenie. Otóż ​​to.

przypomina mi lejek, który, gdy ma zbyt dużo płynu, przelewa się. Flowable może pomóc w uniknięciu tego:

z ogromnym przeciwciśnieniem:

wprowadź opis obrazu tutaj

ale przy użyciu płynnego przeciwciśnienia jest znacznie mniejsze:

wprowadź opis obrazu tutaj

Rxjava2 ma kilka strategii przeciwprężnych, których możesz użyć w zależności od zastosowania. przez strategię rozumiem, że Rxjava2 zapewnia sposób obsługi obiektów, których nie można przetworzyć z powodu przepełnienia (przeciwciśnienia).

oto strategie. Nie omówię ich wszystkich, ale na przykład, jeśli nie chcesz się martwić o przepełnione przedmioty, możesz użyć takiej strategii upuszczania:

observable.toFlowable (BackpressureStrategy.DROP)

O ile wiem, kolejka powinna mieć limit 128 pozycji, po czym może nastąpić przepełnienie (przeciwciśnienie). Nawet jeśli nie jest to 128, jest blisko tej liczby. Mam nadzieję, że to komuś pomoże.

jeśli chcesz zmienić rozmiar bufora z 128, wygląda na to, że można to zrobić w ten sposób (ale uważaj na ograniczenia pamięci:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

w rozwoju oprogramowania zwykle strategia przeciwciśnienia oznacza, że ​​mówisz emitentowi, aby nieco zwolnił, ponieważ konsument nie może poradzić sobie z prędkością, z jaką emitujesz zdarzenia.

j2emanue
źródło
Zawsze myślałem, że backpressure to nazwa rodziny mechanizmów, które pozwolą konsumentowi powiadomić producenta o spowolnieniu ...
kboom
To może być przypadek. Tak
j2emanue
Czy są jakieś wady używania Flowable?
Igor Ganapolsky
Te obrazy mnie okłamują. Porzucenie wydarzeń nie zakończy się „większą ilością pieniędzy” na dole.
EpicPandaForce
1
@ j2emanue, mylisz rozmiar bufora dla operatorów i operatora Flowable.buffer (int). Przeczytaj uważnie javadocs i odpowiednio popraw odpowiedź: reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html
tomek
15

Fakt, że Flowablerozbił się po wyemitowaniu 128 wartości bez obsługi przeciwciśnienia, nie oznacza, że ​​zawsze będzie się zawieszał po dokładnie 128 wartościach: czasami zawiesza się po 10, a czasami w ogóle się nie zawiesza. Wydaje mi się, że właśnie tak się stało, gdy wypróbowałeś ten przykład Observable- zdarzyło się, że nie było przeciwciśnienia, więc twój kod działał normalnie, następnym razem może nie. Różnica w RxJava 2 polega na tym, że nie ma już koncepcji przeciwciśnienia w Observables i nie ma sposobu, aby sobie z nim poradzić. Jeśli projektujesz sekwencję reaktywną, która prawdopodobnie będzie wymagać wyraźnej obsługi przeciwciśnienia - wtedy Flowablejest to najlepszy wybór.

Egor
źródło
Tak, zauważyłem, że czasami pękało po mniejszych wartościach, czasami nie. Ale znowu, jeśli na przykład radzę sobie tylko intervalbez, backpressureczy spodziewałbym się dziwnego zachowania lub problemów?
user2141889
Jeśli jesteś pewien, że nie ma możliwości wystąpienia problemów z ciśnieniem wstecznym w określonej sekwencji obserwowalnej - myślę, że można zignorować ciśnienie wsteczne.
Egor