Chcę stworzyć ThreadPoolExecutor
taki, który po osiągnięciu maksymalnego rozmiaru i zapełnieniu kolejki submit()
metoda blokuje się przy próbie dodania nowych zadań. Czy muszę zaimplementować w RejectedExecutionHandler
tym celu niestandardowy sposób , czy też istnieje istniejący sposób, aby to zrobić przy użyciu standardowej biblioteki Java?
java
concurrency
executor
Punkt stały
źródło
źródło
Odpowiedzi:
Jedno z możliwych rozwiązań, które właśnie znalazłem:
Czy są jakieś inne rozwiązania? Wolałbym coś opartego na czymś,
RejectedExecutionHandler
ponieważ wydaje się, że to standardowy sposób radzenia sobie w takich sytuacjach.źródło
throw e;
ma tego w książce. JCIP ma rację!Możesz użyć ThreadPoolExecutor i blockingQueue:
źródło
Należy użyć
CallerRunsPolicy
, który wykonuje odrzucone zadanie w wątku wywołującym. W ten sposób nie może przesyłać żadnych nowych zadań do modułu wykonawczego, dopóki to zadanie nie zostanie wykonane, w którym to momencie będą wolne wątki puli lub proces się powtórzy.http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html
Z dokumentów:
Upewnij się również, że podczas wywoływania
ThreadPoolExecutor
konstruktora używasz kolejki ograniczonej, takiej jak ArrayBlockingQueue . W przeciwnym razie nic nie zostanie odrzucone.Edycja: w odpowiedzi na swój komentarz ustaw rozmiar ArrayBlockingQueue na równy maksymalnemu rozmiarowi puli wątków i użyj AbortPolicy.
Edycja 2: OK, rozumiem, do czego zmierzasz. A co z tym: zastąp
beforeExecute()
metodę, aby sprawdzić,getActiveCount()
czy nie przekraczagetMaximumPoolSize()
, a jeśli tak, prześpij się i spróbuj ponownie?źródło
Hibernate ma
BlockPolicy
prostą i może robić, co chcesz:Zobacz: Executors.java
źródło
ThreadPoolExecutor
even mówi dosłownie: "Metoda getQueue () umożliwia dostęp do kolejki roboczej w celu monitorowania i debugowania. Używanie tej metody do jakichkolwiek innych celów jest zdecydowanie odradzane.". To, że jest to dostępne w tak szeroko znanej bibliotece, jest absolutnie smutne.BoundedExecutor
Odpowiedź cytowany powyżej z Java współbieżności w praktyce działa poprawnie tylko w przypadku korzystania z kolejki nieograniczony dla Wykonawcy lub semafor związany jest nie większy niż rozmiar kolejki. Semafor jest stanem współdzielonym między wątkiem wysyłającym i wątkami w puli, dzięki czemu możliwe jest nasycenie modułu wykonawczego, nawet jeśli rozmiar kolejki <bound <= (rozmiar kolejki + rozmiar puli).Używanie
CallerRunsPolicy
jest ważne tylko wtedy, gdy twoje zadania nie działają wiecznie, w takim przypadku twój wątek wysyłający pozostanie narejectedExecution
zawsze, a zły pomysł, jeśli twoje zadania będą działać długo, ponieważ wątek wysyłający nie może przesłać żadnych nowych zadań lub zrobić cokolwiek innego, jeśli sam uruchamia zadanie.Jeśli to nie do przyjęcia, sugeruję sprawdzenie rozmiaru ograniczonej kolejki executora przed przesłaniem zadania. Jeśli kolejka jest pełna, odczekaj chwilę, zanim ponownie spróbujesz przesłać. Przepustowość ucierpi, ale sugeruję, że jest to prostsze rozwiązanie niż wiele innych proponowanych rozwiązań i masz gwarancję, że żadne zadanie nie zostanie odrzucone.
źródło
Wiem, to hack, ale moim zdaniem najczystszy hack spośród oferowanych tutaj ;-)
Ponieważ ThreadPoolExecutor używa "oferta" kolejki blokującej zamiast "umieścić", pozwala na nadpisanie zachowania "oferty" kolejki blokującej:
Przetestowałem to i wydaje się, że działa. Wdrożenie pewnej polityki limitu czasu jest zadaniem czytelnika.
źródło
Następująca klasa otacza ThreadPoolExecutor i używa Semaphore do blokowania, a następnie kolejka robocza jest pełna:
Ta klasa opakowania jest oparta na rozwiązaniu przedstawionym w książce Java Concurrency in Practice autorstwa Briana Goetza. Rozwiązanie opisane w książce przyjmuje tylko dwa parametry konstruktora: an
Executor
i ograniczenie używane dla semafora. Pokazuje to odpowiedź udzielona przez Fixpoint. Jest problem z tym podejściem: może dojść do stanu, w którym wątki puli są zajęte, kolejka jest pełna, ale semafor właśnie wydał zezwolenie. (semaphore.release()
w ostatnim bloku). W tym stanie nowe zadanie może pobrać właśnie wydane zezwolenie, ale jest odrzucane, ponieważ kolejka zadań jest pełna. Oczywiście nie jest to coś, czego chcesz; chcesz zablokować w tym przypadku.Aby rozwiązać ten problem, musimy użyć nieograniczonej kolejki, o czym wyraźnie wspomina JCiP. Semafor działa jako strażnik, dając efekt wirtualnego rozmiaru kolejki. Ma to taki efekt uboczny, że jest możliwe, że jednostka może zawierać
maxPoolSize + virtualQueueSize + maxPoolSize
zadania. Dlaczego? Z powodusemaphore.release()
w ostatnim bloku. Jeśli wszystkie wątki puli wywołują tę instrukcję w tym samym czasie,maxPoolSize
zezwolenia są zwalniane, pozwalając tej samej liczbie zadań na wejście do jednostki. Gdybyśmy używali ograniczonej kolejki, nadal byłaby pełna, co spowodowałoby odrzucenie zadania. Teraz, ponieważ wiemy, że dzieje się tak tylko wtedy, gdy wątek puli jest prawie gotowy, nie stanowi to problemu. Wiemy, że wątek puli nie będzie się blokował, więc zadanie wkrótce zostanie pobrane z kolejki.Możesz jednak użyć ograniczonej kolejki. Tylko upewnij się, że jego rozmiar jest równy
virtualQueueSize + maxPoolSize
. Większe rozmiary są bezużyteczne, semafor uniemożliwi wpuszczenie większej liczby elementów. Mniejsze rozmiary spowodują odrzucenie zadań. Szansa na odrzucenie zadań rośnie wraz ze zmniejszaniem się rozmiaru. Na przykład, powiedzmy, że chcesz mieć ograniczony moduł wykonawczy z maxPoolSize = 2 i virtualQueueSize = 5. Następnie weź semafor z 5 + 2 = 7 zezwoleniami i rzeczywistym rozmiarem kolejki 5 + 2 = 7. Rzeczywista liczba zadań, które mogą znajdować się w jednostce, to 2 + 5 + 2 = 9. Kiedy moduł wykonawczy jest pełny (5 zadań w kolejce, 2 w puli wątków, więc 0 zezwoleń jest dostępnych) i WSZYSTKIE wątki puli zwalniają swoje zezwolenia, wtedy nadchodzące zadania mogą otrzymać dokładnie 2 zezwolenia.Teraz rozwiązanie JCiP jest nieco kłopotliwe w użyciu, ponieważ nie wymusza wszystkich tych ograniczeń (nieograniczonej kolejki lub ograniczeń matematycznych itp.). Myślę, że jest to tylko dobry przykład pokazujący, jak można budować nowe klasy bezpiecznych wątków w oparciu o części, które są już dostępne, ale nie jako w pełni rozwiniętą klasę wielokrotnego użytku. Nie sądzę, żeby to drugie było intencją autora.
źródło
możesz użyć niestandardowego RejectedExecutionHandler w ten sposób
źródło
Utwórz własną kolejkę blokującą, która będzie używana przez moduł wykonawczy, z zachowaniem blokującym, którego szukasz, zawsze zwracając dostępną pozostałą pojemność (upewniając się, że moduł wykonawczy nie będzie próbował utworzyć więcej wątków niż jego pula rdzeniowa ani nie uruchomi modułu obsługi odrzucania).
Wierzę, że w ten sposób uzyskasz zachowanie blokujące, którego szukasz. Program obsługi odrzucania nigdy nie pasuje do rachunku, ponieważ oznacza to, że wykonawca nie może wykonać zadania. Mogę sobie wyobrazić, że osoba prowadząca jest zajęta czekaniem. To nie jest to, czego chcesz, potrzebujesz kolejki dla executora, który blokuje wywołującego ...
źródło
ThreadPoolExecutor
używaoffer
metody dodawania zadań do kolejki. Gdybym utworzył niestandardowyBlockingQueue
blokujący sięoffer
, zerwałobyBlockingQueue
to kontrakt.ThreadPoolExecutor
został wdrożony do użycia,offer
a nieput
(wersja blokująca)? Ponadto, gdyby istniał sposób, aby kod klienta wskazywał, którego użyć i kiedy, wiele osób próbujących ręcznie wprowadzić niestandardowe rozwiązania byłoby odciążoneAby uniknąć problemów z rozwiązaniem @FixPoint. Można by użyć ListeningExecutorService i zwolnić semafor onSuccess i onFailure wewnątrz FutureCallback.
źródło
Runnable
ponieważ te metody są nadal wywoływane przed normalnym czyszczeniem pracownikaThreadPoolExecutor
. Oznacza to, że nadal będziesz musiał obsługiwać wyjątki odrzucenia.Ostatnio stwierdziłem, że to pytanie ma ten sam problem. OP nie mówi tego wprost, ale nie chcemy używać tego,
RejectedExecutionHandler
który wykonuje zadanie w wątku przesyłającego, ponieważ spowoduje to niedostateczne wykorzystanie wątków roboczych, jeśli to zadanie jest długotrwałe.Czytając wszystkie odpowiedzi i komentarze, w szczególności wadliwe rozwiązanie z semaforem lub używając do tego semafora,
afterExecute
przyjrzałem się bliżej kodowi ThreadPoolExecutor, aby sprawdzić, czy jest jakieś wyjście. Byłem zdumiony, widząc ponad 2000 linii (skomentowanego) kodu, z których niektóre przyprawiają mnie o zawrót głowy . Biorąc pod uwagę raczej proste wymaganie, które mam - jeden producent, kilku konsumentów, niech producent blokuje, gdy żaden konsument nie może podjąć pracy - postanowiłem stworzyć własne rozwiązanie. To nie jest,ExecutorService
ale tylko plikExecutor
. I nie dostosowuje liczby wątków do obciążenia pracą, ale utrzymuje tylko stałą liczbę wątków, co również odpowiada moim wymaganiom. Oto kod. Zapraszam do opowiadania o tym :-)źródło
Uważam, że istnieje dość elegancki sposób rozwiązania tego problemu poprzez użycie
java.util.concurrent.Semaphore
i delegowanie zachowaniaExecutor.newFixedThreadPool
. Nowa usługa wykonawcza wykona nowe zadanie tylko wtedy, gdy istnieje do tego wątek. Blokowaniem zarządza Semaphore z liczbą zezwoleń równą liczbie wątków. Po zakończeniu zadania zwraca zezwolenie.źródło
W przeszłości miałem tę samą potrzebę: rodzaj kolejki blokującej o stałym rozmiarze dla każdego klienta wspieranej przez wspólną pulę wątków. Skończyło się na tym, że napisałem własny ThreadPoolExecutor:
UserThreadPoolExecutor (kolejka blokująca (na klienta) + pula wątków (wspólna dla wszystkich klientów))
Zobacz: https://github.com/d4rxh4wx/UserThreadPoolExecutor
Każdy UserThreadPoolExecutor otrzymuje maksymalną liczbę wątków z udostępnionego ThreadPoolExecutor
Każdy UserThreadPoolExecutor może:
źródło
Znalazłem tę zasadę odrzucania w kliencie wyszukiwania elastycznego. Blokuje wątek wywołujący w kolejce blokującej. Kod poniżej
źródło
Niedawno musiałem osiągnąć coś podobnego, ale na
ScheduledExecutorService
.Musiałem również upewnić się, że obsłużę opóźnienie przekazywane do metody i upewnij się, że zadanie zostanie przesłane do wykonania w czasie, którego oczekuje wywołujący, lub po prostu nie powiedzie się, rzucając w ten sposób plik
RejectedExecutionException
.Inne metody z
ScheduledThreadPoolExecutor
do wykonywania lub wysyłania wewnętrznego wywołania zadania,#schedule
które nadal będą wywoływać zastąpione metody.Mam tutaj kod, docenię wszelkie uwagi. https://github.com/AmitabhAwasthi/BlockingScheduler
źródło
Oto rozwiązanie, które wydaje się działać naprawdę dobrze. Nazywa się NotifyingBlockingThreadPoolExecutor .
Program demonstracyjny.
Edycja: wystąpił problem z tym kodem, metoda await () zawiera błędy. Wywołanie shutdown () + awaitTermination () wydaje się działać dobrze.
źródło
Nie zawsze podoba mi się CallerRunsPolicy, zwłaszcza, że pozwala ona odrzuconym zadaniom na „pominięcie kolejki” i wykonanie ich przed zadaniami, które zostały przesłane wcześniej. Ponadto wykonanie zadania w wątku wywołującym może zająć znacznie więcej czasu niż oczekiwanie na udostępnienie pierwszego gniazda.
Rozwiązałem ten problem za pomocą niestandardowego RejectedExecutionHandler, który po prostu blokuje wątek wywołujący na chwilę, a następnie próbuje ponownie przesłać zadanie:
Tej klasy można po prostu użyć w programie wykonawczym puli wątków jako RejectedExecutinHandler, jak każda inna, na przykład:
Jedynym minusem, jaki widzę, jest to, że wątek wywołujący może zostać zablokowany nieco dłużej niż jest to konieczne (do 250 ms). Co więcej, ponieważ ten wykonawca jest skutecznie wywoływany rekurencyjnie, bardzo długie oczekiwanie na udostępnienie wątku (godziny) może spowodować przepełnienie stosu.
Niemniej jednak osobiście podoba mi się ta metoda. Jest kompaktowy, łatwy do zrozumienia i dobrze działa.
źródło