Java: ExecutorService, który blokuje przesyłanie po określonym rozmiarze kolejki

85

Próbuję zakodować rozwiązanie, w którym pojedynczy wątek wytwarza zadania intensywnie korzystające z operacji we / wy, które można wykonywać równolegle. Każde zadanie ma znaczące dane w pamięci. Dlatego chcę mieć możliwość ograniczenia liczby zadań oczekujących w danej chwili.

Jeśli utworzę ThreadPoolExecutor w ten sposób:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Potem executor.submit(callable)rzucaRejectedExecutionException gdy kolejka się zapełni i wszystkie wątki są już zajęte.

Co mogę zrobić executor.submit(callable) blok, gdy kolejka jest pełna i wszystkie wątki są zajęte?

EDYCJA : próbowałem tego :

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

I w pewnym stopniu osiąga efekt, który chcę osiągnąć, ale w nieelegancki sposób (w zasadzie wątki odrzucone są uruchamiane w wątku wywołującym, więc blokuje to wątek wywołujący przed przesłaniem większej liczby).

EDYCJA: (5 lat po zadaniu pytania)

Każdemu, kto czyta to pytanie i odpowiedzi na nie, nie traktuj zaakceptowanej odpowiedzi jako jednego prawidłowego rozwiązania. Przeczytaj wszystkie odpowiedzi i komentarze.

Tahir Akhtar
źródło
1
Używałem już wcześniej semafora, aby to zrobić, tak jak w odpowiedzi na bardzo podobne pytanie, do którego link @axtavt prowadzi.
Stephen Denne
2
@TomWolk Po pierwsze, otrzymujesz jedno zadanie więcej wykonywane równolegle niż numWorkerThreadswtedy, gdy wątek wywołujący również wykonuje zadanie. Ale ważniejszą kwestią jest to, że jeśli wątek wywołujący otrzyma długotrwałe zadanie, inne wątki mogą stać bezczynne, czekając na następne zadanie.
Tahir Akhtar
2
@TahirAkhtar, prawda; kolejka powinna być wystarczająco długa, aby nie wysychała, gdy dzwoniący musi sam wykonać zadanie. Ale myślę, że jest to zaleta, jeśli jeszcze jeden wątek, wątek wywołujący, może być używany do wykonywania zadań. Jeśli wywołujący po prostu blokuje, wątek wywołującego byłby bezczynny. Używam CallerRunsPolicy z kolejką trzykrotnie większą niż pojemność puli wątków i działa ładnie i płynnie. W porównaniu z tym rozwiązaniem rozważyłbym hartowanie z nadmierną inżynierią ramową.
TomWolk
1
@TomWalk +1 Dobre punkty. Wydaje się, że kolejną różnicą jest to, że jeśli zadanie zostało odrzucone z kolejki i zostało uruchomione przez wątek wywołujący, wówczas wątek wywołujący zacząłby przetwarzać żądanie poza kolejnością, ponieważ nie czekał na swoją kolej w kolejce. Oczywiście, jeśli już zdecydowałeś się używać wątków, musisz poprawnie obsługiwać wszelkie zależności, ale tylko o czym należy pamiętać.
rimsky

Odpowiedzi:

64

Zrobiłem to samo. Sztuczka polega na utworzeniu BlockingQueue, w którym metoda offer () jest tak naprawdę metodą put (). (możesz użyć dowolnej bazy BlockingQueue, którą chcesz).

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

Zauważ, że działa to tylko dla puli wątków, gdzie corePoolSize==maxPoolSize więc bądź ostrożny (patrz komentarze).

jtahlborn
źródło
2
alternatywnie możesz rozszerzyć SynchronousQueue, aby zapobiec buforowaniu, zezwalając tylko na bezpośrednie przekazywanie.
brendon
Elegancki i bezpośrednio rozwiązujący problem. oferta () staje się put (), a put () oznacza „... czekanie, jeśli to konieczne, na zwolnienie miejsca”
Trenton
5
Nie sądzę, żeby to był dobry pomysł, ponieważ zmienia protokół metody oferty. Metoda oferty powinna być wywołaniem nieblokującym.
Mingjiang Shi
6
Nie zgadzam się - zmienia to zachowanie ThreadPoolExecutor.execute w taki sposób, że jeśli masz corePoolSize <maxPoolSize, logika ThreadPoolExecutor nigdy nie doda dodatkowych pracowników poza rdzeniem.
Krease
5
Aby wyjaśnić - Twoje rozwiązanie działa tylko tak długo, jak długo utrzymujesz ograniczenie w jakim miejscu corePoolSize==maxPoolSize. Bez tego ThreadPoolExecutor nie ma już zaprojektowanego zachowania. Szukałem rozwiązania tego problemu, które nie miało tego ograniczenia; zobacz moją alternatywną odpowiedź poniżej dotyczącą podejścia, które ostatecznie przyjęliśmy.
Krease
15

Oto, jak rozwiązałem ten problem:

(uwaga: to rozwiązanie blokuje wątek, który przesyła Callable, więc zapobiega wyrzucaniu wyjątku RejectedExecutionException)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
cvacca
źródło
1
Zakładam, że to nie działa dobrze w przypadkach, w których corePoolSize < maxPoolSize...: |
rogerdpack
1
Działa w przypadku, gdy corePoolSize < maxPoolSize. W takich przypadkach semafor będzie dostępny, ale nie będzie wątku, a funkcja SynchronousQueuezwróci wartość false. ThreadPoolExecutorWtedy kręcić nowy wątek. Problem z tym rozwiązaniem polega na tym, że ma stan wyścigu . Po semaphore.release(), ale przed zakończeniem wątku execute, submit () otrzyma zezwolenie na semafor. Jeśli super.submit () zostanie uruchomiona przed execute()zakończeniem, zadanie zostanie odrzucone.
Luís Guilherme
@ LuísGuilherme Ale semaphore.release () nigdy nie zostanie wywołana przed zakończeniem wykonywania wątku. Ponieważ to wywołanie jest wykonywane w metodzie after Execute (...). Czy brakuje mi czegoś w scenariuszu, który opisujesz?
cvacca
1
Funkcja afterExecute jest wywoływana przez ten sam wątek, który uruchamia zadanie, więc nie została jeszcze zakończona. Zrób test sam. Zaimplementuj to rozwiązanie i rzuć ogromne ilości pracy na wykonawcę, wyrzucając, jeśli praca zostanie odrzucona. Zauważysz, że tak, jest to sytuacja wyścigowa i nie jest trudno ją odtworzyć.
Luís Guilherme
1
Przejdź do ThreadPoolExecutor i sprawdź metodę runWorker (Worker w). Zobaczysz, że rzeczy dzieją się po zakończeniu Execute, w tym odblokowanie pracownika i zwiększenie liczby ukończonych zadań. Pozwoliłeś więc wejść zadaniom (zwalniając semafor) bez posiadania pasma do ich przetwarzania (wywołując processWorkerExit).
Luís Guilherme
14

Aktualnie akceptowana odpowiedź ma potencjalnie istotny problem - zmienia zachowanie ThreadPoolExecutor. Wykonaj tak, że jeśli masz corePoolSize < maxPoolSize , logika ThreadPoolExecutor nigdy nie doda dodatkowych pracowników poza rdzeniem.

Z ThreadPoolExecutor .execute (Runnable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

W szczególności ten ostatni blok „else” nigdy nie zostanie trafiony.

Lepszą alternatywą jest zrobienie czegoś podobnego do tego, co już robi OP - użyj RejectedExecutionHandler, aby wykonać tę samą putlogikę:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

Jest kilka rzeczy, na które należy zwrócić uwagę w przypadku tego podejścia, jak wskazano w komentarzach (odnosząc się do tej odpowiedzi ):

  1. Jeśli corePoolSize==0, to występuje sytuacja wyścigu, w której wszystkie wątki w puli mogą umrzeć, zanim zadanie będzie widoczne
  2. Użycie implementacji, która zawija zadania kolejki (nie dotyczy ThreadPoolExecutor) spowoduje problemy, chyba że program obsługi również opakuje je w ten sam sposób.

Mając na uwadze te problemy, to rozwiązanie będzie działać dla większości typowych ThreadPoolExecutors i będzie poprawnie obsługiwać przypadek, w którym corePoolSize < maxPoolSize.

Krease
źródło
Komu, kto został odrzucony - czy możesz podać jakieś informacje? Czy w tej odpowiedzi jest coś nieprawidłowego / mylącego / niebezpiecznego? Chciałbym mieć możliwość przedstawienia twoich obaw.
Krease
2
Nie głosowałem przeciw, ale wydaje się, że to bardzo zły pomysł
vanOekel
@vanOekel - dzięki za link - ta odpowiedź podnosi kilka ważnych przypadków, które powinny być znane przy stosowaniu tego podejścia, ale IMO nie czyni z tego „bardzo złego pomysłu” - nadal rozwiązuje problem obecny w obecnie akceptowanej odpowiedzi. Zaktualizowałem moją odpowiedź o te zastrzeżenia.
Krease
Jeśli rozmiar puli rdzenia wynosi 0 i jeśli zadanie zostanie przesłane do modułu wykonawczego, moduł wykonawczy rozpocznie tworzenie wątku / wątków, jeśli kolejka jest pełna, aby obsłużyć zadanie. Więc dlaczego jest podatny na impas. Nie rozumiem. Czy mógłbyś to rozwinąć.?
Farhan Shirgill Ansari
@ShirgillFarhanAnsari - to sprawa poruszona w poprzednim komentarzu. Może się tak zdarzyć, ponieważ dodanie bezpośrednio do kolejki nie powoduje tworzenia wątków / uruchamiania pracowników. Jest to
skrajny
4

Wiem, że to stare pytanie, ale miałem podobny problem, że tworzenie nowych zadań było bardzo szybkie i jeśli wystąpiło zbyt wiele OutOfMemoryError, ponieważ istniejące zadanie nie zostało ukończone wystarczająco szybko.

W moim przypadku Callablessą przesłane i potrzebuję wyniku, dlatego muszę przechowywać wszystkie Futureszwrócone przez executor.submit(). Moim rozwiązaniem było umieszczenie Futuresw BlockingQueuemaksymalnym rozmiarze. Po zapełnieniu kolejki zadania nie są generowane, dopóki niektóre nie zostaną zakończone (elementy usunięte z kolejki). W pseudokodzie:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {   
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(compoundFuture);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();         
                }
            }
        executor.shutdown();
        }
    }
    taskGenerator.start();

    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) {
        Future compoundFuture = futures.take();
        // do something
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();     
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}
początkujący_
źródło
2

Miałem podobny problem i zaimplementowałem to używając beforeExecute/afterExecutehooków z ThreadPoolExecutor:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

    /**
     * Executes task if there is enough system resources for it. Otherwise
     * waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskLock.lock();
        try {
            // Spin while we will not have enough capacity for this job
            while (maxTaskCount < currentTaskCount) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    t.interrupt();
                }
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }
}

To powinno ci wystarczyć. Przy okazji, oryginalna implementacja była oparta na rozmiarze zadania, ponieważ jedno zadanie mogło być większe 100 razy niż inne, a przesłanie dwóch ogromnych zadań zabijało pudełko, ale uruchomienie jednego dużego i wielu małych było OK. Jeśli Twoje zadania intensywnie korzystające z operacji we / wy mają mniej więcej ten sam rozmiar, możesz użyć tej klasy, w przeciwnym razie po prostu daj mi znać, a opublikuję implementację opartą na rozmiarze.

PS Chciałbyś sprawdzić ThreadPoolExecutorjavadoc. To naprawdę fajny podręcznik użytkownika autorstwa Douga Lea o tym, jak można go łatwo dostosować.

Petro Semeniuk
źródło
1
Zastanawiam się, co się stanie, gdy Thread zatrzyma blokadę przedExecute () i zobaczy to maxTaskCount < currentTaskCounti zacznie czekać pod unpausedwarunkiem. W tym samym czasie inny wątek próbuje uzyskać blokadę w afterExecute (), aby zasygnalizować zakończenie zadania. Czy nie będzie to impas?
Tahir Akhtar
1
Zauważyłem też, że to rozwiązanie nie blokuje wątku, który wysyła zadania, gdy kolejka się zapełni. Więc RejectedExecutionExceptionjest to nadal możliwe.
Tahir Akhtar
1
Semantyka klas ReentrantLock / Condition jest podobna do tego, co zapewnia synchronizacja i czekanie / powiadamianie. Gdy metody oczekiwania na warunek zostaną wywołane, blokada zostanie zwolniona, więc nie będzie zakleszczenia.
Petro Semeniuk
Tak, ten ExecutorService blokuje zadania podczas przesyłania bez blokowania wątku wywołującego. Zadanie właśnie zostało przesłane i zostanie przetworzone asynchronicznie, gdy będzie wystarczająca ilość zasobów systemowych.
Petro Semeniuk
2

Zaimplementowałem rozwiązanie zgodne ze wzorcem dekoratora i wykorzystujące semafor do kontroli ilości wykonywanych zadań. Możesz go używać z dowolnym Executori:

  • Określ maksymalną liczbę trwających zadań
  • Określ maksymalny limit czasu oczekiwania na zezwolenie na wykonanie zadania (jeśli upłynie limit czasu i nie uzyskasz pozwolenia, RejectedExecutionExceptionzostanie wyrzucony a)
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import javax.annotation.Nonnull;

public class BlockingOnFullQueueExecutorDecorator implements Executor {

    private static final class PermitReleasingDecorator implements Runnable {

        @Nonnull
        private final Runnable delegate;

        @Nonnull
        private final Semaphore semaphore;

        private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
            this.delegate = task;
            this.semaphore = semaphoreToRelease;
        }

        @Override
        public void run() {
            try {
                this.delegate.run();
            }
            finally {
                // however execution goes, release permit for next task
                this.semaphore.release();
            }
        }

        @Override
        public final String toString() {
            return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
        }
    }

    @Nonnull
    private final Semaphore taskLimit;

    @Nonnull
    private final Duration timeout;

    @Nonnull
    private final Executor delegate;

    public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
        this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
        if (maximumTaskNumber < 1) {
            throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
        }
        this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
        if (this.timeout.isNegative()) {
            throw new IllegalArgumentException("'maximumTimeout' must not be negative");
        }
        this.taskLimit = new Semaphore(maximumTaskNumber);
    }

    @Override
    public final void execute(final Runnable command) {
        Objects.requireNonNull(command, "'command' must not be null");
        try {
            // attempt to acquire permit for task execution
            if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
                throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
            }
        }
        catch (final InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
    }

    @Override
    public final String toString() {
        return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
                this.timeout, this.delegate);
    }
}
Grzegorz Lehmann
źródło
1

Myślę, że jest to tak proste, jak użycie a ArrayBlockingQueuezamiast aa LinkedBlockingQueue.

Zignoruj ​​mnie ... to całkowicie błędne. ThreadPoolExecutorpołączenia Queue#offernie takie, putktóre dałyby oczekiwany efekt.

Możesz rozszerzyć ThreadPoolExecutori zapewnić implementację execute(Runnable)tych wywołań putzamiast offer.

Obawiam się, że to nie jest w pełni satysfakcjonująca odpowiedź.

Gareth Davis
źródło