Od jakiegoś czasu byłem sfrustrowany domyślnym zachowaniem, ThreadPoolExecutor
które wspiera ExecutorService
pule wątków, których używa tak wielu z nas. Cytat z Javadocs:
Jeśli jest więcej uruchomionych wątków niż corePoolSize, ale mniej niż maximumPoolSize, nowy wątek zostanie utworzony tylko wtedy, gdy kolejka jest pełna .
Oznacza to, że jeśli zdefiniujesz pulę wątków za pomocą następującego kodu, nigdy nie uruchomi on drugiego wątku, ponieważ LinkedBlockingQueue
jest on nieograniczony.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Tylko jeśli masz ograniczoną kolejkę, a kolejka jest pełna, zostaną uruchomione wszystkie wątki powyżej numeru podstawowego. Podejrzewam, że duża liczba młodszych programistów wielowątkowych Java nie jest świadoma tego zachowania ThreadPoolExecutor
.
Teraz mam konkretny przypadek użycia, w którym nie jest to optymalne. Szukam sposobów obejścia tego bez pisania własnej klasy TPE.
Moje wymagania dotyczą usługi internetowej, która oddzwania do potencjalnie niewiarygodnej strony trzeciej.
- Nie chcę, aby wywołanie zwrotne było synchroniczne z żądaniem sieci Web, więc chcę użyć puli wątków.
- Zwykle dostaję kilka z nich na minutę, więc nie chcę mieć
newFixedThreadPool(...)
dużej liczby wątków, które w większości są uśpione. - Od czasu do czasu dostaję impuls tego ruchu i chcę zwiększyć liczbę wątków do jakiejś maksymalnej wartości (powiedzmy 50).
- Muszę dołożyć wszelkich starań, aby wykonać wszystkie wywołania zwrotne, więc chcę ustawić w kolejce wszystkie dodatkowe powyżej 50. Nie chcę przeciążać reszty mojego serwera internetowego za pomocą rozszerzenia
newCachedThreadPool()
.
Jak mogę obejść to ograniczenie, ThreadPoolExecutor
gdy kolejka musi być ograniczona i pełna, zanim zostanie uruchomionych więcej wątków? Jak mogę uruchomić więcej wątków przed kolejką zadań?
Edytować:
@Flavio ma dobry punkt widzenia na temat używania programu w ThreadPoolExecutor.allowCoreThreadTimeOut(true)
celu przekroczenia limitu czasu i zakończenia głównych wątków. Rozważałem to, ale nadal chciałem mieć funkcję rdzeni wątków. Nie chciałem, aby liczba wątków w puli spadła poniżej rozmiaru rdzenia, jeśli to możliwe.
Odpowiedzi:
Wydaje mi się, że w końcu znalazłem dość eleganckie (może trochę hakerskie) rozwiązanie tego ograniczenia za pomocą
ThreadPoolExecutor
. Polega ona na rozszerzenieLinkedBlockingQueue
mieć to powrótfalse
doqueue.offer(...)
gdy istnieją już pewne zadania w kolejce. Jeśli bieżące wątki nie nadążają za zadaniami w kolejce, TPE doda dodatkowe wątki. Jeśli pula ma już maksymalną liczbę wątków,RejectedExecutionHandler
zostanie wywołana. To program obsługujący wprowadza następnieput(...)
do kolejki.Z pewnością dziwne jest pisanie kolejki, do której
offer(...)
można wrócićfalse
iput()
nigdy nie blokować, więc to jest część hackowania. Ale działa to dobrze w przypadku korzystania z kolejki przez TPE, więc nie widzę żadnego problemu z robieniem tego.Oto kod:
Dzięki temu mechanizmowi, kiedy przesyłam zadania do kolejki,
ThreadPoolExecutor
wola:offer(...)
zwróci wartość false.RejectedExecutionHandler
RejectedExecutionHandler
odkłada zadanie w kolejce do przetworzenia przez pierwszego dostępnego wątku FIFO kolejności.Chociaż w moim przykładowym kodzie powyżej kolejka jest nieograniczona, możesz również zdefiniować ją jako ograniczoną kolejkę. Na przykład, jeśli dodasz pojemność 1000 do pojemności,
LinkedBlockingQueue
to:Ponadto, jeśli potrzebujesz użyć
offer(...)
w,RejectedExecutionHandler
możesz użyćoffer(E, long, TimeUnit)
metody zamiastLong.MAX_VALUE
limitu czasu.Ostrzeżenie:
Jeśli spodziewasz się, że zadania zostaną dodane do modułu wykonawczego po jego zamknięciu, możesz mądrzej wyrzucić
RejectedExecutionException
nasze niestandardowe ustawienia,RejectedExecutionHandler
gdy usługa wykonawcy została zamknięta. Dzięki @RaduToader za wskazanie tego.Edytować:
Inną poprawką tej odpowiedzi może być zapytanie TPE, czy istnieją bezczynne wątki i umieszczenie elementu w kolejce tylko wtedy, gdy tak jest. Musiałbyś stworzyć dla tego prawdziwą klasę i dodać do niej
ourQueue.setThreadPoolExecutor(tpe);
metodę.Wtedy Twoja
offer(...)
metoda może wyglądać następująco:tpe.getPoolSize() == tpe.getMaximumPoolSize()
w takim przypadku po prostu zadzwońsuper.offer(...)
.tpe.getPoolSize() > tpe.getActiveCount()
zadzwoń,super.offer(...)
ponieważ wydają się być nieaktywne wątki.false
do rozwidlenia innego wątku.Może to:
Zauważ, że metody get w TPE są drogie, ponieważ uzyskują dostęp do
volatile
pól lub (w przypadkugetActiveCount()
) blokują TPE i przechodzą przez listę wątków. Istnieją również warunki wyścigu, które mogą spowodować nieprawidłowe umieszczenie zadania w kolejce lub rozwidlenie innego wątku, gdy był wątek bezczynny.źródło
Queue
w celu osiągnięcia tego, z pewnością nie jesteś sam w swoim pomyśle: groovy-programming.com/post/26923146865execute(runnable)
,runnable
zostanie po prostu dodany do kolejki. Jeśli zadzwoniszexecute(secondRunnable)
,secondRunnable
zostanie dodany do kolejki. Ale teraz, jeśli zadzwoniszexecute(thirdRunnable)
,thirdRunnable
zostanie uruchomiony w nowym wątku.runnable
IsecondRunnable
uruchomić tylko razthirdRunnable
(lub oryginał długotrwały zadania) są wykończone.Ustaw rozmiar rdzenia i maksymalny rozmiar na tę samą wartość i zezwól na usuwanie podstawowych wątków z puli za pomocą
allowCoreThreadTimeOut(true)
.źródło
Mam już dwie inne odpowiedzi na to pytanie, ale podejrzewam, że ta jest najlepsza.
Opiera się na technice aktualnie przyjętej odpowiedzi , a mianowicie:
offer()
metodę kolejki, aby (czasami) zwracała fałsz,ThreadPoolExecutor
odrodzenie nowego wątku lub odrzucenie zadania, aRejectedExecutionHandler
aby faktycznie kolejkować zadanie po odrzuceniu.Problem polega na tym, kiedy
offer()
powinien zwrócić false. Aktualnie zaakceptowana odpowiedź zwraca fałsz, gdy w kolejce jest kilka zadań, ale jak wskazałem w moim komentarzu, powoduje to niepożądane efekty. Alternatywnie, jeśli zawsze zwrócisz wartość false, będziesz nadal tworzyć nowe wątki, nawet jeśli masz wątki oczekujące w kolejce.Rozwiązaniem jest użycie Java 7
LinkedTransferQueue
ioffer()
wywołanietryTransfer()
. Gdy pojawi się oczekujący wątek konsumenta, zadanie zostanie po prostu przekazane do tego wątku. W przeciwnym razieoffer()
zwróci false iThreadPoolExecutor
utworzy nowy wątek.źródło
queue.offer()
, ponieważ faktycznie wywołujeLinkedTransferQueue.tryTransfer()
, zwróci wartość false i nie umieści zadania w kolejce. JednakRejectedExecutionHandler
wywołaniaqueue.put()
, które nie zawodzą i powodują umieszczenie zadania w kolejce.Uwaga: teraz wolę i polecam moją inną odpowiedź .
Oto wersja, która wydaje mi się znacznie prostsza: Zwiększ corePoolSize (do limitu maximumPoolSize) za każdym razem, gdy wykonywane jest nowe zadanie, a następnie zmniejsz corePoolSize (do limitu określonego przez użytkownika "core pool size") za każdym razem, gdy zadanie zakończone.
Innymi słowy, śledź liczbę uruchomionych lub umieszczonych w kolejce zadań i upewnij się, że corePoolSize jest równy liczbie zadań, o ile znajduje się między określonym przez użytkownika „core pool size” a maximumPoolSize.
Jak napisano, klasa nie obsługuje zmiany określonego przez użytkownika corePoolSize lub maximumPoolSize po skonstruowaniu i nie obsługuje manipulowania kolejką roboczą bezpośrednio lub za pośrednictwem
remove()
lubpurge()
.źródło
synchronized
bloków. Czy możesz zadzwonić do kolejki, aby uzyskać liczbę zadań. A może użyćAtomicInteger
?execute()
w osobnych wątkach jest kilka wywołań, każdy z nich (1) określi, ile wątków jest potrzebnych, (2)setCorePoolSize
do tego numeru i (3) zadzwonisuper.execute()
. Jeśli kroki (1) i (2) nie są zsynchronizowane, nie jestem pewien, jak zapobiec niefortunnemu uporządkowaniu, w którym ustawiasz rozmiar puli podstawowej na niższą liczbę po wyższej liczbie. Mając bezpośredni dostęp do pola nadklasy, można to zrobić za pomocą porównania i ustaw, ale nie widzę prostego sposobu na zrobienie tego w podklasie bez synchronizacji.taskCount
pole jest ważne (tjAtomicInteger
.). Jeśli dwa wątki przeliczają rozmiar puli bezpośrednio po sobie, powinny uzyskać prawidłowe wartości. Jeśli drugi zmniejsza wątki rdzenia, to musiał być zauważony spadek w kolejce czy coś.execute()
. Każdy zadzwoniatomicTaskCount.incrementAndGet()
i dostanie odpowiednio 10 i 11. Ale bez synchronizacji (ponad uzyskiwanie liczby zadań i ustawianie rozmiaru puli podstawowej) można wtedy uzyskać (1) zadanie 11 ustawia rozmiar puli rdzeniowej na 11, (2) zadanie 10 ustawia rozmiar puli podstawowej na 10, (3) wywołanie zadania 10super.execute()
, (4) wywołanie zadania 11super.execute()
i umieszczenie go w kolejce.Mamy podklasę,
ThreadPoolExecutor
która przyjmuje dodatkowecreationThreshold
i nadpisujeexecute
.może to też pomaga, ale Twój wygląda oczywiście bardziej artystycznie…
źródło
offer(...)
metoda zwracała się tylkofalse
warunkowo. Dzięki!Zalecana odpowiedź rozwiązuje tylko jeden (1) problem z pulą wątków JDK:
Pule wątków JDK są nastawione na kolejkowanie. Więc zamiast tworzyć nowy wątek, ustawią zadanie w kolejce. Tylko wtedy, gdy kolejka osiągnie swój limit, pula wątków utworzy nowy wątek.
Wycofywanie wątków nie następuje, gdy obciążenie zmniejsza się. Na przykład, jeśli mamy serię zadań uderzających w pulę, która powoduje, że pula osiąga maksimum, a następnie niewielkie obciążenie maksymalnie 2 zadań naraz, pula użyje wszystkich wątków do obsługi niewielkiego obciążenia, zapobiegając wycofaniu wątku. (potrzebne byłyby tylko 2 wątki…)
Niezadowolony z powyższego zachowania, poszedłem do przodu i wdrożyłem pulę, aby przezwyciężyć powyższe niedociągnięcia.
Aby rozwiązać 2) Korzystanie z planowania Lifo rozwiązuje problem. Idea ta została zaprezentowana przez Bena Maurera na konferencji ACM application 2015: Systems @ Facebook scale
Tak narodziła się nowa implementacja:
LifoThreadPoolExecutorSQP
Jak dotąd ta implementacja poprawia wydajność wykonywania asynchronicznego dla ZEL .
Implementacja jest w stanie zmniejszyć obciążenie przełączania kontekstu, zapewniając lepszą wydajność w niektórych przypadkach użycia.
Mam nadzieję, że to pomoże...
PS: JDK Fork Join Pool implementuje usługę ExecutorService i działa jako „normalna” pula wątków, implementacja jest wydajna, wykorzystuje harmonogramowanie wątków LIFO, jednak nie ma kontroli nad rozmiarem kolejki wewnętrznej, limitem czasu wycofania ..., a co najważniejsze zadania nie mogą być przerwane podczas ich anulowania
źródło
Uwaga: teraz wolę i polecam moją inną odpowiedź .
Mam inną propozycję, zgodnie z pierwotnym pomysłem zmiany kolejki, aby zwróciła fałsz. W tym przypadku wszystkie zadania mogą wejść do kolejki, ale za każdym razem, gdy zadanie jest kolejkowane po
execute()
, wykonujemy je z wartowniczym zadaniem no-op, które kolejka odrzuca, powodując pojawienie się nowego wątku, który natychmiast wykona no-op, a następnie coś z kolejki.Ponieważ wątki robocze mogą sondować
LinkedBlockingQueue
nowe zadanie, zadanie może zostać umieszczone w kolejce, nawet jeśli istnieje dostępny wątek. Aby uniknąć tworzenia nowych wątków, nawet jeśli są dostępne wątki, musimy śledzić, ile wątków czeka na nowe zadania w kolejce i tworzyć nowy wątek tylko wtedy, gdy w kolejce jest więcej zadań niż wątków oczekujących.źródło
Najlepszym rozwiązaniem, jakie przychodzi mi do głowy, jest przedłużenie.
ThreadPoolExecutor
oferuje kilka metod przechwytywania:beforeExecute
iafterExecute
. W swoim rozszerzeniu możesz używać ograniczonej kolejki do dostarczania zadań i drugiej nieograniczonej kolejki do obsługi przepełnienia. Gdy ktoś dzwonisubmit
, możesz spróbować umieścić żądanie w ograniczonej kolejce. Jeśli spotkasz się z wyjątkiem, po prostu umieść zadanie w swojej przepełnionej kolejce. Następnie możesz użyćafterExecute
haka, aby sprawdzić, czy po zakończeniu zadania jest coś w kolejce przepełnienia. W ten sposób executor zajmie się najpierw rzeczami w swojej ograniczonej kolejce i automatycznie pobierze z tej nieograniczonej kolejki, jak pozwoli na to czas.Wydaje się, że to więcej pracy niż twoje rozwiązanie, ale przynajmniej nie obejmuje nadawania kolejkom nieoczekiwanych zachowań. Wyobrażam sobie również, że istnieje lepszy sposób sprawdzania stanu kolejki i wątków, zamiast polegać na wyjątkach, które są dość powolne do wrzucania.
źródło
Uwaga: W przypadku JDK ThreadPoolExecutor, gdy masz ograniczoną kolejkę, tworzysz nowe wątki tylko wtedy, gdy oferta zwraca wartość false. Możesz uzyskać coś użytecznego dzięki CallerRunsPolicy, który tworzy trochę BackPressure i bezpośrednio wywołuje uruchamiane w wątku wywołującego.
Potrzebuję zadań do wykonania z wątków utworzonych przez pulę i mam niezasadną kolejkę do planowania, podczas gdy liczba wątków w puli może wzrosnąć lub zmniejszyć się między corePoolSize a maximumPoolSize, więc ...
Skończyło się na tym, że wykonałem pełną kopię wklejania z ThreadPoolExecutor i nieco zmieniłem metodę wykonywania, ponieważ niestety nie można tego zrobić przez rozszerzenie (wywołuje metody prywatne).
Nie chciałem odradzać nowych wątków tylko od razu, gdy nadejdzie nowe żądanie i wszystkie wątki są zajęte (ponieważ generalnie mam krótkotrwałe zadania). Dodałem próg, ale nie krępuj się go zmienić według swoich potrzeb (może dla większości IO lepiej usunąć ten próg)
źródło