Jak sprawić, by ThreadPoolExecutor zwiększył liczbę wątków do maksimum przed umieszczeniem w kolejce?

100

Od jakiegoś czasu byłem sfrustrowany domyślnym zachowaniem, ThreadPoolExecutorktóre wspiera ExecutorServicepule wątków, których używa tak wielu z nas. Cytat z Javadocs:

Jeśli jest więcej uruchomionych wątków niż corePoolSize, ale mniej niż maximumPoolSize, nowy wątek zostanie utworzony tylko wtedy, gdy kolejka jest pełna .

Oznacza to, że jeśli zdefiniujesz pulę wątków za pomocą następującego kodu, nigdy nie uruchomi on drugiego wątku, ponieważ LinkedBlockingQueuejest on nieograniczony.

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Tylko jeśli masz ograniczoną kolejkę, a kolejka jest pełna, zostaną uruchomione wszystkie wątki powyżej numeru podstawowego. Podejrzewam, że duża liczba młodszych programistów wielowątkowych Java nie jest świadoma tego zachowania ThreadPoolExecutor.

Teraz mam konkretny przypadek użycia, w którym nie jest to optymalne. Szukam sposobów obejścia tego bez pisania własnej klasy TPE.

Moje wymagania dotyczą usługi internetowej, która oddzwania do potencjalnie niewiarygodnej strony trzeciej.

  • Nie chcę, aby wywołanie zwrotne było synchroniczne z żądaniem sieci Web, więc chcę użyć puli wątków.
  • Zwykle dostaję kilka z nich na minutę, więc nie chcę mieć newFixedThreadPool(...)dużej liczby wątków, które w większości są uśpione.
  • Od czasu do czasu dostaję impuls tego ruchu i chcę zwiększyć liczbę wątków do jakiejś maksymalnej wartości (powiedzmy 50).
  • Muszę dołożyć wszelkich starań, aby wykonać wszystkie wywołania zwrotne, więc chcę ustawić w kolejce wszystkie dodatkowe powyżej 50. Nie chcę przeciążać reszty mojego serwera internetowego za pomocą rozszerzenia newCachedThreadPool().

Jak mogę obejść to ograniczenie, ThreadPoolExecutorgdy kolejka musi być ograniczona i pełna, zanim zostanie uruchomionych więcej wątków? Jak mogę uruchomić więcej wątków przed kolejką zadań?

Edytować:

@Flavio ma dobry punkt widzenia na temat używania programu w ThreadPoolExecutor.allowCoreThreadTimeOut(true)celu przekroczenia limitu czasu i zakończenia głównych wątków. Rozważałem to, ale nadal chciałem mieć funkcję rdzeni wątków. Nie chciałem, aby liczba wątków w puli spadła poniżej rozmiaru rdzenia, jeśli to możliwe.

Szary
źródło
1
Biorąc pod uwagę, że Twój przykład tworzy maksymalnie 10 wątków, czy są jakieś rzeczywiste oszczędności w korzystaniu z czegoś, co rośnie / kurczy się w puli wątków o stałym rozmiarze?
bstempi
Słuszna uwaga @bstempi. Liczba była nieco arbitralna. Zwiększyłem go w pytaniu do 50. Nie wiem, ile dokładnie współbieżnych wątków chcę teraz pracować, skoro mam to rozwiązanie.
Gray
1
O cholera! 10 głosów za, gdybym mógł tutaj, dokładnie w tej samej pozycji, na której jestem.
Eugene

Odpowiedzi:

51

Jak mogę obejść to ograniczenie w sytuacji, ThreadPoolExecutorgdy kolejka musi być ograniczona i zapełniona, zanim zostanie uruchomionych więcej wątków.

Wydaje mi się, że w końcu znalazłem dość eleganckie (może trochę hakerskie) rozwiązanie tego ograniczenia za pomocą ThreadPoolExecutor. Polega ona na rozszerzenie LinkedBlockingQueuemieć to powrót falsedo queue.offer(...)gdy istnieją już pewne zadania w kolejce. Jeśli bieżące wątki nie nadążają za zadaniami w kolejce, TPE doda dodatkowe wątki. Jeśli pula ma już maksymalną liczbę wątków, RejectedExecutionHandlerzostanie wywołana. To program obsługujący wprowadza następnie put(...)do kolejki.

Z pewnością dziwne jest pisanie kolejki, do której offer(...)można wrócić falsei put()nigdy nie blokować, więc to jest część hackowania. Ale działa to dobrze w przypadku korzystania z kolejki przez TPE, więc nie widzę żadnego problemu z robieniem tego.

Oto kod:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Dzięki temu mechanizmowi, kiedy przesyłam zadania do kolejki, ThreadPoolExecutorwola:

  1. Początkowo skaluj liczbę wątków do rozmiaru rdzenia (tutaj 1).
  2. Zaproponuj to w kolejce. Jeśli kolejka jest pusta, zostanie umieszczona w kolejce do obsługi przez istniejące wątki.
  3. Jeśli kolejka ma już 1 lub więcej elementów, offer(...)zwróci wartość false.
  4. Jeśli zwracana jest wartość false, zwiększaj liczbę wątków w puli, aż osiągną maksymalną liczbę (tutaj 50).
  5. Jeśli na maksimum, wywołuje RejectedExecutionHandler
  6. W RejectedExecutionHandlerodkłada zadanie w kolejce do przetworzenia przez pierwszego dostępnego wątku FIFO kolejności.

Chociaż w moim przykładowym kodzie powyżej kolejka jest nieograniczona, możesz również zdefiniować ją jako ograniczoną kolejkę. Na przykład, jeśli dodasz pojemność 1000 do pojemności, LinkedBlockingQueueto:

  1. skalowanie gwintów do max
  2. następnie ustaw w kolejce, aż wypełni się 1000 zadań
  3. następnie blokuj dzwoniącego, aż kolejka będzie miała wolne miejsce.

Ponadto, jeśli potrzebujesz użyć offer(...)w, RejectedExecutionHandlermożesz użyć offer(E, long, TimeUnit)metody zamiast Long.MAX_VALUElimitu czasu.

Ostrzeżenie:

Jeśli spodziewasz się, że zadania zostaną dodane do modułu wykonawczego po jego zamknięciu, możesz mądrzej wyrzucić RejectedExecutionExceptionnasze niestandardowe ustawienia, RejectedExecutionHandlergdy usługa wykonawcy została zamknięta. Dzięki @RaduToader za wskazanie tego.

Edytować:

Inną poprawką tej odpowiedzi może być zapytanie TPE, czy istnieją bezczynne wątki i umieszczenie elementu w kolejce tylko wtedy, gdy tak jest. Musiałbyś stworzyć dla tego prawdziwą klasę i dodać do niej ourQueue.setThreadPoolExecutor(tpe);metodę.

Wtedy Twoja offer(...)metoda może wyglądać następująco:

  1. Sprawdź, czy tpe.getPoolSize() == tpe.getMaximumPoolSize()w takim przypadku po prostu zadzwoń super.offer(...).
  2. W przeciwnym razie tpe.getPoolSize() > tpe.getActiveCount()zadzwoń, super.offer(...)ponieważ wydają się być nieaktywne wątki.
  3. W przeciwnym razie wróć falsedo rozwidlenia innego wątku.

Może to:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Zauważ, że metody get w TPE są drogie, ponieważ uzyskują dostęp do volatilepól lub (w przypadku getActiveCount()) blokują TPE i przechodzą przez listę wątków. Istnieją również warunki wyścigu, które mogą spowodować nieprawidłowe umieszczenie zadania w kolejce lub rozwidlenie innego wątku, gdy był wątek bezczynny.

Szary
źródło
Ja też zmagałem się z tym samym problemem, skończyło się na nadpisywaniu metody wykonania. Ale to naprawdę fajne rozwiązanie. :)
Batty
O ile nie podoba mi się pomysł zerwania umowy Queuew celu osiągnięcia tego, z pewnością nie jesteś sam w swoim pomyśle: groovy-programming.com/post/26923146865
bstempi
3
Czy nie ma tu dziwności, że kilka pierwszych zadań zostanie umieszczonych w kolejce i dopiero po pojawieniu się nowych wątków? Na przykład, jeśli jeden rdzeń wątku jest zajęty jednym długotrwałym zadaniem i dzwonisz execute(runnable), runnablezostanie po prostu dodany do kolejki. Jeśli zadzwonisz execute(secondRunnable), secondRunnablezostanie dodany do kolejki. Ale teraz, jeśli zadzwonisz execute(thirdRunnable), thirdRunnablezostanie uruchomiony w nowym wątku. runnableI secondRunnableuruchomić tylko raz thirdRunnable(lub oryginał długotrwały zadania) są wykończone.
Robert Tupelo-Schneck
1
Tak, Robert ma rację, w wysoce wielowątkowym środowisku kolejka czasami rośnie, podczas gdy są wolne wątki do użycia. Rozwiązanie poniżej, które rozszerza TPE - działa znacznie lepiej. Myślę, że sugestię Roberta należy oznaczyć jako odpowiedź, mimo że powyższy hack jest interesujący
Wanna Know All
1
„RejectedExecutionHandler” pomógł executorowi podczas zamykania. Teraz jesteś zmuszony użyć shutdownNow (), ponieważ shutdown () nie zapobiega dodawaniu nowych zadań (z powodu wymagania)
Radu Toader,
28

Ustaw rozmiar rdzenia i maksymalny rozmiar na tę samą wartość i zezwól na usuwanie podstawowych wątków z puli za pomocą allowCoreThreadTimeOut(true).

Flavio
źródło
+1 Tak, myślałem o tym, ale nadal chciałem mieć funkcję rdzeni wątków. Nie chciałem, aby pula wątków przechodziła do 0 wątków w okresach uśpienia. Zmienię moje pytanie, aby to wskazać. Ale doskonała uwaga.
Gray
Dziękuję Ci! To po prostu najprostszy sposób.
Dmitry Ovchinnikov
28

Mam już dwie inne odpowiedzi na to pytanie, ale podejrzewam, że ta jest najlepsza.

Opiera się na technice aktualnie przyjętej odpowiedzi , a mianowicie:

  1. Zastąp offer()metodę kolejki, aby (czasami) zwracała fałsz,
  2. co powoduje ThreadPoolExecutorodrodzenie nowego wątku lub odrzucenie zadania, a
  3. ustaw, RejectedExecutionHandleraby faktycznie kolejkować zadanie po odrzuceniu.

Problem polega na tym, kiedy offer()powinien zwrócić false. Aktualnie zaakceptowana odpowiedź zwraca fałsz, gdy w kolejce jest kilka zadań, ale jak wskazałem w moim komentarzu, powoduje to niepożądane efekty. Alternatywnie, jeśli zawsze zwrócisz wartość false, będziesz nadal tworzyć nowe wątki, nawet jeśli masz wątki oczekujące w kolejce.

Rozwiązaniem jest użycie Java 7 LinkedTransferQueuei offer()wywołanie tryTransfer(). Gdy pojawi się oczekujący wątek konsumenta, zadanie zostanie po prostu przekazane do tego wątku. W przeciwnym razie offer()zwróci false i ThreadPoolExecutorutworzy nowy wątek.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
Robert Tupelo-Schneck
źródło
Muszę się zgodzić, wydaje mi się, że to najczystsze. Jedynym minusem tego rozwiązania jest to, że LinkedTransferQueue jest nieograniczony, więc nie otrzymasz kolejki zadań o ograniczonej pojemności bez dodatkowej pracy.
Yeroc,
Występuje problem, gdy pula osiąga maksymalny rozmiar. Powiedz, że pula została przeskalowana w górę do maksymalnego rozmiaru i każdy wątek aktualnie wykonuje zadanie, gdy runnable złożył tę ofertę, impl zwróci false, a ThreadPoolExecutor spróbuje dodać wątek addWorker, ale pula już osiągnęła maksimum, więc runnable zostanie po prostu odrzucone. Zgodnie z odrzuconymExceHandler, który napisałeś, zostanie ponownie zaoferowany w kolejce, co spowoduje, że ten taniec małpy będzie się powtarzał od początku.
Sudheera
1
@Sudheera Uważam, że się mylisz. queue.offer(), ponieważ faktycznie wywołuje LinkedTransferQueue.tryTransfer(), zwróci wartość false i nie umieści zadania w kolejce. Jednak RejectedExecutionHandlerwywołania queue.put(), które nie zawodzą i powodują umieszczenie zadania w kolejce.
Robert Tupelo-Schneck
1
@ RobertTupelo-Schneck niezwykle przydatne i przyjemne!
Eugene
1
@ RobertTupelo-Schneck Działa jak marzenie! Nie wiem, dlaczego w Javie nie ma czegoś takiego po wyjęciu z pudełka
Georgi Peev
7

Uwaga: teraz wolę i polecam moją inną odpowiedź .

Oto wersja, która wydaje mi się znacznie prostsza: Zwiększ corePoolSize (do limitu maximumPoolSize) za każdym razem, gdy wykonywane jest nowe zadanie, a następnie zmniejsz corePoolSize (do limitu określonego przez użytkownika "core pool size") za każdym razem, gdy zadanie zakończone.

Innymi słowy, śledź liczbę uruchomionych lub umieszczonych w kolejce zadań i upewnij się, że corePoolSize jest równy liczbie zadań, o ile znajduje się między określonym przez użytkownika „core pool size” a maximumPoolSize.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Jak napisano, klasa nie obsługuje zmiany określonego przez użytkownika corePoolSize lub maximumPoolSize po skonstruowaniu i nie obsługuje manipulowania kolejką roboczą bezpośrednio lub za pośrednictwem remove()lub purge().

Robert Tupelo-Schneck
źródło
Podoba mi się z wyjątkiem synchronizedbloków. Czy możesz zadzwonić do kolejki, aby uzyskać liczbę zadań. A może użyć AtomicInteger?
Gray
Chciałem ich uniknąć, ale problem jest taki. Jeśli execute()w osobnych wątkach jest kilka wywołań, każdy z nich (1) określi, ile wątków jest potrzebnych, (2) setCorePoolSizedo tego numeru i (3) zadzwoni super.execute(). Jeśli kroki (1) i (2) nie są zsynchronizowane, nie jestem pewien, jak zapobiec niefortunnemu uporządkowaniu, w którym ustawiasz rozmiar puli podstawowej na niższą liczbę po wyższej liczbie. Mając bezpośredni dostęp do pola nadklasy, można to zrobić za pomocą porównania i ustaw, ale nie widzę prostego sposobu na zrobienie tego w podklasie bez synchronizacji.
Robert Tupelo-Schneck
Myślę, że kary za ten stan wyścigu są stosunkowo niskie, o ile taskCountpole jest ważne (tj AtomicInteger.). Jeśli dwa wątki przeliczają rozmiar puli bezpośrednio po sobie, powinny uzyskać prawidłowe wartości. Jeśli drugi zmniejsza wątki rdzenia, to musiał być zauważony spadek w kolejce czy coś.
Gray
1
Niestety myślę, że to jest gorsze. Załóżmy, że wywołanie zadań 10 i 11 execute(). Każdy zadzwoni atomicTaskCount.incrementAndGet()i dostanie odpowiednio 10 i 11. Ale bez synchronizacji (ponad uzyskiwanie liczby zadań i ustawianie rozmiaru puli podstawowej) można wtedy uzyskać (1) zadanie 11 ustawia rozmiar puli rdzeniowej na 11, (2) zadanie 10 ustawia rozmiar puli podstawowej na 10, (3) wywołanie zadania 10 super.execute(), (4) wywołanie zadania 11 super.execute()i umieszczenie go w kolejce.
Robert Tupelo-Schneck
2
Przetestowałem to rozwiązanie i jest zdecydowanie najlepsze. W środowisku wielowątkowym nadal czasami umieszcza się w kolejce, gdy są wolne wątki (ze względu na naturę wolnego wątku TPE.execute), ale zdarza się to rzadko, w przeciwieństwie do rozwiązania oznaczonego jako odpowiedź, gdzie stan wyścigu ma większe szanse zdarza się, więc dzieje się to prawie przy każdym przebiegu wielowątkowym.
Wanna Know All
6

Mamy podklasę, ThreadPoolExecutorktóra przyjmuje dodatkowe creationThresholdi nadpisuje execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

może to też pomaga, ale Twój wygląda oczywiście bardziej artystycznie…

Ralf H.
źródło
Ciekawy. Dzięki za to. Właściwie nie wiedziałem, że rozmiar rdzenia jest zmienny.
Gray
Teraz, kiedy już o tym myślę, to rozwiązanie jest lepsze od mojego pod względem sprawdzania wielkości kolejki. Poprawiłem odpowiedź, aby offer(...)metoda zwracała się tylko falsewarunkowo. Dzięki!
Gray
4

Zalecana odpowiedź rozwiązuje tylko jeden (1) problem z pulą wątków JDK:

  1. Pule wątków JDK są nastawione na kolejkowanie. Więc zamiast tworzyć nowy wątek, ustawią zadanie w kolejce. Tylko wtedy, gdy kolejka osiągnie swój limit, pula wątków utworzy nowy wątek.

  2. Wycofywanie wątków nie następuje, gdy obciążenie zmniejsza się. Na przykład, jeśli mamy serię zadań uderzających w pulę, która powoduje, że pula osiąga maksimum, a następnie niewielkie obciążenie maksymalnie 2 zadań naraz, pula użyje wszystkich wątków do obsługi niewielkiego obciążenia, zapobiegając wycofaniu wątku. (potrzebne byłyby tylko 2 wątki…)

Niezadowolony z powyższego zachowania, poszedłem do przodu i wdrożyłem pulę, aby przezwyciężyć powyższe niedociągnięcia.

Aby rozwiązać 2) Korzystanie z planowania Lifo rozwiązuje problem. Idea ta została zaprezentowana przez Bena Maurera na konferencji ACM application 2015: Systems @ Facebook scale

Tak narodziła się nowa implementacja:

LifoThreadPoolExecutorSQP

Jak dotąd ta implementacja poprawia wydajność wykonywania asynchronicznego dla ZEL .

Implementacja jest w stanie zmniejszyć obciążenie przełączania kontekstu, zapewniając lepszą wydajność w niektórych przypadkach użycia.

Mam nadzieję, że to pomoże...

PS: JDK Fork Join Pool implementuje usługę ExecutorService i działa jako „normalna” pula wątków, implementacja jest wydajna, wykorzystuje harmonogramowanie wątków LIFO, jednak nie ma kontroli nad rozmiarem kolejki wewnętrznej, limitem czasu wycofania ..., a co najważniejsze zadania nie mogą być przerwane podczas ich anulowania

user2179737
źródło
1
Szkoda, że ​​ta implementacja ma tak wiele zewnętrznych zależności. Czyni to dla mnie bezużyteczne: - /
Martin L.
1
To naprawdę dobra uwaga (druga). Niestety, implementacja nie jest jasna z zależności zewnętrznych, ale nadal można ją zastosować, jeśli chcesz.
Alexey Vlasov
1

Uwaga: teraz wolę i polecam moją inną odpowiedź .

Mam inną propozycję, zgodnie z pierwotnym pomysłem zmiany kolejki, aby zwróciła fałsz. W tym przypadku wszystkie zadania mogą wejść do kolejki, ale za każdym razem, gdy zadanie jest kolejkowane po execute(), wykonujemy je z wartowniczym zadaniem no-op, które kolejka odrzuca, powodując pojawienie się nowego wątku, który natychmiast wykona no-op, a następnie coś z kolejki.

Ponieważ wątki robocze mogą sondować LinkedBlockingQueuenowe zadanie, zadanie może zostać umieszczone w kolejce, nawet jeśli istnieje dostępny wątek. Aby uniknąć tworzenia nowych wątków, nawet jeśli są dostępne wątki, musimy śledzić, ile wątków czeka na nowe zadania w kolejce i tworzyć nowy wątek tylko wtedy, gdy w kolejce jest więcej zadań niż wątków oczekujących.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});
Robert Tupelo-Schneck
źródło
0

Najlepszym rozwiązaniem, jakie przychodzi mi do głowy, jest przedłużenie.

ThreadPoolExecutoroferuje kilka metod przechwytywania: beforeExecutei afterExecute. W swoim rozszerzeniu możesz używać ograniczonej kolejki do dostarczania zadań i drugiej nieograniczonej kolejki do obsługi przepełnienia. Gdy ktoś dzwoni submit, możesz spróbować umieścić żądanie w ograniczonej kolejce. Jeśli spotkasz się z wyjątkiem, po prostu umieść zadanie w swojej przepełnionej kolejce. Następnie możesz użyć afterExecutehaka, aby sprawdzić, czy po zakończeniu zadania jest coś w kolejce przepełnienia. W ten sposób executor zajmie się najpierw rzeczami w swojej ograniczonej kolejce i automatycznie pobierze z tej nieograniczonej kolejki, jak pozwoli na to czas.

Wydaje się, że to więcej pracy niż twoje rozwiązanie, ale przynajmniej nie obejmuje nadawania kolejkom nieoczekiwanych zachowań. Wyobrażam sobie również, że istnieje lepszy sposób sprawdzania stanu kolejki i wątków, zamiast polegać na wyjątkach, które są dość powolne do wrzucania.

bstempi
źródło
Nie przepadam za tym rozwiązaniem. Jestem prawie pewien, że ThreadPoolExecutor nie został zaprojektowany do dziedziczenia.
scottb
Właściwie jest przykład rozszerzenia bezpośrednio w JavaDoc. Stwierdzają, że większość prawdopodobnie po prostu zaimplementuje metody przechwytujące, ale mówią ci, na co jeszcze musisz zwrócić uwagę podczas rozszerzania.
bstempi
0

Uwaga: W przypadku JDK ThreadPoolExecutor, gdy masz ograniczoną kolejkę, tworzysz nowe wątki tylko wtedy, gdy oferta zwraca wartość false. Możesz uzyskać coś użytecznego dzięki CallerRunsPolicy, który tworzy trochę BackPressure i bezpośrednio wywołuje uruchamiane w wątku wywołującego.

Potrzebuję zadań do wykonania z wątków utworzonych przez pulę i mam niezasadną kolejkę do planowania, podczas gdy liczba wątków w puli może wzrosnąć lub zmniejszyć się między corePoolSize a maximumPoolSize, więc ...

Skończyło się na tym, że wykonałem pełną kopię wklejania z ThreadPoolExecutor i nieco zmieniłem metodę wykonywania, ponieważ niestety nie można tego zrobić przez rozszerzenie (wywołuje metody prywatne).

Nie chciałem odradzać nowych wątków tylko od razu, gdy nadejdzie nowe żądanie i wszystkie wątki są zajęte (ponieważ generalnie mam krótkotrwałe zadania). Dodałem próg, ale nie krępuj się go zmienić według swoich potrzeb (może dla większości IO lepiej usunąć ten próg)

private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;

protected void beforeExecute(Thread t, Runnable r) {
    activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
    activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) {
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {
                this.addWorker((Runnable) null, false);
            }
            //>>change start
            else if (workerCountOf(recheck) < maximumPoolSize //
                && (activeWorkers.get() > workerCountOf(recheck) * threshold
                    || workQueue.size() > workerCountOf(recheck) * threshold)) {
                this.addWorker((Runnable) null, false);
            }
            //<<change end
        } else if (!this.addWorker(command, false)) {
            this.reject(command);
        }
    }
Radu Toader
źródło