Różnica między CompletableFuture, Future i RxJava's Observable

193

Chciałbym poznać różnicę między CompletableFuture, Futurea Observable RxJava.

Wiem, że wszystko jest asynchroniczne, ale

Future.get() blokuje wątek

CompletableFuture podaje metody wywołania zwrotnego

RxJava Observable--- podobnie jak w CompletableFutureprzypadku innych korzyści (nie jestem pewien)

Na przykład: jeśli klient musi wykonać wiele wywołań usług, a kiedy korzystamy Futures(Java) Future.get()będzie wykonywany sekwencyjnie ... chciałby wiedzieć, jak lepiej działa w RxJava ..

Dokumentacja http://reactivex.io/intro.html mówi

Futures jest trudny do optymalnego komponowania warunkowych asynchronicznych przepływów wykonawczych (lub niemożliwy, ponieważ opóźnienia w każdym żądaniu zmieniają się w czasie wykonywania). Można to oczywiście zrobić, ale szybko się komplikuje (a tym samym jest podatny na błędy) lub przedwcześnie blokuje Future.get (), co eliminuje korzyści z wykonywania asynchronicznego.

Naprawdę zainteresowany wiedzą, jak RxJavarozwiązać ten problem. Z dokumentacji trudno mi było to zrozumieć.

shiv455
źródło
Czy przeczytałeś dokumentację dla każdego z nich? RxJava zupełnie mnie nie zna, ale dokumentacja na pierwszy rzut oka wydaje się bardzo dokładna. Nie wydaje się to szczególnie porównywalne z tymi dwoma kontraktami terminowymi.
FThompson
Przeszedłem, ale nie jestem w stanie dostrzec, jak różni się od futures na Javę ... popraw mnie, jeśli się mylę
shiv455
W jaki sposób obserwowalne są podobne do kontraktów futures?
FThompson,
2
chciałbyś wiedzieć, gdzie jest inaczej, czy różni się w zarządzaniu wątkami? EX: Future.get () blokuje wątek .... jak będzie on obsługiwany w Observable ???
shiv455
2
przynajmniej dla mnie to trochę mylące ... różnica poziomów byłaby naprawdę pomocna !!
shiv455

Odpowiedzi:

279

Futures

Futures zostały wprowadzone w Javie 5 (2004). Są to po prostu symbole zastępcze dla wyniku operacji, która jeszcze się nie zakończyła. Po zakończeniu operacji Futurebędzie zawierać ten wynik. Na przykład operacją może być instancja Runnable lub Callable, która jest przesyłana do usługi ExecutorService . Podmiot wysyłający operację może użyć Futureobiektu do sprawdzenia, czy operacja toDone () , lub poczekać, aż zakończy się przy użyciu blokującej metody get () .

Przykład:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures zostały wprowadzone w Javie 8 (2014). W rzeczywistości są one ewolucją zwykłych kontraktów futures, zainspirowanych Google Playable Futures , częścią biblioteki Guava . Są to kontrakty futures, które pozwalają również łączyć zadania w łańcuch. Możesz ich użyć, aby powiedzieć niektórym wątkom roboczym, że „idź zrób jakieś zadanie X, a kiedy skończysz, zrób to samo, używając wyniku X”. Korzystając z CompletableFutures, możesz zrobić coś z wynikiem operacji bez faktycznego blokowania wątku w oczekiwaniu na wynik. Oto prosty przykład:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava to cała biblioteka do programowania reaktywnego stworzona w Netflix. Na pierwszy rzut oka będzie wyglądać podobnie do strumieni Java 8 . Jest, z wyjątkiem tego, że jest znacznie potężniejszy.

Podobnie jak Futures, RxJava może być używany do łączenia ze sobą szeregu akcji synchronicznych lub asynchronicznych w celu utworzenia potoku przetwarzania. W przeciwieństwie do kontraktów futures, które są jednorazowego użytku, RxJava działa na strumieniach zerowych lub więcej przedmiotów. W tym niekończące się strumienie z nieskończoną liczbą przedmiotów. Jest także znacznie bardziej elastyczny i wydajny dzięki niewiarygodnie bogatemu zestawowi operatorów .

W przeciwieństwie do strumieni Java 8, RxJava ma również mechanizm ciśnienia wstecznego , który pozwala mu obsługiwać przypadki, w których różne części twojego potoku przetwarzania działają w różnych wątkach, z różnymi prędkościami .

Minusem RxJava jest to, że pomimo solidnej dokumentacji jest to trudna biblioteka do nauczenia się z powodu zmiany paradygmatu. Kod Rx może być również koszmarem do debugowania, szczególnie jeśli w grę wchodzi wiele wątków, a nawet gorzej - jeśli potrzebne jest ciśnienie wsteczne.

Jeśli chcesz się tym zająć, na oficjalnej stronie znajduje się cała strona z różnymi samouczkami, a także oficjalna dokumentacja i Javadoc . Możesz także rzucić okiem na niektóre filmy, takie jak ten, który zawiera krótkie wprowadzenie do Rx, a także mówi o różnicach między Rx a Futures.

Premia: Reaktywne strumienie Java 9

Reaktywne strumienie Java 9, zwane Flow API, to zestaw interfejsów implementowanych przez różne biblioteki strumieni reaktywnych, takie jak RxJava 2 , Akka Streams i Vertx . Pozwalają one na połączenie tych reaktywnych bibliotek, zachowując jednocześnie wszystkie ważne przeciwciśnienia.

Słód
źródło
Byłoby miło podać przykładowy kod, w jaki sposób Rx to robi
Zinan Xing,
Więc używając Reactive Streams, możemy mieszać RxJava, Akka i Vertx w jednej aplikacji?
IgorGanapolsky
1
@IgorGanapolsky Tak.
Słód
W CompletableFutures używamy metod wywołania zwrotnego, te metody wywołania zwrotnego będą również blokowane, jeśli wyjście jednej metody jest wejściem innego wywołania zwrotnego. Jako przyszły blok z wywołaniem Future.get (). Dlaczego mówi się, że Future.get () blokuje wywołanie, podczas gdy CompletableFutures nie blokuje. Proszę wyjaśnić
Deepak,
1
@Federico Sure. Każdy z nich Futurejest symbolem zastępczym dla pojedynczego wyniku, który mógł jeszcze nie zostać wypełniony. Jeśli ponownie wykonasz tę samą operację, otrzymasz nową Futureinstancję. RxJava zajmuje się strumieniami wyników, które mogą pojawić się w dowolnym momencie. Seria operacji może zatem zwrócić jeden obserwowalny wynik RxJava, który wypompuje wiele wyników. To trochę jak różnica między pojedynczą kopertą pocztową a pneumatyczną rurką, która ciągle pompuje pocztę.
Słód
20

Pracuję z Rx Java od wersji 0.9, teraz w wersji 1.3.2 i wkrótce migruję do wersji 2.x Używam tego w prywatnym projekcie, w którym pracuję już od 8 lat.

W ogóle nie programowałbym bez tej biblioteki. Na początku byłem sceptyczny, ale jest to zupełnie inny stan umysłu, który musisz stworzyć. Ciche trudne na początku. Czasami patrzyłem na kulki godzinami .. lol

To tylko kwestia praktyki i naprawdę poznania przepływu (inaczej kontraktu obserwowalnych i obserwatora), kiedy już tam dotrzesz, nie będziesz chciał tego robić inaczej.

Dla mnie biblioteka nie ma tak naprawdę wad.

Przypadek użycia: Mam widok monitora, który zawiera 9 mierników (procesor, pamięć, sieć itp.). Podczas uruchamiania widoku widok subskrybuje się do klasy monitorowania systemu, która zwraca obserwowalny (przedział), który zawiera wszystkie dane z 9 metrów. Będzie przesyłać co sekundę nowy wynik do widoku (więc nie odpytywanie !!!). To, co możliwe do zaobserwowania, wykorzystuje płaską mapę do jednoczesnego (asynchronizacji!) Pobierania danych z 9 różnych źródeł i zamyka wynik w nowym modelu, który Twój widok otrzyma w onNext ().

Jak do diabła to zrobisz z futures, dodatkami itp. ... Powodzenia! :)

Rx Java rozwiązuje dla mnie wiele problemów programistycznych i znacznie ułatwia ...

Zalety:

  • Statelss !!! (ważna rzecz do wspomnienia, może najważniejsze)
  • Zarządzanie wątkami po wyjęciu z pudełka
  • Twórz sekwencje, które mają własny cykl życia
  • Wszystko jest obserwowalne, więc łączenie jest łatwe
  • Mniej kodu do napisania
  • Pojedynczy słoik na ścieżce klas (bardzo lekki)
  • Wysoce współbieżny
  • Nie ma już piekła zwrotnego
  • Oparte na abonencie (ścisła umowa między konsumentem a producentem)
  • Strategie przeciwciśnienia (wyłącznik itp.)
  • Znakomita obsługa błędów i ich odzyskiwanie
  • Bardzo ładna dokumentacja (kulki <3)
  • Kompletna kontrola
  • Wiele więcej ...

Wady: - Trudne do przetestowania

Kristoffer
źródło
13
~ „ W ogóle nie programowałbym bez tej biblioteki. ” Więc RxJava jest wszystkim, co najważniejsze, dla wszystkich projektów oprogramowania?
IgorGanapolsky
Czy jest to przydatne, nawet jeśli nie mam strumienia zdarzeń asynchronicznych?
Mukesh Verma