ExecutorService, która przerywa zadania po przekroczeniu limitu czasu

95

Szukam implementacji ExecutorService, która może być dostarczona z limitem czasu. Zadania przesłane do ExecutorService są przerywane, jeśli ich uruchomienie trwa dłużej niż limit czasu. Wdrożenie takiej bestii nie jest trudnym zadaniem, ale zastanawiam się, czy ktoś wie o istniejącej implementacji.

Oto, co wymyśliłem na podstawie części dyskusji poniżej. Jakieś uwagi?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}
Edward Dale
źródło
Czy to „czas rozpoczęcia” upływu czasu do przesłania? A może czas rozpoczęcia wykonywania zadania?
Tim Bender
Dobre pytanie. Kiedy zaczyna się wykonywać. Prawdopodobnie za pomocą protected void beforeExecute(Thread t, Runnable r)haka.
Edward Dale
@ scompt.com, czy nadal używasz tego rozwiązania, czy zostało ono zastąpione
Paul Taylor
@PaulTaylor Praca, w której wdrażałem to rozwiązanie, została zastąpiona. :-)
Edward Dale
Potrzebuję tego dokładnie, z wyjątkiem: a) Potrzebuję, aby moja główna usługa harmonogramu była pulą wątków z pojedynczym wątkiem usługi, ponieważ moje zadania muszą być wykonywane ściśle współbieżnie oraz b) Muszę mieć możliwość określenia czasu trwania limitu czasu dla każdego zadania w czas przesłania zadania. Próbowałem użyć tego jako punktu wyjścia, ale rozszerzając ScheduledThreadPoolExecutor, ale nie widzę sposobu, aby uzyskać określony czas oczekiwania, który ma być określony w czasie przesyłania zadania, do metody beforeExecute. Wszelkie sugestie mile widziane!
Michael Ellis

Odpowiedzi:

91

W tym celu możesz użyć usługi ScheduledExecutorService . Najpierw przesłałbyś go tylko raz, aby rozpocząć natychmiast i zachować stworzoną przyszłość. Następnie możesz przesłać nowe zadanie, które po pewnym czasie anulowałoby zachowaną przyszłość.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

Spowoduje to wykonanie programu obsługi (główna funkcja, która ma zostać przerwana) przez 10 sekund, a następnie anuluje (tj. Przerwie) to konkretne zadanie.

John Vint
źródło
13
Ciekawy pomysł, ale co, jeśli zadanie zakończy się przed upływem limitu czasu (co normalnie będzie)? Wolałbym, aby mnóstwo zadań porządkujących nie czekało na uruchomienie tylko po to, aby dowiedzieć się, że przydzielone zadanie zostało już ukończone. Musiałby istnieć kolejny wątek monitorujący Futures po zakończeniu, aby usunąć zadania czyszczenia.
Edward Dale
4
Wykonawca zaplanuje to anulowanie tylko raz. Jeśli zadanie zostanie zakończone, anulowanie oznacza brak operacji i praca jest kontynuowana bez zmian. Potrzebny jest tylko jeden dodatkowy schemat wątku, aby anulować zadania i jeden wątek, aby je uruchomić. Możesz mieć dwóch wykonawców, jednego do przesyłania głównych zadań, a drugiego do ich anulowania.
John Vint
3
To prawda, ale co, jeśli limit czasu wynosi 5 godzin iw tym czasie jest wykonywanych 10 tys. Zadań. Chciałbym uniknąć sytuacji, w których te wszystkie no-opy zajmują pamięć i powodują zmianę kontekstu.
Edward Dale
1
@Scompt Niekoniecznie. Byłoby 10k wywołań future.cancel (), jednak jeśli przyszłość zostanie zakończona, anulowanie spowoduje szybką ścieżkę i nie spowoduje żadnych zbędnych prac. Jeśli nie chcesz 10 000 dodatkowych wywołań anulowania, może to nie zadziałać, ale ilość pracy wykonanej po zakończeniu zadania jest bardzo mała.
John Vint
6
@John W .: Właśnie zdałem sobie sprawę z kolejnego problemu z Twoją implementacją. Potrzebuję limitu czasu, aby rozpocząć się, gdy zadanie rozpocznie wykonywanie, jak skomentowałem wcześniej. Myślę, że jedynym sposobem na to jest użycie beforeExecutehaka.
Edward Dale
6

Niestety rozwiązanie jest wadliwe. Występuje pewien błąd z ScheduledThreadPoolExecutor, również zgłaszany w tym pytaniu : anulowanie przesłanego zadania nie zwalnia w pełni zasobów pamięci związanych z zadaniem; zasoby są zwalniane dopiero po wygaśnięciu zadania.

Jeśli zatem utworzysz plik TimeoutThreadPoolExecutoro dość długim czasie wygaśnięcia (typowe użycie) i prześlesz zadania wystarczająco szybko, w końcu zapełniasz pamięć - nawet jeśli zadania zostały faktycznie zakończone pomyślnie.

Możesz zobaczyć problem z następującym (bardzo prymitywnym) programem testowym:

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

Program wyczerpuje dostępną pamięć, chociaż czeka na zakończenie utworzonych Runnables.

Myślałem o tym przez chwilę, ale niestety nie mogłem wymyślić dobrego rozwiązania.

EDYCJA: Odkryłem, że ten problem został zgłoszony jako błąd JDK 6602600 i wydaje się, że został naprawiony bardzo niedawno.

Flavio
źródło
4

Zawiń zadanie w FutureTask i możesz określić limit czasu dla FutureTask. Spójrz na przykład w mojej odpowiedzi na to pytanie,

java natywny limit czasu procesu

ZZ Coder
źródło
1
Zdaję sobie sprawę, że za pomocą java.util.concurrentklas można to zrobić na kilka sposobów , ale szukam ExecutorServiceimplementacji.
Edward Dale
1
Jeśli mówisz, że chcesz, aby usługa ExecutorService ukrywała fakt, że limity czasu są dodawane z kodu klienta, możesz zaimplementować własną usługę ExecutorService, która otacza każdy przekazany jej element wykonawczy FutureTask przed ich wykonaniem.
erikprice
2

Po mnóstwie czasu na badanie,
wreszcie używam invokeAllmetody ExecutorServicedo rozwiązania tego problemu.
Spowoduje to ścisłe przerwanie zadania podczas wykonywania zadania.
Oto przykład

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

Zaletą jest to, że możesz również przesłać ListenableFuturew tym samym ExecutorService.
Wystarczy nieznacznie zmienić pierwszą linię kodu.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorServiceto funkcja słuchania projektu ExecutorServicew google guava ( com.google.guava ))

Jasio
źródło
2
Dzięki za wskazanie invokeAll. To działa bardzo dobrze. Tylko słowo ostrzeżenia dla każdego, kto myśli o użyciu tego: chociaż invokeAllzwraca listę Futureobiektów, w rzeczywistości wydaje się, że jest to operacja blokująca.
mxro
1

Wygląda na to, że problem nie tkwi w błędzie JDK 6602600 (został rozwiązany w 2010-05-22), ale w nieprawidłowym wywołaniu snu (10) w kółku. Należy dodać, że główny wątek musi dawać bezpośrednio SZANSĘ innym wątkom na realizację ich zadań poprzez wywołanie SLEEP (0) w KAŻDEJ gałęzi zewnętrznego koła. Myślę, że lepiej jest użyć Thread.yield () zamiast Thread.sleep (0)

Wynik poprawionej części poprzedniego kodu problemu jest taki:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

Działa poprawnie przy ilości licznika zewnętrznego do 150 000 000 badanych okręgów.

Siergiej
źródło
1

Korzystając z odpowiedzi Johna W. stworzyłem implementację, która poprawnie rozpoczyna przekroczenie limitu czasu, gdy zadanie rozpoczyna swoje wykonanie. Piszę nawet dla niego test jednostkowy :)

Jednak nie odpowiada to moim potrzebom, ponieważ niektóre operacje IO nie przerywają, gdy Future.cancel()jest wywoływana (tj. Gdy Thread.interrupt()jest wywoływana). Niektóre przykłady operacji IO, które mogą nie zostać przerwane podczas Thread.interrupt()wywołania, to Socket.connecti Socket.read(i podejrzewam, że większość operacji IO zaimplementowano w java.io). Wszystkie operacje we / wy java.niopowinny być przerywane w momencie Thread.interrupt()wywołania. Na przykład tak jest w przypadku SocketChannel.openi SocketChannel.read.

W każdym razie, jeśli ktoś jest zainteresowany, stworzyłem sedno wykonawcy puli wątków, który pozwala zadaniom na przekroczenie limitu czasu (jeśli używają operacji przerywalnych ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

amanteaux
źródło
Ciekawy kod, wciągnąłem go do swojego systemu i jestem ciekawy, czy masz jakieś przykłady tego, jakiego rodzaju operacje we / wy nie będą przerywać, więc mogę sprawdzić, czy wpłynie to na mój system. Dzięki!
Duncan Krebs
@DuncanKrebs Wyszczególniłem moją odpowiedź na przykładzie nieprzerywalnego IO: Socket.connectiSocket.read
amanteaux
myThread.interrupted()nie jest poprawną metodą przerywania, ponieważ CZYŚCI flagę przerwania. Użyj myThread.interrupt()zamiast tego, a to powinno z gniazdami
DanielCuadra
@DanielCuadra: Dziękuję, wygląda na to, że popełniłem błąd literowy, ponieważ Thread.interrupted()nie pozwala na przerwanie wątku. Jednak Thread.interrupt()nie przerywa java.iooperacji, działa tylko na java.niooperacjach.
amanteaux
Używam interrupt()od wielu lat i zawsze przerywało operacje java.io (a także inne metody blokowania, takie jak usypianie wątków, połączenia jdbc, blokowanie kolejki, itp.). Może znalazłeś klasę buggy lub jakąś
maszynę
0

A co z tym alternatywnym pomysłem:

  • dwa mają dwóch wykonawców:
    • jeden dla :
      • przesłanie zadania bez dbania o przekroczenie limitu czasu zadania
      • Doprowadziło to do dodania Przyszłości i czasu, w którym powinna się zakończyć do wewnętrznej struktury
    • jeden do wykonywania zadania wewnętrznego, które polega na sprawdzeniu struktury wewnętrznej, czy niektóre zadania mają przekroczony limit czasu i czy muszą zostać anulowane.

Mała próbka jest tutaj:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}
Ionut Mesaros
źródło
0

sprawdź, czy to działa dla Ciebie,

    public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
      int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
      Map<K,V> context, Task<T,S,K,V> someTask){
    if(threadPoolExecutor==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
    }
    if(someTask==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
    }
    if(CollectionUtils.isEmpty(collection)){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
    }

    LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
    collection.forEach(value -> {
      callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
    });
    LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();

    int count = 0;

    while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){
      Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
      futures.offer(f);
      count++;
    }

    Collection<ResponseObject<T>> responseCollection = new ArrayList<>();

    while(futures.size()>0){
      Future<T> future = futures.poll();
      ResponseObject<T> responseObject = null;
        try {
          T response = future.get(timeToCompleteEachTask, timeUnit);
          responseObject = ResponseObject.<T>builder().data(response).build();
        } catch (InterruptedException e) {
          future.cancel(true);
        } catch (ExecutionException e) {
          future.cancel(true);
        } catch (TimeoutException e) {
          future.cancel(true);
        } finally {
          if (Objects.nonNull(responseObject)) {
            responseCollection.add(responseObject);
          }
          futures.remove(future);//remove this
          Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
          if(null!=callable){
            Future<T> f = threadPoolExecutor.submit(callable);
            futures.add(f);
          }
        }

    }
    return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
  }

  private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
    if(callableLinkedBlockingQueue.size()>0){
      return callableLinkedBlockingQueue.poll();
    }
    return null;
  }

możesz ograniczyć liczbę używanych wątków z harmonogramu, a także ustawić limit czasu zadania.

Nitish Kumar
źródło