Szukam implementacji ExecutorService, która może być dostarczona z limitem czasu. Zadania przesłane do ExecutorService są przerywane, jeśli ich uruchomienie trwa dłużej niż limit czasu. Wdrożenie takiej bestii nie jest trudnym zadaniem, ale zastanawiam się, czy ktoś wie o istniejącej implementacji.
Oto, co wymyśliłem na podstawie części dyskusji poniżej. Jakieś uwagi?
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
timeoutExecutor.shutdown();
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
thread.interrupt();
}
}
}
java
multithreading
concurrency
executorservice
Edward Dale
źródło
źródło
protected void beforeExecute(Thread t, Runnable r)
haka.Odpowiedzi:
W tym celu możesz użyć usługi ScheduledExecutorService . Najpierw przesłałbyś go tylko raz, aby rozpocząć natychmiast i zachować stworzoną przyszłość. Następnie możesz przesłać nowe zadanie, które po pewnym czasie anulowałoby zachowaną przyszłość.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); final Future handler = executor.submit(new Callable(){ ... }); executor.schedule(new Runnable(){ public void run(){ handler.cancel(); } }, 10000, TimeUnit.MILLISECONDS);
Spowoduje to wykonanie programu obsługi (główna funkcja, która ma zostać przerwana) przez 10 sekund, a następnie anuluje (tj. Przerwie) to konkretne zadanie.
źródło
beforeExecute
haka.Niestety rozwiązanie jest wadliwe. Występuje pewien błąd z
ScheduledThreadPoolExecutor
, również zgłaszany w tym pytaniu : anulowanie przesłanego zadania nie zwalnia w pełni zasobów pamięci związanych z zadaniem; zasoby są zwalniane dopiero po wygaśnięciu zadania.Jeśli zatem utworzysz plik
TimeoutThreadPoolExecutor
o dość długim czasie wygaśnięcia (typowe użycie) i prześlesz zadania wystarczająco szybko, w końcu zapełniasz pamięć - nawet jeśli zadania zostały faktycznie zakończone pomyślnie.Możesz zobaczyć problem z następującym (bardzo prymitywnym) programem testowym:
public static void main(String[] args) throws InterruptedException { ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES); //ExecutorService service = Executors.newFixedThreadPool(1); try { final AtomicInteger counter = new AtomicInteger(); for (long i = 0; i < 10000000; i++) { service.submit(new Runnable() { @Override public void run() { counter.incrementAndGet(); } }); if (i % 10000 == 0) { System.out.println(i + "/" + counter.get()); while (i > counter.get()) { Thread.sleep(10); } } } } finally { service.shutdown(); } }
Program wyczerpuje dostępną pamięć, chociaż czeka na zakończenie utworzonych
Runnable
s.Myślałem o tym przez chwilę, ale niestety nie mogłem wymyślić dobrego rozwiązania.
EDYCJA: Odkryłem, że ten problem został zgłoszony jako błąd JDK 6602600 i wydaje się, że został naprawiony bardzo niedawno.
źródło
Zawiń zadanie w FutureTask i możesz określić limit czasu dla FutureTask. Spójrz na przykład w mojej odpowiedzi na to pytanie,
java natywny limit czasu procesu
źródło
java.util.concurrent
klas można to zrobić na kilka sposobów , ale szukamExecutorService
implementacji.Po mnóstwie czasu na badanie,
wreszcie używam
invokeAll
metodyExecutorService
do rozwiązania tego problemu.Spowoduje to ścisłe przerwanie zadania podczas wykonywania zadania.
Oto przykład
ExecutorService executorService = Executors.newCachedThreadPool(); try { List<Callable<Object>> callables = new ArrayList<>(); // Add your long time task (callable) callables.add(new VaryLongTimeTask()); // Assign tasks for specific execution timeout (e.g. 2 sec) List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS); for (Future<Object> future : futures) { // Getting result } } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown();
Zaletą jest to, że możesz również przesłać
ListenableFuture
w tym samymExecutorService
.Wystarczy nieznacznie zmienić pierwszą linię kodu.
ListeningExecutorService
to funkcja słuchania projektuExecutorService
w google guava ( com.google.guava ))źródło
invokeAll
. To działa bardzo dobrze. Tylko słowo ostrzeżenia dla każdego, kto myśli o użyciu tego: chociażinvokeAll
zwraca listęFuture
obiektów, w rzeczywistości wydaje się, że jest to operacja blokująca.Co powiesz na użycie
ExecutorService.shutDownNow()
metody opisanej w http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html ? Wydaje się, że to najprostsze rozwiązanie.źródło
Wygląda na to, że problem nie tkwi w błędzie JDK 6602600 (został rozwiązany w 2010-05-22), ale w nieprawidłowym wywołaniu snu (10) w kółku. Należy dodać, że główny wątek musi dawać bezpośrednio SZANSĘ innym wątkom na realizację ich zadań poprzez wywołanie SLEEP (0) w KAŻDEJ gałęzi zewnętrznego koła. Myślę, że lepiej jest użyć Thread.yield () zamiast Thread.sleep (0)
Wynik poprawionej części poprzedniego kodu problemu jest taki:
....................... ........................ Thread.yield(); if (i % 1000== 0) { System.out.println(i + "/" + counter.get()+ "/"+service.toString()); } // // while (i > counter.get()) { // Thread.sleep(10); // }
Działa poprawnie przy ilości licznika zewnętrznego do 150 000 000 badanych okręgów.
źródło
Korzystając z odpowiedzi Johna W. stworzyłem implementację, która poprawnie rozpoczyna przekroczenie limitu czasu, gdy zadanie rozpoczyna swoje wykonanie. Piszę nawet dla niego test jednostkowy :)
Jednak nie odpowiada to moim potrzebom, ponieważ niektóre operacje IO nie przerywają, gdy
Future.cancel()
jest wywoływana (tj. GdyThread.interrupt()
jest wywoływana). Niektóre przykłady operacji IO, które mogą nie zostać przerwane podczasThread.interrupt()
wywołania, toSocket.connect
iSocket.read
(i podejrzewam, że większość operacji IO zaimplementowano wjava.io
). Wszystkie operacje we / wyjava.nio
powinny być przerywane w momencieThread.interrupt()
wywołania. Na przykład tak jest w przypadkuSocketChannel.open
iSocketChannel.read
.W każdym razie, jeśli ktoś jest zainteresowany, stworzyłem sedno wykonawcy puli wątków, który pozwala zadaniom na przekroczenie limitu czasu (jeśli używają operacji przerywalnych ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf
źródło
Socket.connect
iSocket.read
myThread.interrupted()
nie jest poprawną metodą przerywania, ponieważ CZYŚCI flagę przerwania. UżyjmyThread.interrupt()
zamiast tego, a to powinno z gniazdamiThread.interrupted()
nie pozwala na przerwanie wątku. JednakThread.interrupt()
nie przerywajava.io
operacji, działa tylko najava.nio
operacjach.interrupt()
od wielu lat i zawsze przerywało operacje java.io (a także inne metody blokowania, takie jak usypianie wątków, połączenia jdbc, blokowanie kolejki, itp.). Może znalazłeś klasę buggy lub jakąśA co z tym alternatywnym pomysłem:
Mała próbka jest tutaj:
public class AlternativeExecutorService { private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue = new CopyOnWriteArrayList(); private final ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job private final ListeningExecutorService threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for private ScheduledFuture scheduledFuture; private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L; public AlternativeExecutorService() { scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS); } public void pushTask(OwnTask task) { ListenableFuture<Void> future = threadExecutor.submit(task); // -> create your Callable futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end } public void shutdownInternalScheduledExecutor() { scheduledFuture.cancel(true); scheduledExecutor.shutdownNow(); } long getCurrentMillisecondsTime() { return Calendar.getInstance().get(Calendar.MILLISECOND); } class ListenableFutureTask { private final ListenableFuture<Void> future; private final OwnTask task; private final long milliSecEndTime; private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime) { this.future = future; this.task = task; this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS); } ListenableFuture<Void> getFuture() { return future; } OwnTask getTask() { return task; } long getMilliSecEndTime() { return milliSecEndTime; } } class TimeoutManagerJob implements Runnable { CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList() { return futureQueue; } @Override public void run() { long currentMileSecValue = getCurrentMillisecondsTime(); for (ListenableFutureTask futureTask : futureQueue) { consumeFuture(futureTask, currentMileSecValue); } } private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue) { ListenableFuture<Void> future = futureTask.getFuture(); boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue; if (isTimeout) { if (!future.isDone()) { future.cancel(true); } futureQueue.remove(futureTask); } } } class OwnTask implements Callable<Void> { private long timeoutDuration; private TimeUnit timeUnit; OwnTask(long timeoutDuration, TimeUnit timeUnit) { this.timeoutDuration = timeoutDuration; this.timeUnit = timeUnit; } @Override public Void call() throws Exception { // do logic return null; } public long getTimeoutDuration() { return timeoutDuration; } public TimeUnit getTimeUnit() { return timeUnit; } } }
źródło
sprawdź, czy to działa dla Ciebie,
public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor, int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection, Map<K,V> context, Task<T,S,K,V> someTask){ if(threadPoolExecutor==null){ return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build(); } if(someTask==null){ return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build(); } if(CollectionUtils.isEmpty(collection)){ return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build(); } LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size()); collection.forEach(value -> { callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything. }); LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>(); int count = 0; while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){ Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll()); futures.offer(f); count++; } Collection<ResponseObject<T>> responseCollection = new ArrayList<>(); while(futures.size()>0){ Future<T> future = futures.poll(); ResponseObject<T> responseObject = null; try { T response = future.get(timeToCompleteEachTask, timeUnit); responseObject = ResponseObject.<T>builder().data(response).build(); } catch (InterruptedException e) { future.cancel(true); } catch (ExecutionException e) { future.cancel(true); } catch (TimeoutException e) { future.cancel(true); } finally { if (Objects.nonNull(responseObject)) { responseCollection.add(responseObject); } futures.remove(future);//remove this Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue); if(null!=callable){ Future<T> f = threadPoolExecutor.submit(callable); futures.add(f); } } } return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build(); } private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){ if(callableLinkedBlockingQueue.size()>0){ return callableLinkedBlockingQueue.poll(); } return null; }
możesz ograniczyć liczbę używanych wątków z harmonogramu, a także ustawić limit czasu zadania.
źródło