Czekam na listę Future

145

Mam metodę, która zwraca wartość Listprzyszłości

List<Future<O>> futures = getFutures();

Teraz chcę poczekać, aż wszystkie futures zostaną pomyślnie przetworzone lub którekolwiek z zadań, których dane wyjściowe zostaną zwrócone przez przyszłość, zgłosi wyjątek. Nawet jeśli jedno zadanie rzuca wyjątek, nie ma sensu czekać na inną przyszłość.

Byłoby proste podejście

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

Ale problem polega na tym, że jeśli, na przykład, czwarta przyszłość generuje wyjątek, wtedy będę niepotrzebnie czekać, aż pierwsze 3 futures będą dostępne.

Jak to rozwiązać? Czy w jakikolwiek sposób odlicza przyswajanie? Nie mogę użyć Future, isDoneponieważ dokument java mówi

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
user93796
źródło
1
kto generuje te przyszłości? Jakiego są typu? Interfejs java.util.concurrent.Future nie zapewnia żądanej funkcjonalności, jedynym sposobem jest użycie własnych kontraktów futures z wywołaniami zwrotnymi.
Alexei Kaigorodov
Możesz utworzyć instancję ExecutionServicedla każdej „partii” zadań, przesłać je do niej, a następnie natychmiast zamknąć usługę i awaitTermination()jak przypuszczam użyć na niej.
millimoose
Możesz użyć a, CountDownLatchjeśli owinąłeś ciało wszystkich swoich przyszłości w a, try..finallyaby upewnić się, że zatrzask również zostanie zmniejszony.
millimoose
docs.oracle.com/javase/7/docs/api/java/util/concurrent/… robi dokładnie to, czego potrzebujesz.
assylias
@AlexeiKaigorodov TAK, moja przyszłość jest typu java.util.concurrent. Pozwalam przyszłość z możliwością wywołania. Otrzymuję Futture, kiedy wysyłam zadanie do usługi wykonawczej
user93796

Odpowiedzi:

124

Możesz użyć CompletionService, aby otrzymać kontrakty futures, gdy tylko będą gotowe, a jeśli jeden z nich zgłosi wyjątek, anuluj przetwarzanie. Coś takiego:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

Myślę, że możesz dalej ulepszyć, aby anulować wszystkie nadal wykonywane zadania, jeśli jedno z nich zgłosi błąd.

dcernahoschi
źródło
1
: Twój kod ma ten sam problem, o którym wspomniałem w moim poście.Jeśli 4th future zgłosi wyjątek, kod nadal będzie czekał na zakończenie przyszłych 1,2,3. lub czy completeSerice.take) zwróci przyszłość, która zakończy się jako pierwsza?
user93796
1
A co z limitami czasu? Czy mogę powiedzieć usłudze realizacji, aby czekała maksymalnie X sekund?
user93796
1
Nie powinno mieć. Nie dokonuje iteracji po futures, ale gdy tylko jest gotowy, jest przetwarzany / weryfikowany, jeśli nie zostanie zgłoszony wyjątek.
dcernahoschi
2
Aby przekroczyć limit czasu oczekiwania na pojawienie się przyszłości w kolejce, istnieje metoda sondowania (sekundy) w pliku CompletionService.
dcernahoschi
Oto działający przykład na github: github.com/princegoyal1987/FutureDemo
user18853
107

Jeśli używasz Java 8, możesz to zrobić łatwiej dzięki CompletableFuture i CompletableFuture.allOf , które stosują wywołanie zwrotne dopiero po wykonaniu wszystkich dostarczonych CompletableFutures.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}
Andrejs
źródło
3
Cześć @Andrejs, czy mógłbyś wyjaśnić, co robi ten fragment kodu. Widzę, że jest to sugerowane w wielu miejscach, ale nie wiem, co się właściwie dzieje. Jak są obsługiwane wyjątki, jeśli jeden z wątków zawiedzie?
VSEWHGHP,
2
@VSEWHGHP Z javadoc: Jeśli którakolwiek z podanych CompletableFutures zakończy się wyjątkowo, zwrócony CompletableFuture również to zrobi, z CompletionException przechowującym ten wyjątek jako przyczynę.
Andrejs
1
Tak, więc kontynuowałem to, czy jest jakiś sposób, aby użyć tego fragmentu, ale uzyskać wartości dla wszystkich innych wątków, które zakończyły się pomyślnie? Czy powinienem po prostu iterować listę CompletableFutures i wywołać get ignorując CompletableFuture <List <T>>, ponieważ funkcja sekwencji dba o to, aby wszystkie wątki były kompletne z wynikiem lub wyjątkiem?
VSEWHGHP,
6
To jest rozwiązanie innego problemu. Jeśli masz Futureinstancje, nie możesz zastosować tej metody. Nie jest łatwo przekonwertować Futurena CompletableFuture.
Jarekczek
nie zadziała, jeśli mamy wyjątek w jakimś zadaniu.
slisnychyi
21

Użyj a CompletableFuturew Javie 8

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());
sendon1982
źródło
1
To powinna być akceptowana odpowiedź. Jest to również część oficjalnej dokumentacji wiosennej: spring.io/guides/gs/async-method
maaw
Działa zgodnie z oczekiwaniami.
Dimon
15

Możesz użyć ExecutorCompletionService . Dokumentacja zawiera nawet przykład dokładnego przypadku użycia:

Załóżmy zamiast tego, że chcesz użyć pierwszego niezerowego wyniku zestawu zadań, ignorując wszelkie napotkane wyjątki i anulując wszystkie inne zadania, gdy pierwsze będzie gotowe:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

Ważną rzeczą, na którą należy zwrócić uwagę, jest to, że ecs.take () otrzyma pierwsze ukończone zadanie, a nie tylko pierwsze przesłane. Dlatego powinieneś je otrzymywać w kolejności kończenia egzekucji (lub rzucania wyjątku).

jmiserez
źródło
3

Jeśli używasz Java 8 i nie chcesz manipulować CompletableFutures, napisałem narzędzie do pobierania wyników dla List<Future<T>>korzystania z przesyłania strumieniowego. Kluczem jest to, że nie wolno ci map(Future::get)rzucać.

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }

To wymaga narzędzia, AggregateExceptionktóre działa jak C #

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

Ten składnik działa dokładnie tak samo, jak Task.WaitAll języka C # . Pracuję nad wariantem, który robi to samo co CompletableFuture.allOf(odpowiednik Task.WhenAll)

Powodem, dla którego to zrobiłem, jest to, że używam Springa ListenableFuturei nie chcę portować, CompletableFuturemimo że jest to bardziej standardowy sposób

usr-local-ΕΨΗΕΛΩΝ
źródło
1
Głosuj za widzeniem potrzeby równoważnego wyjątku AggregateException.
granadaCoder
Przykład wykorzystania tego narzędzia byłby miły.
XDS
1

W przypadku, gdy chcesz połączyć listę możliwych do zrealizowania przyszłości, możesz to zrobić:

List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures

// CompletableFuture.allOf() method demand a variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()]));

// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();

Aby uzyskać więcej informacji na temat Future & CompletableFuture, przydatne linki:
1. Future: https://www.baeldung.com/java-future
2. CompletableFuture: https://www.baeldung.com/java-completablefuture
3. CompletableFuture: https : //www.callicoder.com/java-8-completablefuture-tutorial/

Bohao LI
źródło
0

może to by pomogło (nic nie zastąpiłoby nieprzetworzonego wątku, tak!) Proponuję uruchomić każdego Futuregościa z oddzielnym wątkiem (idą równolegle), a potem, gdy kiedykolwiek pojawi się jeden z otrzymanych błędów, to po prostu zasygnalizuje menedżerowi ( Handlerklasie).

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

Muszę powiedzieć, że powyższy kod powodowałby błąd (nie sprawdzał), ale mam nadzieję, że mógłbym wyjaśnić rozwiązanie. proszę spróbować.


źródło
0
 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
        CompletableFuture.allOf(futures).join();
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    };
Mohamed Abdo
źródło
0

CompletionService pobierze Twoje Callables za pomocą metody .submit () i możesz pobrać obliczone futures za pomocą metody .take ().

Jedną rzeczą, o której nie można zapomnieć, jest zakończenie ExecutorService przez wywołanie metody .shutdown (). Możesz również wywołać tę metodę tylko wtedy, gdy zapisałeś odwołanie do usługi executora, więc upewnij się, że ją zachowujesz.

Przykładowy kod - dla stałej liczby elementów pracy, nad którymi należy pracować równolegle:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

Przykładowy kod - dla dynamicznej liczby elementów pracy, nad którymi należy pracować równolegle:

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 threads
    for (int i = 0; i < 9; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(null);
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}
fl0w
źródło
0

Mam klasę narzędziową, która zawiera:

@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

Gdy już to zrobisz, używając importu statycznego, możesz po prostu poczekać na wszystkie futures, takie jak ta:

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

możesz także zebrać wszystkie ich wyniki w ten sposób:

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());

Po prostu wracam do mojego starego postu i zauważam, że miałeś kolejny żal:

Ale problem polega na tym, że jeśli, na przykład, czwarta przyszłość generuje wyjątek, wtedy będę niepotrzebnie czekać, aż pierwsze 3 futures będą dostępne.

W takim przypadku prostym rozwiązaniem jest zrobienie tego równolegle:

futures.stream().parallel()
 .forEach(future -> uncheckedSupplier(future::get).get());

W ten sposób pierwszy wyjątek, chociaż nie zatrzyma przyszłości, złamie instrukcję forEach, jak w przykładzie seryjnym, ale ponieważ wszystkie czekają równolegle, nie będziesz musiał czekać na zakończenie pierwszych 3.

Brixomatic
źródło
0
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Stack2 {   
    public static void waitFor(List<Future<?>> futures) {
        List<Future<?>> futureCopies = new ArrayList<Future<?>>(futures);//contains features for which status has not been completed
        while (!futureCopies.isEmpty()) {//worst case :all task worked without exception, then this method should wait for all tasks
            Iterator<Future<?>> futureCopiesIterator = futureCopies.iterator();
            while (futureCopiesIterator.hasNext()) {
                Future<?> future = futureCopiesIterator.next();
                if (future.isDone()) {//already done
                    futureCopiesIterator.remove();
                    try {
                        future.get();// no longer waiting
                    } catch (InterruptedException e) {
                        //ignore
                        //only happen when current Thread interrupted
                    } catch (ExecutionException e) {
                        Throwable throwable = e.getCause();// real cause of exception
                        futureCopies.forEach(f -> f.cancel(true));//cancel other tasks that not completed
                        return;
                    }
                }
            }
        }
    }
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        Runnable runnable1 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
            }
        };
        Runnable runnable2 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                }
            }
        };


        Runnable fail = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(1000);
                    throw new RuntimeException("bla bla bla");
                } catch (InterruptedException e) {
                }
            }
        };

        List<Future<?>> futures = Stream.of(runnable1,fail,runnable2)
                .map(executorService::submit)
                .collect(Collectors.toList());

        double start = System.nanoTime();
        waitFor(futures);
        double end = (System.nanoTime()-start)/1e9;
        System.out.println(end +" seconds");

    }
}
Farhad Baghirov
źródło