Czy strumienie Java 8 są podobne do obserwacji RxJava?
Definicja strumienia Java 8:
Klasy w nowym
java.util.stream
pakiecie udostępniają interfejs API Stream do obsługi operacji w stylu funkcjonalnym na strumieniach elementów.
java-8
java-stream
rx-java
observable
rahulrv
źródło
źródło
Odpowiedzi:
TL; DR : Wszystkie biblioteki do przetwarzania sekwencji / strumieni oferują bardzo podobne API do budowania potoków. Różnice dotyczą interfejsu API do obsługi wielowątkowości i kompozycji potoków.
RxJava różni się znacznie od Stream. Ze wszystkich rzeczy JDK, najbliższy rx.Observable jest być może kombinacja
java.util.stream.CollectorStream + CompletableFuture (co wiąże się z kosztem radzenia sobie z dodatkową warstwą monady, tj. Koniecznością obsługi konwersji międzyStream<CompletableFuture<T>>
iCompletableFuture<Stream<T>>
).Istnieją znaczące różnice między Observable i Stream:
Stream#parallel()
dzieli sekwencję na partycjeObservable#subscribeOn()
iObservable#observeOn()
nie; Trudno jest emulowaćStream#parallel()
zachowanie z Observable, kiedyś miał.parallel()
metodę, ale ta metoda spowodowała tyle zamieszania, że.parallel()
wsparcie zostało przeniesione do oddzielnego repozytorium na github, RxJavaParallel. Więcej szczegółów w innej odpowiedzi .Stream#parallel()
nie pozwala na określenie puli wątków do użycia, w przeciwieństwie do większości metod RxJava akceptujących opcjonalny harmonogram. Ponieważ wszystkie instancje strumienia w JVM używają tej samej puli łączeń rozwidlonych, dodanie.parallel()
może przypadkowo wpłynąć na zachowanie w innym module programuObservable#interval()
,Observable#window()
i wiele innych; Dzieje się tak głównie dlatego, że strumienie są oparte na ściąganiu i nie ma kontroli nad tym, kiedy emitować następny element w dółtakeWhile()
,takeUntil()
); obejście użycieStream#anyMatch()
jest ograniczone: jest to operacja terminalowa, więc nie możesz jej użyć więcej niż raz na strumieńStrumienie są trudne do skonstruowania samodzielnie, Observable można skonstruować na wiele sposobówEDYCJA: Jak zauważono w komentarzach, istnieją sposoby na skonstruowanie strumienia. Jednakże, ponieważ nie ma zwarcia na terminalu, nie możesz np.łatwo wygenerować Strumień linii w pliku (JDK zapewnia linie Files # i BufferedReader # linie po wyjęciu z pudełka, a innymi podobnymi scenariuszami można zarządzać, konstruując Stream z Iteratora).Observable#using()
); możesz owinąć nim strumień IO lub mutex i mieć pewność, że użytkownik nie zapomni zwolnić zasobu - zostanie on automatycznie usunięty po zakończeniu subskrypcji; Stream maonClose(Runnable)
metodę, ale musisz wywołać ją ręcznie lub za pomocą try-with-resources. Np. musisz pamiętać, że Files # lines () muszą być zawarte w bloku try-with-resources.Podsumowanie: RxJava znacznie różni się od strumieni. Prawdziwe alternatywy RxJava to inne implementacje ReactiveStreams , np. Odpowiednia część Akka.
Aktualizuj . Istnieje sztuczka polegająca na użyciu innej niż domyślna puli złącz rozwidlonych
Stream#parallel
, zobacz niestandardową pulę wątków w strumieniu równoległym Java 8Aktualizuj . Wszystkie powyższe oparte są na doświadczeniach z RxJava 1.x. Teraz, gdy RxJava 2.x jest tutaj , ta odpowiedź może być nieaktualna.
źródło
Stream.generate()
i przekazać własnąSupplier<U>
implementację, po prostu jedną prostą metodę, z której udostępniasz następny element w strumieniu. Jest mnóstwo innych metod. Aby łatwo skonstruować sekwencję,Stream
która zależy od poprzednich wartości, możesz użyćinterate()
metody, każdaCollection
mastream()
metodę iStream.of()
konstruuje aStream
z varargs lub tablicy. WreszcieStreamSupport
obsługuje bardziej zaawansowane tworzenie strumieni za pomocą rozdzielaczy lub strumieni typów pierwotnych.takeWhile()
,takeUntil()
);" - JDK9 ma to, jak sądzę, w takeWhile () i dropWhile ()Java 8 Stream i RxJava wyglądają całkiem podobnie. Mają podobne operatory (filter, map, flatMap ...), ale nie są zbudowane do tego samego zastosowania.
Możesz wykonywać zadania asynchroniczne za pomocą RxJava.
Dzięki strumieniowi Java 8 będziesz przechodzić przez elementy swojej kolekcji.
Możesz zrobić prawie to samo w RxJava (przechodzenie elementów kolekcji), ale ponieważ RxJava koncentruje się na zadaniu równoległym, ... używa synchronizacji, zatrzasku, ... Więc to samo zadanie z użyciem RxJava może być wolniejsze niż ze strumieniem Java 8.
RxJava można porównać do
CompletableFuture
, ale to może być w stanie obliczyć więcej niż jedną wartość.źródło
parallelStream
obsługuje podobną synchronizację prostych przejść / map / filtrowania itp.Istnieje kilka różnic technicznych i koncepcyjnych, na przykład strumienie Java 8 są synchronicznymi sekwencjami wartości jednorazowego użytku, opartymi na ściąganiu, natomiast obserwowalne RxJava to możliwe do ponownej obserwacji, oparte na adaptacyjnie push-pull, potencjalnie asynchroniczne sekwencje wartości. RxJava jest przeznaczona dla języka Java 6+ i działa również na Androida.
źródło
Strumienie Java 8 są oparte na ściąganiu. Wykonujesz iterację po strumieniu Java 8 zużywającym każdy element. I mógłby to być niekończący się strumień.
RXJava
Observable
jest domyślnie oparta na wypychaniu. Subskrybujesz Observable, a otrzymasz powiadomienie, gdy nadejdzie następny element (onNext
) lub kiedy strumień zostanie zakończony (onCompleted
) lub gdy wystąpi błąd (onError
). Ponieważ zObservable
pojawićonNext
,onCompleted
,onError
imprezy, można zrobić kilka potężnych funkcji, takich jak łączenie różnychObservable
s na nowy (zip
,merge
,concat
). Inne rzeczy, które możesz zrobić, to buforowanie, dławienie, ... I używa mniej więcej tego samego API w różnych językach (RxJava, RX w C #, RxJS, ...)Domyślnie RxJava jest jednowątkowa. O ile nie zaczniesz używać harmonogramów, wszystko będzie się odbywać w tym samym wątku.
źródło
Istniejące odpowiedzi są wyczerpujące i poprawne, ale brakuje jasnego przykładu dla początkujących. Pozwólcie, że przytoczę konkretne terminy, takie jak „oparte na pchaniu / ciągnięciu” i „ponowna obserwacja”. Uwaga : nienawidzę tego terminu
Observable
(to strumień na litość boską), więc będę po prostu odnosił się do strumieni J8 vs RX.Rozważ listę liczb całkowitych,
Strumień J8 to narzędzie do modyfikowania kolekcji. Na przykład nawet cyfry można wyodrębnić jako,
To jest w zasadzie mapa Pythona , filtrowanie, zmniejszanie , bardzo ładny (i dawno spóźniony) dodatek do Javy. Ale co by było, gdyby cyfry nie były zbierane z wyprzedzeniem - co by było, gdyby cyfry były przesyłane strumieniowo podczas działania aplikacji - czy moglibyśmy filtrować parzyste w czasie rzeczywistym.
Wyobraź sobie, że oddzielny proces wątku wyprowadza liczby całkowite w losowych momentach, gdy aplikacja jest uruchomiona (
---
oznacza czas)W RX
even
może reagować na każdą nową cyfrę i stosować filtr w czasie rzeczywistymNie ma potrzeby przechowywania list wejściowych i wyjściowych. Jeśli chcesz mieć listę wyników, nie ma problemu, że można ją również przesyłać strumieniowo. W rzeczywistości wszystko jest strumieniem.
Dlatego terminy takie jak „bezstanowy” i „funkcjonalny” są bardziej kojarzone z RX
źródło
RxJava jest również ściśle związana z inicjatywą strumieni reaktywnych i traktuje ją jako prostą implementację interfejsu API strumieni reaktywnych (np. W porównaniu z implementacją strumieni Akka ). Główna różnica polega na tym, że strumienie reaktywne są zaprojektowane tak, aby były w stanie poradzić sobie z ciśnieniem wstecznym, ale jeśli spojrzysz na stronę strumieni reaktywnych, zobaczysz pomysł. Dość dobrze opisują swoje cele, a strumienie są również ściśle związane z reaktywnym manifestem .
Strumienie Java 8 są w zasadzie implementacją nieograniczonej kolekcji, podobną do Scala Stream lub leniwej sekwencji Clojure .
źródło
Strumienie Java 8 umożliwiają wydajne przetwarzanie naprawdę dużych kolekcji przy jednoczesnym wykorzystaniu architektur wielordzeniowych. Natomiast RxJava jest domyślnie jednowątkowa (bez harmonogramów). Więc RxJava nie wykorzysta maszyn wielordzeniowych, chyba że sam zaprogramujesz tę logikę.
źródło