Jest sposób, ale ci się to nie spodoba. Poniższa metoda przekształca a Future<T>
w a CompletableFuture<T>
:
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
Oczywiście problem z tym podejściem polega na tym, że w przypadku każdej Przyszłości wątek zostanie zablokowany, aby czekać na wynik Przyszłości - zaprzeczenie idei przyszłości. W niektórych przypadkach można zrobić lepiej. Jednak generalnie nie ma rozwiązania bez aktywnego oczekiwania na wynik Przyszłości .
CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())
przynajmniej nie zablokowałoby wspólnych wątków puli.Jeśli biblioteka, której chcesz użyć, oprócz stylu Future oferuje również metodę stylu wywołania zwrotnego, możesz zapewnić jej procedurę obsługi, która uzupełnia CompletableFuture bez dodatkowego blokowania wątków. Tak jak to:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file")); // ... CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>(); open.read(buffer, position, null, new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { completableFuture.complete(buffer); } @Override public void failed(Throwable exc, Void attachment) { completableFuture.completeExceptionally(exc); } }); completableFuture.thenApply(...)
Bez wywołania zwrotnego jedynym sposobem rozwiązania tego problemu jest użycie pętli sondującej, która umieszcza wszystkie
Future.isDone()
sprawdzenia w jednym wątku, a następnie wywołuje zakończenie, gdy można uzyskać tabelę Future.źródło
Jeśli twoja
Future
jest wynikiem wywołaniaExecutorService
metody (np.submit()
), Najłatwiej byłoby użyć rozszerzeniaCompletableFuture.runAsync(Runnable, Executor)
zamiast niej metody.Z
do
Następnie
CompletableFuture
jest tworzony „natywnie”.EDYCJA: Kontynuując komentarze @SamMefford poprawione przez @MartinAndersson, jeśli chcesz zdać a
Callable
, musisz zadzwonićsupplyAsync()
, zamieniającCallable<T>
na aSupplier<T>
, np. Za pomocą:CompletableFuture.supplyAsync(() -> { try { return myCallable.call(); } catch (Exception ex) { throw new RuntimeException(ex); } // Or return default value }, myExecutor);
Ponieważ
T Callable.call() throws Exception;
zgłasza wyjątek, aT Supplier.get();
nie, musisz go złapać, aby prototypy były kompatybilne.źródło
CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);
supplyAsync
otrzymujeSupplier
. Kod nie zostanie skompilowany, jeśli spróbujesz przekazać plikCallable
.Callable<T>
naSupplier<T>
.Opublikowałem mały projekt przyszłościowy , który stara się poprawić odpowiedź niż prostą drogą .
Głównym pomysłem jest użycie tylko jednego wątku (i oczywiście nie tylko pętli spinowej) do sprawdzenia wszystkich stanów Futures wewnątrz, co pomaga uniknąć blokowania wątku z puli dla każdej transformacji Future -> CompletableFuture.
Przykład użycia:
źródło
Sugestia:
http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/
Ale w zasadzie:
public class CompletablePromiseContext { private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor(); public static void schedule(Runnable r) { SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS); } }
I CompletablePromise:
public class CompletablePromise<V> extends CompletableFuture<V> { private Future<V> future; public CompletablePromise(Future<V> future) { this.future = future; CompletablePromiseContext.schedule(this::tryToComplete); } private void tryToComplete() { if (future.isDone()) { try { complete(future.get()); } catch (InterruptedException e) { completeExceptionally(e); } catch (ExecutionException e) { completeExceptionally(e.getCause()); } return; } if (future.isCancelled()) { cancel(true); return; } CompletablePromiseContext.schedule(this::tryToComplete); } }
Przykład:
public class Main { public static void main(String[] args) { final ExecutorService service = Executors.newSingleThreadExecutor(); final Future<String> stringFuture = service.submit(() -> "success"); final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture); completableFuture.whenComplete((result, failure) -> { System.out.println(result); }); } }
źródło
CompletablePromiseContext
statyczny i wziął parametr dla interwału sprawdzania (który jest tutaj ustawiony na 1 ms), a następnie przeciążałCompletablePromise<V>
konstruktora, aby móc zapewnić własnyCompletablePromiseContext
możliwie inny (dłuższy) interwał sprawdzania dla długotrwałego działania, wFuture<V>
którym nie nie musi absolutnie być w stanie uruchomić wywołania zwrotnego (lub komponować) natychmiast po zakończeniu, a także możesz mieć instancjęCompletablePromiseContext
do obejrzenia zestawuFuture
(jeśli masz ich wiele)Zasugeruję inną (miejmy nadzieję, lepszą) opcję: https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata /równoległy
Krótko mówiąc, pomysł jest następujący:
CompletableTask<V>
interfejsu - połączenie znakówCompletionStage<V>
+RunnableFuture<V>
ExecutorService
do powrotuCompletableTask
zsubmit(...)
metod (zamiastFuture<V>
)Implementacja wykorzystuje alternatywną implementację CompletionStage (zwróć uwagę, CompletionStage zamiast CompletableFuture):
Stosowanie:
J8ExecutorService exec = J8Executors.newCachedThreadPool(); CompletionStage<String> = exec .submit( someCallableA ) .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b) .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);
źródło