Przypadki użycia dla harmonogramów RxJava

253

W RxJava dostępnych jest 5 różnych harmonogramów :

  1. natychmiastowe () : Tworzy i zwraca program planujący, który natychmiast wykonuje pracę w bieżącym wątku.

  2. trampoline () : Tworzy i zwraca program planujący, który kolejkuje pracę nad bieżącym wątkiem do wykonania po zakończeniu bieżącej pracy.

  3. newThread () : Tworzy i zwraca harmonogram, który tworzy nowy wątek dla każdej jednostki pracy.

  4. computation () : Tworzy i zwraca program planujący przeznaczony do pracy obliczeniowej. Może to być wykorzystane do pętli zdarzeń, przetwarzania wywołań zwrotnych i innych prac obliczeniowych. Nie wykonuj prac związanych z IO na tym harmonogramie. Użyj harmonogramów. zamiast tego io () .

  5. io () : Tworzy i zwraca program planujący przeznaczony do pracy związanej z We / Wy. Implementacja jest wspierana przez pulę wątków Executora, która będzie rosła w miarę potrzeb. Można tego użyć do asynchronicznego wykonywania blokujących operacji we / wy. Nie wykonuj prac obliczeniowych na tym harmonogramie. Użyj harmonogramów. zamiast tego computation () .

Pytania:

Pierwsze 3 harmonogramy są dość zrozumiałe; jestem jednak trochę zagubiony w obliczeniach i io .

  1. Czym dokładnie jest „praca związana z IO”? Czy jest używany do obsługi strumieni ( java.io) i plików ( java.nio.files)? Czy jest używany do zapytań do bazy danych? Czy służy do pobierania plików lub uzyskiwania dostępu do interfejsów API REST?
  2. Czym różni się computation () od newThread () ? Czy to dlatego, że wszystkie wywołania computation () są za każdym razem w jednym wątku (w tle) zamiast nowego (w tle)?
  3. Dlaczego źle jest wywoływać obliczenia () podczas wykonywania operacji we / wy?
  4. Dlaczego źle jest wywoływać io () podczas wykonywania obliczeń?
bcorso
źródło

Odpowiedzi:

332

Świetne pytania, myślę, że dokumentacja mogłaby przydać się bardziej szczegółowo.

  1. io()jest wspierany przez nieograniczoną pulę wątków i jest to rodzaj rzeczy, której można użyć do zadań nie wymagających intensywnych obliczeń, czyli takich, które nie obciążają zbytnio procesora. Tak więc interakcja z systemem plików, interakcja z bazami danych lub usługami na innym hoście to dobre przykłady.
  2. computation()jest wspierany przez ograniczoną pulę wątków o rozmiarze równym liczbie dostępnych procesorów. Jeśli próbujesz zaplanować intensywną pracę procesora równolegle na więcej niż dostępnych procesorach (powiedzmy, że używasz newThread()), jesteś gotowy na narzut tworzenia wątków i narzut przełączania kontekstu, ponieważ wątki rywalizują o procesor i jest to potencjalnie duży spadek wydajności.
  3. Najlepiej jest wyjechać computation()na intensywną pracę procesora tylko w przeciwnym razie nie uzyskasz dobrego wykorzystania procesora.
  4. Nie warto wzywać io()do pracy obliczeniowej, ponieważ powód omówiony w io()punkcie 2. jest nieograniczony, a jeśli zaplanujesz tysiąc zadań obliczeniowych io()równolegle, każde z tych tysięcy zadań będzie miało swój własny wątek i będzie konkurować o procesor powodujący koszty przełączania kontekstu.
Dave Moten
źródło
5
Dzięki znajomości źródła RxJava. Przez długi czas było to dla mnie źródłem zamieszania i myślę, że dokumentacja wymaga pogłębienia w tym zakresie.
Dave Moten
2
@IgorGanapolsky Zgaduję, że jest to coś, co rzadko chciałbyś zrobić. Tworzenie nowego wątku dla każdej jednostki pracy rzadko sprzyja wydajności, ponieważ wątki są drogie w budowie i niszczeniu. Zazwyczaj chcesz ponownie używać wątków, które wykonują computation () i inne programy planujące. Jedynym momentem, w którym newThread () może mieć uzasadnione zastosowanie (przynajmniej mogę o tym myśleć), jest uruchamianie pojedynczych, rzadkich, długotrwałych zadań. Nawet wtedy mógłbym użyć io () dla tego scenariusza.
tmn
4
Czy możesz podać przykład, w którym przydatna byłaby trampolina ()? Rozumiem tę koncepcję, ale nie mogę wymyślić scenariusza, który wykorzystałbym w praktyce. To jedyny program, który wciąż jest dla mnie tajemnicą
2015
32
Do połączeń sieciowych używaj Schedulers.io (), a jeśli chcesz ograniczyć liczbę jednoczesnych połączeń sieciowych, użyj Scheduler.from (Executors.newFixedThreadPool (n)).
Dave Moten,
4
Możesz pomyśleć, że timeoutdomyślnie nałożenie na computation()ciebie zablokowałoby wątek, ale tak nie jest. Pod przykryciami computation()stosuje się ScheduledExecutorServicewięc opóźnione działania nie blokują. Biorąc pod uwagę ten fakt, computation()jest to dobry pomysł, ponieważ gdyby był na innym wątku, wówczas ponieślibyśmy koszty zmiany wątku.
Dave Moten
3

Najważniejsze jest to, że zarówno Schedulers.io, jak i Schedulers.computation są wspierane przez niezwiązane pule wątków, w przeciwieństwie do innych wymienionych w pytaniu. Ta właściwość jest udostępniana tylko przez Schedulers.from (Executor) w przypadku, gdy Executor jest tworzony za pomocą newCachedThreadPool (niezwiązany z pulą wątków automatycznego odzyskiwania).

Jak obficie wyjaśniono w poprzednich odpowiedziach i wielu artykułach w Internecie, Schedulers.io i Schedulers.computation należy stosować ostrożnie, ponieważ są one zoptymalizowane pod kątem rodzaju pracy w ich imieniu. Ale moim zdaniem najważniejszą rolą jest zapewnienie realnej współbieżności strumieni reaktywnych .

Wbrew przekonaniu nowicjuszy strumienie reaktywne nie są z natury zbieżne, ale z natury asynchroniczne i sekwencyjne. Z tego właśnie powodu Schedulers.io powinien być używany tylko wtedy, gdy operacja We / Wy jest blokowana (np .: za pomocą polecenia blokującego, takiego jak Apache IOUtils FileUtils.readFileAsString (...) ), więc zawiesiłby wątek wywołujący, dopóki operacja nie zostanie zakończona Gotowe.

Zastosowanie metody asynchronicznej, takiej jak Java AsynchronousFileChannel (...), nie blokowałoby wątku wywołującego podczas operacji, więc nie ma sensu używać osobnego wątku. W rzeczywistości wątki Schedulers.io nie są naprawdę odpowiednie dla operacji asynchronicznych, ponieważ nie uruchamiają pętli zdarzeń, a wywołanie zwrotne nigdy ... nie zostanie wywołane.

Ta sama logika dotyczy dostępu do bazy danych lub zdalnych wywołań API. Nie używaj Schedulers.io, jeśli możesz użyć asynchronicznego lub reaktywnego API do wykonania połączenia.

Powrót do współbieżności. Możesz nie mieć dostępu do asynchronicznego lub reaktywnego interfejsu API do wykonywania operacji we / wy asynchronicznie lub jednocześnie, więc jedyną alternatywą jest wysyłanie wielu wywołań w osobnym wątku. Niestety, reaktywne strumienie są sekwencyjne na ich końcach , ale dobrą wiadomością jest to, że flatMap () operator może wprowadzić współbieżności w swoim rdzeniu .

Współbieżność musi być wbudowana w konstrukcję strumieniową, zwykle za pomocą operatora flatMap () . Ten potężny operator można skonfigurować tak, aby wewnętrznie zapewniał wielowątkowy kontekst dla wbudowanej funkcji FlatMap () <T, R>. Ten kontekst zapewnia wielowątkowy program planujący, taki jak Scheduler.io lub Scheduler.computation .

Znajdź więcej szczegółów w artykułach na temat RxJava2 Schedulers and Concurrency, gdzie znajdziesz przykładowy kod i szczegółowe wyjaśnienia dotyczące używania Schedulerów sekwencyjnie i jednocześnie.

Mam nadzieję że to pomoże,

Softjake

softjake
źródło
2

Ten post na blogu stanowi doskonałą odpowiedź

Z postu na blogu:

Program Schedulers.io () jest wspierany przez nieograniczoną pulę wątków. Służy do pracy niewymagającej dużej mocy obliczeniowej procesora, w tym do interakcji z systemem plików, wykonywania połączeń sieciowych, interakcji z bazą danych itp. Ta pula wątków jest przeznaczona do asynchronicznego wykonywania blokujących operacji we / wy.

Program Schedulers.computation () jest wspierany przez ograniczoną pulę wątków o rozmiarze do liczby dostępnych procesorów. Służy do prac obliczeniowych lub wymagających dużej mocy obliczeniowej procesora, takich jak zmiana rozmiaru obrazów, przetwarzanie dużych zestawów danych itp. Uwaga: gdy przydzielisz więcej wątków obliczeniowych niż dostępnych rdzeni, wydajność spadnie z powodu przełączania kontekstu i narzutu tworzenia wątków, ponieważ wątki rywalizują o czas procesorów.

Schedulers.newThread () tworzy nowy wątek dla każdej zaplanowanej jednostki pracy. Ten harmonogram jest drogi, ponieważ za każdym razem odradza się nowy wątek i nie następuje ponowne użycie.

Schedulers.from (Executor executor) tworzy i zwraca niestandardowy planista wspierany przez określonego executora. Aby ograniczyć liczbę jednoczesnych wątków w puli wątków, użyj Scheduler.from (Executors.newFixedThreadPool (n)). Gwarantuje to, że jeśli zadanie zostanie zaplanowane, gdy wszystkie wątki są zajęte, zostanie ono umieszczone w kolejce. Wątki w puli będą istnieć, dopóki nie zostanie jawnie zamknięte.

Główny wątek lub AndroidSchedulers.mainThread () jest dostarczany przez bibliotekę rozszerzeń RxAndroid do RxJava. Główny wątek (znany również jako wątek interfejsu użytkownika) jest miejscem interakcji użytkownika. Należy uważać, aby nie przeciążyć tego wątku, aby zapobiec niepotrzebnemu interfejsowi użytkownika lub, co gorsza, dialogowi aplikacji nie odpowiada ”(ANR).

Schedulers.single () jest nowością w RxJava 2. Ten harmonogram jest wspierany przez pojedynczy wątek wykonujący zadania sekwencyjnie w żądanej kolejności.

Schedulers.trampoline () wykonuje zadania w sposób FIFO (pierwsze wejście, pierwsze wyjście) przez jeden z uczestniczących wątków roboczych. Jest często używany podczas implementowania rekurencji, aby uniknąć zwiększenia stosu wywołań.

Joe
źródło