Java ThreadPoolExecutor: Aktualizacja wielkości puli podstawowej dynamicznie odrzuca przychodzące zadania sporadycznie

13

Występuje problem ThreadPoolExecutorpolegający na RejectedExecutionExceptiontym, że jeśli próbuję zmienić rozmiar puli podstawowej na inną liczbę po utworzeniu puli, to sporadycznie niektóre zadania są odrzucane, chociaż nigdy nie przesyłam więcej niż queueSize + maxPoolSizeliczby zadań.

Problem, który próbuję rozwiązać, to rozszerzenie, ThreadPoolExecutorktóre zmienia rozmiar swoich głównych wątków w oparciu o oczekujące wykonania znajdujące się w kolejce puli wątków. Potrzebuję tego, ponieważ domyślnie a ThreadPoolExecutorutworzy nowy Threadtylko wtedy, gdy kolejka jest pełna.

Oto mały samodzielny program Pure Java 8, który demonstruje problem.

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResizeTest {

    public static void main(String[] args) throws Exception {
        // increase the number of iterations if unable to reproduce
        // for me 100 iterations have been enough
        int numberOfExecutions = 100;

        for (int i = 1; i <= numberOfExecutions; i++) {
            executeOnce();
        }
    }

    private static void executeOnce() throws Exception {
        int minThreads = 1;
        int maxThreads = 5;
        int queueCapacity = 10;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                0, 10, TimeUnit.MILLISECONDS);
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

        try {
            int totalTasksToSubmit = queueCapacity + maxThreads;

            for (int i = 1; i <= totalTasksToSubmit; i++) {
                // following line sometimes throws a RejectedExecutionException
                pool.submit(() -> {
                    // block the thread and prevent it from completing the task
                    taskBlocker.join();
                });
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away
            }
        } finally {
            taskBlocker.complete(null);
            scheduler.shutdown();
            pool.shutdown();
        }
    }

    /**
     * Resize the thread pool if the number of pending tasks are non-zero.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
        int pendingExecutions = pool.getQueue().size();
        int approximateRunningExecutions = pool.getActiveCount();

        /*
         * New core thread count should be the sum of pending and currently executing tasks
         * with an upper bound of maxThreads and a lower bound of minThreads.
         */
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

        pool.setCorePoolSize(newThreadCount);
        pool.prestartAllCoreThreads();
    }
}

Dlaczego pula powinna kiedykolwiek zgłaszać wyjątek RejectedExecutionException, jeśli nigdy nie przesyłam więcej niż kolejkaCapacity + maxThreads. Nigdy nie zmieniam maksymalnej liczby wątków, więc zgodnie z definicją ThreadPoolExecutor powinien on albo dostosować zadanie do wątku, albo do kolejki.

Oczywiście, jeśli nigdy nie zmieniam wielkości puli, to pula wątków nigdy nie odrzuca żadnych zgłoszeń. Jest to również trudne do debugowania, ponieważ dodanie jakichkolwiek opóźnień w zgłoszeniach sprawia, że ​​problem znika.

Wszelkie wskazówki, jak naprawić RejectedExecutionException?

Swaranga Sarma
źródło
Dlaczego nie zapewnić własnej implementacji ExecutorService, pakując istniejącą, która ponownie przesyła zadania, których przesłanie nie powiodło się z powodu zmiany rozmiaru?
daniu
@daniu to obejście. Chodzi o to, dlaczego pula powinna kiedykolwiek zgłaszać wyjątek RejectedExecutionException, jeśli nigdy nie podam więcej niż queueCapacity + maxThreads. Nigdy nie zmieniam maksymalnej liczby wątków, więc zgodnie z definicją ThreadPoolExecutor powinien on albo dostosować zadanie do wątku, albo do kolejki.
Swaranga Sarma
Ok, chyba źle zrozumiałem twoje pytanie. Co to jest? Czy chcesz wiedzieć, dlaczego takie zachowanie ma miejsce lub jak sobie z tym poradzić, powodując problemy?
daniu
Tak, zmiana mojej implementacji na usługę executora nie jest możliwa, ponieważ wiele kodu odnosi się do ThreadPoolExecutor. Więc jeśli nadal chciałbym mieć ThreadPoolExecutor o zmiennym rozmiarze, muszę wiedzieć, jak to naprawić. Może właściwym sposobem na zrobienie czegoś takiego jest rozszerzenie ThreadPoolExecutor i uzyskanie dostępu do niektórych jego chronionych zmiennych oraz aktualizacja wielkości puli w zsynchronizowanym bloku na zamku współdzielonym przez superklasę.
Swaranga Sarma
Rozszerzanie ThreadPoolExecutorjest prawdopodobnie złym pomysłem i czy nie musisz zmieniać istniejącego kodu również w tym przypadku? Najlepiej byłoby podać przykład, w jaki sposób Twój kod uzyskuje dostęp do modułu wykonującego. Byłbym zaskoczony, gdyby używał wielu metod specyficznych dla ThreadPoolExecutor(tj. Nie w ExecutorService).
daniu

Odpowiedzi:

5

Oto scenariusz, dlaczego tak się dzieje:

W moim przykładzie używam minThreads = 0, maxThreads = 2 i queueCapacity = 2, aby go skrócić. Pierwsze polecenie zostanie przesłane, odbywa się to w metodzie execute:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

dla tego polecenia wykonywane jest workQueue.offer (polecenie) niż addWorker (null, false). Wątek roboczy najpierw usuwa to polecenie z kolejki w metodzie uruchamiania wątków, więc w tej chwili kolejka nadal ma jedno polecenie,

Drugie polecenie zostanie przesłane tym razem zostanie wykonane workQueue.offer (polecenie). Teraz kolejka jest pełna

Teraz ScheduledExecutorService wykonuje metodę resizeThreadPool, która wywołuje setCorePoolSize z maxThreads. Oto metoda setCorePoolSize:

 public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

Ta metoda dodaje jednego pracownika za pomocą addWorker (null, true). Nie, uruchomione są 2 kolejki robocze, maksymalna i kolejka jest pełna.

Trzecie polecenie zostaje przesłane i kończy się niepowodzeniem, ponieważ nie działa workQueue.offer (polecenie) i addWorker (polecenie, fałsz), co prowadzi do wyjątku:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)

Myślę, że aby rozwiązać ten problem, powinieneś ustawić pojemność kolejki na maksymalną liczbę poleceń, które chcesz wykonać.

Thomas Krieger
źródło
Poprawny. Byłem w stanie wykonać repro, kopiując kod do własnej klasy i dodając rejestratory. Zasadniczo, gdy kolejka jest pełna, a ja przesyłam nowe zadanie, spróbuje utworzyć nowego pracownika. Tymczasem jeśli w tym momencie mój resizer wywoła również setCorePoolSize na 2, to również tworzy nowego Workera. W tym momencie dwóch Robotników konkuruje o dodanie, ale obaj nie mogą, ponieważ naruszałoby to ograniczenie maksymalnej puli, więc nowe zadanie zostanie odrzucone. Myślę, że to warunek wyścigu i zgłosiłem raport o błędzie do OpenJDK. Zobaczmy. Ale odpowiedziałeś na moje pytanie, więc dostałeś nagrodę. Dziękuję Ci.
Swaranga Sarma
2

Nie jestem pewien, czy to kwalifikuje się jako błąd. Takie zachowanie powstaje, gdy dodatkowe wątki robocze są tworzone po zapełnieniu kolejki, ale w dokumentacji java zauważono, że osoba dzwoniąca musi poradzić sobie z zadaniami odrzuconymi.

Dokumenty Java

Fabryka nowych wątków. Wszystkie wątki są tworzone przy użyciu tej fabryki (za pomocą metody addWorker). Wszyscy dzwoniący muszą być przygotowani na awarię addWorker, co może odzwierciedlać zasady systemowe lub zasady użytkownika ograniczające liczbę wątków. Mimo że nie jest to traktowane jako błąd, niepowodzenie tworzenia wątków może spowodować odrzucenie nowych zadań lub zablokowanie istniejących zadań w kolejce.

Gdy zmieniasz rozmiar puli podstawowej, powiedzmy, że wzrost, tworzone są dodatkowe procesy robocze ( addWorkermetoda w setCorePoolSize), a wywołanie tworzenia dodatkowej pracy ( addWorkermetoda z execute) jest odrzucane, gdy addWorkerzwraca false ( add Workerostatni fragment kodu), ponieważ wystarczająca liczba dodatkowych pracowników jest już utworzony przez, setCorePoolSize ale jeszcze nie uruchomiony, aby odzwierciedlić aktualizację w kolejce .

Odpowiednie części

Porównać

public void setCorePoolSize(int corePoolSize) {
    ....
    int k = Math.min(delta, workQueue.size());
    while (k-- > 0 && addWorker(null, true)) {
        if (workQueue.isEmpty())
             break;
    }
}

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
....
   if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
     return false;             
}

Użyj niestandardowej procedury obsługi odrzucania ponownych prób (powinno to działać w twoim przypadku, ponieważ masz górną granicę jako maksymalny rozmiar puli). Dostosuj w razie potrzeby.

public static class RetryRejectionPolicy implements RejectedExecutionHandler {
    public RetryRejectionPolicy () {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
           while(true)
            if(e.getQueue().offer(r)) break;
        }
    }
}

ThreadPoolExecutor pool = new ThreadPoolExecutor(
      minThreads, maxThreads,
      0, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(queueCapacity),
      new ThreadPoolResizeTest.RetryRejectionPolicy()
 );

Pamiętaj również, że użycie zamykania jest nieprawidłowe, ponieważ nie będzie czekać na przesłanie zadania do wykonania, ale awaitTerminationzamiast tego użyj .

Sagar Veeram
źródło
Myślę, że zamknięcie czeka na już przesłane zadania, zgodnie z JavaDoc: shutdown () Inicjuje uporządkowane zamknięcie, w którym wykonywane są poprzednio przesłane zadania, ale żadne nowe zadania nie zostaną zaakceptowane.
Thomas Krieger
@ThomasKrieger - wykona już przesłane zadania, ale nie będzie czekać na ich zakończenie - z docs docs.oracle.com/javase/7/docs/api/java/util/concurrent/… - Ta metoda nie czeka na uprzednio przesłane zadania do wykonania. Aby to zrobić, użyj funkcji awaitTermination.
Sagar Veeram