Próbuję zakodować rozwiązanie, w którym pojedynczy wątek wytwarza zadania intensywnie korzystające z operacji we / wy, które można wykonywać równolegle. Każde zadanie ma znaczące dane w pamięci. Dlatego chcę mieć możliwość ograniczenia liczby zadań oczekujących w danej chwili.
Jeśli utworzę ThreadPoolExecutor w ten sposób:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueue));
Potem executor.submit(callable)
rzucaRejectedExecutionException
gdy kolejka się zapełni i wszystkie wątki są już zajęte.
Co mogę zrobić executor.submit(callable)
blok, gdy kolejka jest pełna i wszystkie wątki są zajęte?
EDYCJA : próbowałem tego :
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
I w pewnym stopniu osiąga efekt, który chcę osiągnąć, ale w nieelegancki sposób (w zasadzie wątki odrzucone są uruchamiane w wątku wywołującym, więc blokuje to wątek wywołujący przed przesłaniem większej liczby).
EDYCJA: (5 lat po zadaniu pytania)
Każdemu, kto czyta to pytanie i odpowiedzi na nie, nie traktuj zaakceptowanej odpowiedzi jako jednego prawidłowego rozwiązania. Przeczytaj wszystkie odpowiedzi i komentarze.
źródło
numWorkerThreads
wtedy, gdy wątek wywołujący również wykonuje zadanie. Ale ważniejszą kwestią jest to, że jeśli wątek wywołujący otrzyma długotrwałe zadanie, inne wątki mogą stać bezczynne, czekając na następne zadanie.Odpowiedzi:
Zrobiłem to samo. Sztuczka polega na utworzeniu BlockingQueue, w którym metoda offer () jest tak naprawdę metodą put (). (możesz użyć dowolnej bazy BlockingQueue, którą chcesz).
public class LimitedQueue<E> extends LinkedBlockingQueue<E> { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }
Zauważ, że działa to tylko dla puli wątków, gdzie
corePoolSize==maxPoolSize
więc bądź ostrożny (patrz komentarze).źródło
corePoolSize==maxPoolSize
. Bez tego ThreadPoolExecutor nie ma już zaprojektowanego zachowania. Szukałem rozwiązania tego problemu, które nie miało tego ograniczenia; zobacz moją alternatywną odpowiedź poniżej dotyczącą podejścia, które ostatecznie przyjęliśmy.Oto, jak rozwiązałem ten problem:
(uwaga: to rozwiązanie blokuje wątek, który przesyła Callable, więc zapobiega wyrzucaniu wyjątku RejectedExecutionException)
public class BoundedExecutor extends ThreadPoolExecutor{ private final Semaphore semaphore; public BoundedExecutor(int bound) { super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); semaphore = new Semaphore(bound); } /**Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{ semaphore.acquire(); return submit(task); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); semaphore.release(); } }
źródło
corePoolSize < maxPoolSize
...: |corePoolSize < maxPoolSize
. W takich przypadkach semafor będzie dostępny, ale nie będzie wątku, a funkcjaSynchronousQueue
zwróci wartość false.ThreadPoolExecutor
Wtedy kręcić nowy wątek. Problem z tym rozwiązaniem polega na tym, że ma stan wyścigu . Posemaphore.release()
, ale przed zakończeniem wątkuexecute
, submit () otrzyma zezwolenie na semafor. Jeśli super.submit () zostanie uruchomiona przedexecute()
zakończeniem, zadanie zostanie odrzucone.Aktualnie akceptowana odpowiedź ma potencjalnie istotny problem - zmienia zachowanie ThreadPoolExecutor. Wykonaj tak, że jeśli masz
corePoolSize < maxPoolSize
, logika ThreadPoolExecutor nigdy nie doda dodatkowych pracowników poza rdzeniem.Z ThreadPoolExecutor .execute (Runnable):
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);
W szczególności ten ostatni blok „else” nigdy nie zostanie trafiony.
Lepszą alternatywą jest zrobienie czegoś podobnego do tego, co już robi OP - użyj RejectedExecutionHandler, aby wykonać tę samą
put
logikę:public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e); } }
Jest kilka rzeczy, na które należy zwrócić uwagę w przypadku tego podejścia, jak wskazano w komentarzach (odnosząc się do tej odpowiedzi ):
corePoolSize==0
, to występuje sytuacja wyścigu, w której wszystkie wątki w puli mogą umrzeć, zanim zadanie będzie widoczneThreadPoolExecutor
) spowoduje problemy, chyba że program obsługi również opakuje je w ten sam sposób.Mając na uwadze te problemy, to rozwiązanie będzie działać dla większości typowych ThreadPoolExecutors i będzie poprawnie obsługiwać przypadek, w którym
corePoolSize < maxPoolSize
.źródło
Wiem, że to stare pytanie, ale miałem podobny problem, że tworzenie nowych zadań było bardzo szybkie i jeśli wystąpiło zbyt wiele OutOfMemoryError, ponieważ istniejące zadanie nie zostało ukończone wystarczająco szybko.
W moim przypadku
Callables
są przesłane i potrzebuję wyniku, dlatego muszę przechowywać wszystkieFutures
zwrócone przezexecutor.submit()
. Moim rozwiązaniem było umieszczenieFutures
wBlockingQueue
maksymalnym rozmiarze. Po zapełnieniu kolejki zadania nie są generowane, dopóki niektóre nie zostaną zakończone (elementy usunięte z kolejki). W pseudokodzie:final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads); final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize); try { Thread taskGenerator = new Thread() { @Override public void run() { while (reader.hasNext) { Callable task = generateTask(reader.next()); Future future = executor.submit(task); try { // if queue is full blocks until a task // is completed and hence no future tasks are submitted. futures.put(compoundFuture); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } executor.shutdown(); } } taskGenerator.start(); // read from queue as long as task are being generated // or while Queue has elements in it while (taskGenerator.isAlive() || !futures.isEmpty()) { Future compoundFuture = futures.take(); // do something } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } catch (ExecutionException ex) { throw new MyException(ex); } finally { executor.shutdownNow(); }
źródło
Miałem podobny problem i zaimplementowałem to używając
beforeExecute/afterExecute
hooków zThreadPoolExecutor
:import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Blocks current task execution if there is not enough resources for it. * Maximum task count usage controlled by maxTaskCount property. */ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final ReentrantLock taskLock = new ReentrantLock(); private final Condition unpaused = taskLock.newCondition(); private final int maxTaskCount; private volatile int currentTaskCount; public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int maxTaskCount) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.maxTaskCount = maxTaskCount; } /** * Executes task if there is enough system resources for it. Otherwise * waits. */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); taskLock.lock(); try { // Spin while we will not have enough capacity for this job while (maxTaskCount < currentTaskCount) { try { unpaused.await(); } catch (InterruptedException e) { t.interrupt(); } } currentTaskCount++; } finally { taskLock.unlock(); } } /** * Signalling that one more task is welcome */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); taskLock.lock(); try { currentTaskCount--; unpaused.signalAll(); } finally { taskLock.unlock(); } } }
To powinno ci wystarczyć. Przy okazji, oryginalna implementacja była oparta na rozmiarze zadania, ponieważ jedno zadanie mogło być większe 100 razy niż inne, a przesłanie dwóch ogromnych zadań zabijało pudełko, ale uruchomienie jednego dużego i wielu małych było OK. Jeśli Twoje zadania intensywnie korzystające z operacji we / wy mają mniej więcej ten sam rozmiar, możesz użyć tej klasy, w przeciwnym razie po prostu daj mi znać, a opublikuję implementację opartą na rozmiarze.
PS Chciałbyś sprawdzić
ThreadPoolExecutor
javadoc. To naprawdę fajny podręcznik użytkownika autorstwa Douga Lea o tym, jak można go łatwo dostosować.źródło
maxTaskCount < currentTaskCount
i zacznie czekać podunpaused
warunkiem. W tym samym czasie inny wątek próbuje uzyskać blokadę w afterExecute (), aby zasygnalizować zakończenie zadania. Czy nie będzie to impas?RejectedExecutionException
jest to nadal możliwe.Zaimplementowałem rozwiązanie zgodne ze wzorcem dekoratora i wykorzystujące semafor do kontroli ilości wykonywanych zadań. Możesz go używać z dowolnym
Executor
i:RejectedExecutionException
zostanie wyrzucony a)import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Duration; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; public class BlockingOnFullQueueExecutorDecorator implements Executor { private static final class PermitReleasingDecorator implements Runnable { @Nonnull private final Runnable delegate; @Nonnull private final Semaphore semaphore; private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) { this.delegate = task; this.semaphore = semaphoreToRelease; } @Override public void run() { try { this.delegate.run(); } finally { // however execution goes, release permit for next task this.semaphore.release(); } } @Override public final String toString() { return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate); } } @Nonnull private final Semaphore taskLimit; @Nonnull private final Duration timeout; @Nonnull private final Executor delegate; public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) { this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); if (maximumTaskNumber < 1) { throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber)); } this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null"); if (this.timeout.isNegative()) { throw new IllegalArgumentException("'maximumTimeout' must not be negative"); } this.taskLimit = new Semaphore(maximumTaskNumber); } @Override public final void execute(final Runnable command) { Objects.requireNonNull(command, "'command' must not be null"); try { // attempt to acquire permit for task execution if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); } } catch (final InterruptedException e) { // restore interrupt status Thread.currentThread().interrupt(); throw new IllegalStateException(e); } this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit)); } @Override public final String toString() { return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(), this.timeout, this.delegate); } }
źródło
Myślę, że jest to tak proste, jak użycie a
ArrayBlockingQueue
zamiast aaLinkedBlockingQueue
.Zignoruj mnie ... to całkowicie błędne.
ThreadPoolExecutor
połączeniaQueue#offer
nie takie,put
które dałyby oczekiwany efekt.Możesz rozszerzyć
ThreadPoolExecutor
i zapewnić implementacjęexecute(Runnable)
tych wywołańput
zamiastoffer
.Obawiam się, że to nie jest w pełni satysfakcjonująca odpowiedź.
źródło