Przekształć przyszłość Javy w kompletną przyszłość

97

Java 8 wprowadza CompletableFuturenową implementację Future, którą można komponować (zawiera kilka metod thenXxx). Chciałbym używać tego wyłącznie, ale wiele bibliotek, których chcę użyć, zwraca tylko Futureinstancje, których nie można komponować .

Czy istnieje sposób na zawarcie zwróconych Futurewystąpień wewnątrz elementu CompleteableFuture, aby móc go skomponować?

Dan Midwood
źródło

Odpowiedzi:

57

Jest sposób, ale ci się to nie spodoba. Poniższa metoda przekształca a Future<T>w a CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

Oczywiście problem z tym podejściem polega na tym, że w przypadku każdej Przyszłości wątek zostanie zablokowany, aby czekać na wynik Przyszłości - zaprzeczenie idei przyszłości. W niektórych przypadkach można zrobić lepiej. Jednak generalnie nie ma rozwiązania bez aktywnego oczekiwania na wynik Przyszłości .

nosid
źródło
1
Ha, dokładnie to napisałem, zanim pomyślałem, że musi być lepszy sposób. Ale chyba nie
Dan Midwood
12
Hmmm… czy to rozwiązanie nie zjada jednego z wątków „wspólnej puli” tylko na czekanie? Te wątki ze „wspólnej puli” nigdy nie powinny blokować… hmmmm…
Peti
1
@Peti: Masz rację. Chodzi jednak o to, że jeśli najprawdopodobniej robisz coś źle, niezależnie od tego, czy używasz wspólnej puli, czy nieograniczonej puli wątków .
nosid
4
To może nie być idealne, ale użycie CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())przynajmniej nie zablokowałoby wspólnych wątków puli.
MikeFHay
6
Proszę, po prostu nigdy tego nie rób
Laymain
56

Jeśli biblioteka, której chcesz użyć, oprócz stylu Future oferuje również metodę stylu wywołania zwrotnego, możesz zapewnić jej procedurę obsługi, która uzupełnia CompletableFuture bez dodatkowego blokowania wątków. Tak jak to:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

Bez wywołania zwrotnego jedynym sposobem rozwiązania tego problemu jest użycie pętli sondującej, która umieszcza wszystkie Future.isDone()sprawdzenia w jednym wątku, a następnie wywołuje zakończenie, gdy można uzyskać tabelę Future.

Kafkowski
źródło
1
Używam biblioteki asynchronicznej Apache Http, która akceptuje FutureCallback.
Ułatwiło
13

Jeśli twoja Futurejest wynikiem wywołania ExecutorServicemetody (np. submit()), Najłatwiej byłoby użyć rozszerzeniaCompletableFuture.runAsync(Runnable, Executor) zamiast niej metody.

Z

Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);

do

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);

Następnie CompletableFuturejest tworzony „natywnie”.

EDYCJA: Kontynuując komentarze @SamMefford poprawione przez @MartinAndersson, jeśli chcesz zdać a Callable, musisz zadzwonić supplyAsync(), zamieniając Callable<T>na a Supplier<T>, np. Za pomocą:

CompletableFuture.supplyAsync(() -> {
    try { return myCallable.call(); }
    catch (Exception ex) { throw new RuntimeException(ex); } // Or return default value
}, myExecutor);

Ponieważ T Callable.call() throws Exception;zgłasza wyjątek, a T Supplier.get();nie, musisz go złapać, aby prototypy były kompatybilne.

Matthieu
źródło
1
Lub, jeśli używasz Callable <T> zamiast Runnable, zamiast tego spróbuj supplyAsync:CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);
Sam Mefford
@SamMefford dzięki, zredagowałem, aby uwzględnić te informacje.
Matthieu
supplyAsyncotrzymuje Supplier. Kod nie zostanie skompilowany, jeśli spróbujesz przekazać plik Callable.
Martin Andersson
@MartinAndersson zgadza się, dzięki. Dokonałem dalszej edycji, aby przekonwertować a Callable<T>na Supplier<T>.
Matthieu
10

Opublikowałem mały projekt przyszłościowy , który stara się poprawić odpowiedź niż prostą drogą .

Głównym pomysłem jest użycie tylko jednego wątku (i oczywiście nie tylko pętli spinowej) do sprawdzenia wszystkich stanów Futures wewnątrz, co pomaga uniknąć blokowania wątku z puli dla każdej transformacji Future -> CompletableFuture.

Przykład użycia:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);
Dmitry Spikhalskiy
źródło
To wygląda interesująco. Czy używa wątku timera? Dlaczego to nie jest akceptowana odpowiedź?
Kira
@Kira Tak, zasadniczo używa jednego wątku czasowego do czekania na wszystkie przesłane futures.
Dmitry Spikhalskiy
8

Sugestia:

http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/

Ale w zasadzie:

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

I CompletablePromise:

public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

Przykład:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}
Gabriel Francisco
źródło
jest to dość proste i eleganckie i pasuje do większości przypadków użycia. Zrobiłbym CompletablePromiseContext statyczny i wziął parametr dla interwału sprawdzania (który jest tutaj ustawiony na 1 ms), a następnie przeciążał CompletablePromise<V>konstruktora, aby móc zapewnić własny CompletablePromiseContextmożliwie inny (dłuższy) interwał sprawdzania dla długotrwałego działania, w Future<V>którym nie nie musi absolutnie być w stanie uruchomić wywołania zwrotnego (lub komponować) natychmiast po zakończeniu, a także możesz mieć instancję CompletablePromiseContextdo obejrzenia zestawu Future(jeśli masz ich wiele)
Dexter Legaspi
5

Zasugeruję inną (miejmy nadzieję, lepszą) opcję: https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata /równoległy

Krótko mówiąc, pomysł jest następujący:

  1. Wprowadzenie CompletableTask<V>interfejsu - połączenie znaków CompletionStage<V>+RunnableFuture<V>
  2. Wypaczenie ExecutorServicedo powrotu CompletableTaskz submit(...)metod (zamiast Future<V>)
  3. Gotowe, mamy do uruchomienia i komponowania Futures.

Implementacja wykorzystuje alternatywną implementację CompletionStage (zwróć uwagę, CompletionStage zamiast CompletableFuture):

Stosowanie:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c); 
Valery Silaev
źródło
2
Mała aktualizacja: kod jest przenoszony do oddzielnego projektu, github.com/vsilaev/tascalate-concurrent , a teraz można używać out-of-the-way box Executor-s z java.util.concurrent.
Valery Silaev