Występuje problem ThreadPoolExecutor
polegający na RejectedExecutionException
tym, że jeśli próbuję zmienić rozmiar puli podstawowej na inną liczbę po utworzeniu puli, to sporadycznie niektóre zadania są odrzucane, chociaż nigdy nie przesyłam więcej niż queueSize + maxPoolSize
liczby zadań.
Problem, który próbuję rozwiązać, to rozszerzenie, ThreadPoolExecutor
które zmienia rozmiar swoich głównych wątków w oparciu o oczekujące wykonania znajdujące się w kolejce puli wątków. Potrzebuję tego, ponieważ domyślnie a ThreadPoolExecutor
utworzy nowy Thread
tylko wtedy, gdy kolejka jest pełna.
Oto mały samodzielny program Pure Java 8, który demonstruje problem.
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
Dlaczego pula powinna kiedykolwiek zgłaszać wyjątek RejectedExecutionException, jeśli nigdy nie przesyłam więcej niż kolejkaCapacity + maxThreads. Nigdy nie zmieniam maksymalnej liczby wątków, więc zgodnie z definicją ThreadPoolExecutor powinien on albo dostosować zadanie do wątku, albo do kolejki.
Oczywiście, jeśli nigdy nie zmieniam wielkości puli, to pula wątków nigdy nie odrzuca żadnych zgłoszeń. Jest to również trudne do debugowania, ponieważ dodanie jakichkolwiek opóźnień w zgłoszeniach sprawia, że problem znika.
Wszelkie wskazówki, jak naprawić RejectedExecutionException?
źródło
ExecutorService
, pakując istniejącą, która ponownie przesyła zadania, których przesłanie nie powiodło się z powodu zmiany rozmiaru?ThreadPoolExecutor
jest prawdopodobnie złym pomysłem i czy nie musisz zmieniać istniejącego kodu również w tym przypadku? Najlepiej byłoby podać przykład, w jaki sposób Twój kod uzyskuje dostęp do modułu wykonującego. Byłbym zaskoczony, gdyby używał wielu metod specyficznych dlaThreadPoolExecutor
(tj. Nie wExecutorService
).Odpowiedzi:
Oto scenariusz, dlaczego tak się dzieje:
W moim przykładzie używam minThreads = 0, maxThreads = 2 i queueCapacity = 2, aby go skrócić. Pierwsze polecenie zostanie przesłane, odbywa się to w metodzie execute:
dla tego polecenia wykonywane jest workQueue.offer (polecenie) niż addWorker (null, false). Wątek roboczy najpierw usuwa to polecenie z kolejki w metodzie uruchamiania wątków, więc w tej chwili kolejka nadal ma jedno polecenie,
Drugie polecenie zostanie przesłane tym razem zostanie wykonane workQueue.offer (polecenie). Teraz kolejka jest pełna
Teraz ScheduledExecutorService wykonuje metodę resizeThreadPool, która wywołuje setCorePoolSize z maxThreads. Oto metoda setCorePoolSize:
Ta metoda dodaje jednego pracownika za pomocą addWorker (null, true). Nie, uruchomione są 2 kolejki robocze, maksymalna i kolejka jest pełna.
Trzecie polecenie zostaje przesłane i kończy się niepowodzeniem, ponieważ nie działa workQueue.offer (polecenie) i addWorker (polecenie, fałsz), co prowadzi do wyjątku:
Myślę, że aby rozwiązać ten problem, powinieneś ustawić pojemność kolejki na maksymalną liczbę poleceń, które chcesz wykonać.
źródło
Nie jestem pewien, czy to kwalifikuje się jako błąd. Takie zachowanie powstaje, gdy dodatkowe wątki robocze są tworzone po zapełnieniu kolejki, ale w dokumentacji java zauważono, że osoba dzwoniąca musi poradzić sobie z zadaniami odrzuconymi.
Dokumenty Java
Gdy zmieniasz rozmiar puli podstawowej, powiedzmy, że wzrost, tworzone są dodatkowe procesy robocze (
addWorker
metoda wsetCorePoolSize
), a wywołanie tworzenia dodatkowej pracy (addWorker
metoda zexecute
) jest odrzucane, gdyaddWorker
zwraca false (add Worker
ostatni fragment kodu), ponieważ wystarczająca liczba dodatkowych pracowników jest już utworzony przez,setCorePoolSize
ale jeszcze nie uruchomiony, aby odzwierciedlić aktualizację w kolejce .Odpowiednie części
Porównać
Użyj niestandardowej procedury obsługi odrzucania ponownych prób (powinno to działać w twoim przypadku, ponieważ masz górną granicę jako maksymalny rozmiar puli). Dostosuj w razie potrzeby.
Pamiętaj również, że użycie zamykania jest nieprawidłowe, ponieważ nie będzie czekać na przesłanie zadania do wykonania, ale
awaitTermination
zamiast tego użyj .źródło