TL; DR
Jak przekonwertować Task.whenAll(List<Task>)
na RxJava
?
Mój istniejący kod używa Bolts do tworzenia listy zadań asynchronicznych i czeka, aż wszystkie te zadania zostaną zakończone, zanim wykonają inne kroki. Zasadniczo tworzy List<Task>
i zwraca pojedynczy znak, Task
który jest oznaczony jako zakończony, gdy wszystkie zadania na liście zostaną ukończone, jak na przykładzie w witrynie Bolts .
Szukam zastąpić Bolts
z RxJava
a ja zakładając tę metodę budowania listę zadań asynchronicznych (rozmiar nie wiadomo z góry) i owijając je wszystkie w jeden Observable
jest możliwe, ale nie wiem jak.
Próbowałem patrząc na merge
, zip
, concat
etc ... ale nie może dostać się do pracy, na List<Observable>
które będę budowania jak wszystkie one wydają dostosowane do pracy na dwóch Observables
naraz, jeśli dobrze rozumiem docs.
Próbuję się uczyć RxJava
i nadal jestem bardzo nowy w tym, więc wybacz mi, jeśli to jest oczywiste pytanie lub wyjaśnione gdzieś w dokumentach; Próbowałem szukać. Każda pomoc byłaby mile widziana.
źródło
onErrorResumeNext
, przykład:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
Możesz użyć
flatMap
w przypadku, gdy masz dynamiczną kompozycję zadań. Coś takiego:public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) { return Observable.from(tasks) //execute in parallel .flatMap(task -> task.observeOn(Schedulers.computation())) //wait, until all task are executed //be aware, all your observable should emit onComplemete event //otherwise you will wait forever .toList() //could implement more intelligent logic. eg. check that everything is successful .map(results -> true); }
Kolejny dobry przykład wykonywania równoległego
Uwaga: tak naprawdę nie znam Twoich wymagań dotyczących obsługi błędów. Na przykład co zrobić, jeśli tylko jedno zadanie się nie powiedzie. Myślę, że powinieneś zweryfikować ten scenariusz.
źródło
zip
powiadamia o zakończeniu, gdy tylko jedno z zadań zostało wykonane i tym samym nie ma zastosowania.new Func1<Observable<Boolean>, Observable<Boolean>>()...
new Func1<List<Boolean>, Boolean>()
Spośród proponowanych sugestii zip () w rzeczywistości łączy ze sobą obserwowalne wyniki, które mogą być pożądane lub nie, ale nie zostały zadane w pytaniu. W pytaniu chodziło tylko o wykonanie każdej z operacji, pojedynczo lub równolegle (co nie zostało określone, ale połączony przykład Boltsa dotyczył wykonywania równoległego). Ponadto, zip () zakończy się natychmiast po ukończeniu któregokolwiek z obserwacji, więc narusza wymagania.
W przypadku równoległego wykonywania Observables, flatMap () przedstawiona w drugiej odpowiedzi jest w porządku, ale metoda merge () byłaby prostsza . Zauważ, że scalanie zakończy się po błędzie któregokolwiek z Observables, jeśli wolisz odłożyć zakończenie do zakończenia wszystkich obserwabli, powinieneś spojrzeć na mergeDelayError () .
Myślę, że w przypadku jednego po drugim należy użyć metody statycznej Observable.concat () . Jego javadoc stwierdza w ten sposób:
co brzmi jak to, czego szukasz, jeśli nie chcesz równoległego wykonywania.
Ponadto, jeśli interesuje Cię tylko wykonanie zadania, a nie zwracanie wartości, prawdopodobnie powinieneś zajrzeć do Completable zamiast Observable .
TLDR: myślę, że do wykonywania zadań jeden po drugim i zdarzenia oncompletion po ich zakończeniu, najlepiej nadaje się Completable.concat (). W przypadku wykonywania równoległego Completable.merge () lub Completable.mergeDelayError () brzmi jak rozwiązanie. Pierwsza z nich zatrzyma się natychmiast po wystąpieniu błędu na jakimkolwiek możliwym do ukończenia, druga wykona je wszystkie, nawet jeśli jeden z nich ma błąd, i dopiero wtedy zgłosi błąd.
źródło
Prawdopodobnie spojrzałeś na
zip
operator, który działa z 2 Observables.Istnieje również metoda statyczna
Observable.zip
. Ma jedną formę, która powinna Ci się przydać:Możesz sprawdzić javadoc, aby uzyskać więcej.
źródło
Z Kotlinem
Ważne jest, aby ustawić typ argumentów funkcji, w przeciwnym razie wystąpią błędy kompilacji
Ostatni typ argumentu zmienia się wraz z liczbą argumentów: BiFunction dla 2 Funkcja3 dla 3 Funkcja4 dla 4 ...
źródło
Piszę trochę kodu obliczeniowego w Kotlin z JavaRx Observables i RxKotlin. Chcę obserwować listę obserwowalnych do uzupełnienia, a w międzyczasie informować mnie o postępach i najnowszych wynikach. Na koniec zwraca najlepszy wynik obliczeń. Dodatkowym wymaganiem było równoległe uruchomienie Observables w celu wykorzystania wszystkich moich rdzeni procesora. Skończyło się na tym rozwiązaniu:
@Volatile var results: MutableList<CalculationResult> = mutableListOf() fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> { return Observable.create { subscriber -> Observable.concatEager(listOfCalculations.map { calculation: Calculation -> doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result }).subscribeBy( onNext = { results.add(it) subscriber.onNext(Pair("A calculation is ready", it)) }, onComplete = { subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) subscriber.onComplete() }, onError = { subscriber.onError(it) } ) } }
źródło
@Volatile
, ale jak by to działało, gdyby było wywoływane przez kilka wątków w tym samym czasie? Co by się stało z wynikami?Miałem podobny problem, musiałem pobrać elementy wyszukiwania z rozmowy resztowej, a także zintegrować zapisane sugestie z RecentSearchProvider.AUTHORITY i połączyć je razem w jedną ujednoliconą listę. Próbowałem użyć rozwiązania @MyDogTom, niestety nie ma Observable.from w RxJava. Po kilku badaniach uzyskałem rozwiązanie, które działało dla mnie.
fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>> { val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0) fetchedItems.add(fetchSearchSuggestions(context,query).toObservable()) fetchedItems.add(getSearchResults(query).toObservable()) return Observable.fromArray(fetchedItems) .flatMapIterable { data->data } .flatMap {task -> task.observeOn(Schedulers.io())} .toList() .map { ArrayList(it) } }
Stworzyłem obserwowalny z tablicy obserwabli, która zawiera listy sugestii i wyników z Internetu w zależności od zapytania. Następnie wystarczy przejrzeć te zadania za pomocą flatMapIterable i uruchomić je za pomocą flatmap, a wyniki umieścić w tablicy, którą można później pobrać do widoku recyklingu.
źródło
Jeśli używasz programu Project Reactor, możesz użyć
Mono.when
.Mono.when(publisher1, publisher2) .map(i-> { System.out.println("everything is done!"); return i; }).block()
źródło