Jak sprawić, by metoda przesyłania () ThreadPoolExecutor była blokowana, jeśli jest nasycona?

102

Chcę stworzyć ThreadPoolExecutortaki, który po osiągnięciu maksymalnego rozmiaru i zapełnieniu kolejki submit()metoda blokuje się przy próbie dodania nowych zadań. Czy muszę zaimplementować w RejectedExecutionHandlertym celu niestandardowy sposób , czy też istnieje istniejący sposób, aby to zrobić przy użyciu standardowej biblioteki Java?

Punkt stały
źródło
2
Czy chcesz czegoś takiego jak metoda offer () kolejki blokowania Array?
extraneon
2
@bacar Nie zgadzam się. Te pytania i odpowiedzi wyglądają na bardziej wartościowe (oprócz tego, że są starsze).
JasonMArcher,

Odpowiedzi:

47

Jedno z możliwych rozwiązań, które właśnie znalazłem:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

Czy są jakieś inne rozwiązania? Wolałbym coś opartego na czymś, RejectedExecutionHandlerponieważ wydaje się, że to standardowy sposób radzenia sobie w takich sytuacjach.

Punkt stały
źródło
2
Czy istnieje tu warunek wyścigu między momentem zwolnienia semafora w klauzuli końcowej a nabyciem semafora?
volni
2
Jak wspomniano powyżej, ta implementacja jest wadliwa, ponieważ semafor jest zwalniany przed zakończeniem zadania. Lepiej byłoby użyć metody java.util.concurrent.ThreadPoolExecutor # afterExecute (Runnable, Throwable)
FelixM
2
@FelixM: użycie java.util.concurrent.ThreadPoolExecutor # afterExecute (Runnable, Throwable) nie rozwiąże problemu, ponieważ afterExecute jest wywoływana bezpośrednio po task.run () w java.util.concurrent.ThreadPoolExecutor # runWorker (Worker w), przed pobranie kolejnego elementu z kolejki (spojrzenie na kod źródłowy openjdk 1.7.0.6).
Jaan,
1
Ta odpowiedź pochodzi z książki Java Concurrency in Practice autorstwa Briana Goetza
orangepips.
11
Ta odpowiedź nie jest do końca poprawna, są też komentarze. Ten fragment kodu rzeczywiście pochodzi z Java Concurrency in Practice i jest poprawny, jeśli weźmie się pod uwagę jego kontekst . Książka jasno stwierdza, dosłownie: „W takim podejściu użyj nieograniczonej kolejki (…) i ustaw ograniczenie semafora tak, aby było równe rozmiarowi puli plus liczbie zadań w kolejce, na które chcesz zezwolić”. Dzięki nieograniczonej kolejce zadania nigdy nie zostaną odrzucone, więc ponowne zgłoszenie wyjątku jest całkowicie bezużyteczne! Co, jak sądzę, jest również powodem, dla którego NIEthrow e; ma tego w książce. JCIP ma rację!
Timmos
30

Możesz użyć ThreadPoolExecutor i blockingQueue:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}
DiTime4Tea
źródło
Chciałbym tylko powiedzieć, że było to niesamowicie szybkie i łatwe do wdrożenia rozwiązanie, które działało bardzo dobrze!
Ivan
58
Spowoduje to uruchomienie odrzuconych zadań w wątku wysyłającym. Który funkcjonalnie nie spełnia wymagań PO.
Percepcja
4
Powoduje to uruchomienie zadania „w wątku wywołującym” zamiast blokowania umieszczenia go w kolejce, co może mieć pewne niekorzystne skutki, na przykład jeśli wiele wątków nazywa to w ten sposób, będzie wykonywanych więcej zadań niż „rozmiar kolejki”, a jeśli zdarza się, że zadanie trwa dłużej niż oczekiwano, twój wątek „produkujący” może nie zajmować wykonywanego zadania. Ale świetnie się tutaj spisało!
rogerdpack
4
Uznano w dół: nie blokuje, gdy TPE jest nasycony. To tylko alternatywa, a nie rozwiązanie.
Timmos
1
Uznano za: to prawie pasuje do `` projektu TPE '' i `` naturalnie blokuje '' wątki klientów, dając im przepełniony gust do zrobienia. Powinno to obejmować większość przypadków użycia, ale oczywiście nie wszystkie. Powinieneś zrozumieć, co się dzieje pod maską.
Mike
12

Należy użyć CallerRunsPolicy, który wykonuje odrzucone zadanie w wątku wywołującym. W ten sposób nie może przesyłać żadnych nowych zadań do modułu wykonawczego, dopóki to zadanie nie zostanie wykonane, w którym to momencie będą wolne wątki puli lub proces się powtórzy.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

Z dokumentów:

Odrzucone zadania

Nowe zadania przesłane w metodzie execute (java.lang.Runnable) zostaną odrzucone, gdy Executor zostanie zamknięty, a także gdy Executor używa skończonych granic zarówno dla maksymalnych wątków, jak i pojemności kolejki roboczej i jest nasycony. W obu przypadkach metoda execute wywołuje metodę RejectedExecutionHandler.rejectedExecution (java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) jej RejectedExecutionHandler. Dostępne są cztery wstępnie zdefiniowane zasady obsługi:

  1. W domyślnym ThreadPoolExecutor.AbortPolicy program obsługi zgłasza wyjątek RejectedExecutionException środowiska wykonawczego po odrzuceniu.
  2. W ThreadPoolExecutor.CallerRunsPolicy wątek, który wywołuje wykonanie, sam uruchamia zadanie. Zapewnia to prosty mechanizm kontroli sprzężenia zwrotnego, który spowolni tempo składania nowych zadań.
  3. W ThreadPoolExecutor.DiscardPolicy zadanie, którego nie można wykonać, jest po prostu odrzucane.
  4. W ThreadPoolExecutor.DiscardOldestPolicy, jeśli moduł wykonawczy nie zostanie zamknięty, zadanie na początku kolejki roboczej jest odrzucane, a następnie ponawiane jest wykonanie (co może ponownie zakończyć się niepowodzeniem, powodując powtórzenie).

Upewnij się również, że podczas wywoływania ThreadPoolExecutorkonstruktora używasz kolejki ograniczonej, takiej jak ArrayBlockingQueue . W przeciwnym razie nic nie zostanie odrzucone.

Edycja: w odpowiedzi na swój komentarz ustaw rozmiar ArrayBlockingQueue na równy maksymalnemu rozmiarowi puli wątków i użyj AbortPolicy.

Edycja 2: OK, rozumiem, do czego zmierzasz. A co z tym: zastąp beforeExecute()metodę, aby sprawdzić, getActiveCount()czy nie przekracza getMaximumPoolSize(), a jeśli tak, prześpij się i spróbuj ponownie?

danben
źródło
3
Chcę, aby liczba współbieżnie wykonywanych zadań była ściśle ograniczona (przez liczbę wątków w Executorze), dlatego nie mogę pozwolić wątkom wywołującym na samodzielne wykonywanie tych zadań.
Punkt stały
1
AbortPolicy spowoduje, że wątek wywołujący otrzyma wyjątek RejectedExecutionException, podczas gdy potrzebuję go po prostu zablokować.
Punkt stały
2
Proszę o blokowanie, a nie usypianie i odpytywanie;)
Fixpoint
@danben: Czy nie masz na myśli CallerRunsPolicy ?
user359996
7
Problem z CallerRunPolicy polega na tym, że jeśli masz jednego producenta wątku, często będziesz mieć wątki nieużywane, jeśli długo działające zadanie zostanie odrzucone (ponieważ inne zadania w puli wątków zostaną zakończone, gdy długo działające zadanie jest nadal bieganie).
Adam Gent
6

Hibernate ma BlockPolicyprostą i może robić, co chcesz:

Zobacz: Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}
Nate Murray
źródło
4
Po namyśle to całkiem zły pomysł. Nie polecam jej używać. Z dobrych powodów zobacz tutaj: stackoverflow.com/questions/3446011/ ...
Nate Murray
Ponadto nie jest to użycie „standardowej biblioteki Java”, zgodnie z żądaniem OP. Usunąć?
user359996
1
Woah, to takie brzydkie. Zasadniczo to rozwiązanie koliduje z wewnętrznymi elementami TPE. Javadoc for ThreadPoolExecutoreven mówi dosłownie: "Metoda getQueue () umożliwia dostęp do kolejki roboczej w celu monitorowania i debugowania. Używanie tej metody do jakichkolwiek innych celów jest zdecydowanie odradzane.". To, że jest to dostępne w tak szeroko znanej bibliotece, jest absolutnie smutne.
Timmos
1
Podobnie jest z com.amazonaws.services.simpleworkflow.flow.worker.BlockCallerPolicy.
Adrian Baker
6

BoundedExecutorOdpowiedź cytowany powyżej z Java współbieżności w praktyce działa poprawnie tylko w przypadku korzystania z kolejki nieograniczony dla Wykonawcy lub semafor związany jest nie większy niż rozmiar kolejki. Semafor jest stanem współdzielonym między wątkiem wysyłającym i wątkami w puli, dzięki czemu możliwe jest nasycenie modułu wykonawczego, nawet jeśli rozmiar kolejki <bound <= (rozmiar kolejki + rozmiar puli).

Używanie CallerRunsPolicyjest ważne tylko wtedy, gdy twoje zadania nie działają wiecznie, w takim przypadku twój wątek wysyłający pozostanie na rejectedExecutionzawsze, a zły pomysł, jeśli twoje zadania będą działać długo, ponieważ wątek wysyłający nie może przesłać żadnych nowych zadań lub zrobić cokolwiek innego, jeśli sam uruchamia zadanie.

Jeśli to nie do przyjęcia, sugeruję sprawdzenie rozmiaru ograniczonej kolejki executora przed przesłaniem zadania. Jeśli kolejka jest pełna, odczekaj chwilę, zanim ponownie spróbujesz przesłać. Przepustowość ucierpi, ale sugeruję, że jest to prostsze rozwiązanie niż wiele innych proponowanych rozwiązań i masz gwarancję, że żadne zadanie nie zostanie odrzucone.

stephan f
źródło
Nie jestem pewien, jak sprawdzenie długości kolejki przed przesłaniem gwarantuje brak odrzuconych zadań w środowisku wielowątkowym z wieloma producentami zadań. To nie brzmi bezpiecznie.
Tim
5

Wiem, to hack, ale moim zdaniem najczystszy hack spośród oferowanych tutaj ;-)

Ponieważ ThreadPoolExecutor używa "oferta" kolejki blokującej zamiast "umieścić", pozwala na nadpisanie zachowania "oferty" kolejki blokującej:

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));

Przetestowałem to i wydaje się, że działa. Wdrożenie pewnej polityki limitu czasu jest zadaniem czytelnika.

Kuba
źródło
Zobacz stackoverflow.com/a/4522411/2601671, aby uzyskać wyczyszczoną wersję tego. Zgadzam się, to najczystszy sposób na zrobienie tego.
Trenton
3

Następująca klasa otacza ThreadPoolExecutor i używa Semaphore do blokowania, a następnie kolejka robocza jest pełna:

public final class BlockingExecutor { 

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

Ta klasa opakowania jest oparta na rozwiązaniu przedstawionym w książce Java Concurrency in Practice autorstwa Briana Goetza. Rozwiązanie opisane w książce przyjmuje tylko dwa parametry konstruktora: an Executori ograniczenie używane dla semafora. Pokazuje to odpowiedź udzielona przez Fixpoint. Jest problem z tym podejściem: może dojść do stanu, w którym wątki puli są zajęte, kolejka jest pełna, ale semafor właśnie wydał zezwolenie. ( semaphore.release()w ostatnim bloku). W tym stanie nowe zadanie może pobrać właśnie wydane zezwolenie, ale jest odrzucane, ponieważ kolejka zadań jest pełna. Oczywiście nie jest to coś, czego chcesz; chcesz zablokować w tym przypadku.

Aby rozwiązać ten problem, musimy użyć nieograniczonej kolejki, o czym wyraźnie wspomina JCiP. Semafor działa jako strażnik, dając efekt wirtualnego rozmiaru kolejki. Ma to taki efekt uboczny, że jest możliwe, że jednostka może zawierać maxPoolSize + virtualQueueSize + maxPoolSizezadania. Dlaczego? Z powodu semaphore.release()w ostatnim bloku. Jeśli wszystkie wątki puli wywołują tę instrukcję w tym samym czasie, maxPoolSizezezwolenia są zwalniane, pozwalając tej samej liczbie zadań na wejście do jednostki. Gdybyśmy używali ograniczonej kolejki, nadal byłaby pełna, co spowodowałoby odrzucenie zadania. Teraz, ponieważ wiemy, że dzieje się tak tylko wtedy, gdy wątek puli jest prawie gotowy, nie stanowi to problemu. Wiemy, że wątek puli nie będzie się blokował, więc zadanie wkrótce zostanie pobrane z kolejki.

Możesz jednak użyć ograniczonej kolejki. Tylko upewnij się, że jego rozmiar jest równy virtualQueueSize + maxPoolSize. Większe rozmiary są bezużyteczne, semafor uniemożliwi wpuszczenie większej liczby elementów. Mniejsze rozmiary spowodują odrzucenie zadań. Szansa na odrzucenie zadań rośnie wraz ze zmniejszaniem się rozmiaru. Na przykład, powiedzmy, że chcesz mieć ograniczony moduł wykonawczy z maxPoolSize = 2 i virtualQueueSize = 5. Następnie weź semafor z 5 + 2 = 7 zezwoleniami i rzeczywistym rozmiarem kolejki 5 + 2 = 7. Rzeczywista liczba zadań, które mogą znajdować się w jednostce, to 2 + 5 + 2 = 9. Kiedy moduł wykonawczy jest pełny (5 zadań w kolejce, 2 w puli wątków, więc 0 zezwoleń jest dostępnych) i WSZYSTKIE wątki puli zwalniają swoje zezwolenia, wtedy nadchodzące zadania mogą otrzymać dokładnie 2 zezwolenia.

Teraz rozwiązanie JCiP jest nieco kłopotliwe w użyciu, ponieważ nie wymusza wszystkich tych ograniczeń (nieograniczonej kolejki lub ograniczeń matematycznych itp.). Myślę, że jest to tylko dobry przykład pokazujący, jak można budować nowe klasy bezpiecznych wątków w oparciu o części, które są już dostępne, ale nie jako w pełni rozwiniętą klasę wielokrotnego użytku. Nie sądzę, żeby to drugie było intencją autora.

Timmos
źródło
2

możesz użyć niestandardowego RejectedExecutionHandler w ten sposób

ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
                max_handlers, // max size 
                timeout_in_seconds, // idle timeout 
                TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // This will block if the queue is full
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            System.err.println(e.getMessage());
                        }

                    }
                });
Amir David Nissan Cohen
źródło
1
Dokumentacja getQueue () wyraźnie wspomina, że dostęp do kolejki zadań jest przeznaczony głównie do debugowania i monitorowania.
Chadi,
0

Utwórz własną kolejkę blokującą, która będzie używana przez moduł wykonawczy, z zachowaniem blokującym, którego szukasz, zawsze zwracając dostępną pozostałą pojemność (upewniając się, że moduł wykonawczy nie będzie próbował utworzyć więcej wątków niż jego pula rdzeniowa ani nie uruchomi modułu obsługi odrzucania).

Wierzę, że w ten sposób uzyskasz zachowanie blokujące, którego szukasz. Program obsługi odrzucania nigdy nie pasuje do rachunku, ponieważ oznacza to, że wykonawca nie może wykonać zadania. Mogę sobie wyobrazić, że osoba prowadząca jest zajęta czekaniem. To nie jest to, czego chcesz, potrzebujesz kolejki dla executora, który blokuje wywołującego ...

Fried Hoeben
źródło
2
ThreadPoolExecutorużywa offermetody dodawania zadań do kolejki. Gdybym utworzył niestandardowy BlockingQueueblokujący się offer, zerwałoby BlockingQueueto kontrakt.
Punkt stały
@Shooshpanchick, to złamałoby kontrakt BlockingQueues. Więc co? jeśli jesteś tak chętny, możesz jawnie włączyć zachowanie tylko podczas przesyłania () (zajmie to ThreadLocal)
bestsss
Zobacz także tę odpowiedź na inne pytanie, które określa tę alternatywę.
Robert Tupelo-Schneck
czy istnieje powód, dla którego ThreadPoolExecutorzostał wdrożony do użycia, offera nie put(wersja blokująca)? Ponadto, gdyby istniał sposób, aby kod klienta wskazywał, którego użyć i kiedy, wiele osób próbujących ręcznie wprowadzić niestandardowe rozwiązania byłoby odciążone
asgs
0

Aby uniknąć problemów z rozwiązaniem @FixPoint. Można by użyć ListeningExecutorService i zwolnić semafor onSuccess i onFailure wewnątrz FutureCallback.

vamsu
źródło
Ma to te same nieodłączne problemy, co samo pakowanie, Runnableponieważ te metody są nadal wywoływane przed normalnym czyszczeniem pracownika ThreadPoolExecutor. Oznacza to, że nadal będziesz musiał obsługiwać wyjątki odrzucenia.
Adam Gent,
0

Ostatnio stwierdziłem, że to pytanie ma ten sam problem. OP nie mówi tego wprost, ale nie chcemy używać tego, RejectedExecutionHandlerktóry wykonuje zadanie w wątku przesyłającego, ponieważ spowoduje to niedostateczne wykorzystanie wątków roboczych, jeśli to zadanie jest długotrwałe.

Czytając wszystkie odpowiedzi i komentarze, w szczególności wadliwe rozwiązanie z semaforem lub używając do tego semafora, afterExecuteprzyjrzałem się bliżej kodowi ThreadPoolExecutor, aby sprawdzić, czy jest jakieś wyjście. Byłem zdumiony, widząc ponad 2000 linii (skomentowanego) kodu, z których niektóre przyprawiają mnie o zawrót głowy . Biorąc pod uwagę raczej proste wymaganie, które mam - jeden producent, kilku konsumentów, niech producent blokuje, gdy żaden konsument nie może podjąć pracy - postanowiłem stworzyć własne rozwiązanie. To nie jest, ExecutorServiceale tylko plik Executor. I nie dostosowuje liczby wątków do obciążenia pracą, ale utrzymuje tylko stałą liczbę wątków, co również odpowiada moim wymaganiom. Oto kod. Zapraszam do opowiadania o tym :-)

package x;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;

/**
 * distributes {@code Runnable}s to a fixed number of threads. To keep the
 * code lean, this is not an {@code ExecutorService}. In particular there is
 * only very simple support to shut this executor down.
 */
public class ParallelExecutor implements Executor {
  // other bounded queues work as well and are useful to buffer peak loads
  private final BlockingQueue<Runnable> workQueue =
      new SynchronousQueue<Runnable>();
  private final Thread[] threads;

  /*+**********************************************************************/
  /**
   * creates the requested number of threads and starts them to wait for
   * incoming work
   */
  public ParallelExecutor(int numThreads) {
    this.threads = new Thread[numThreads];
    for(int i=0; i<numThreads; i++) {
      // could reuse the same Runner all over, but keep it simple
      Thread t = new Thread(new Runner());
      this.threads[i] = t;
      t.start();
    }
  }
  /*+**********************************************************************/
  /**
   * returns immediately without waiting for the task to be finished, but may
   * block if all worker threads are busy.
   * 
   * @throws RejectedExecutionException if we got interrupted while waiting
   *         for a free worker
   */
  @Override
  public void execute(Runnable task)  {
    try {
      workQueue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("interrupt while waiting for a free "
          + "worker.", e);
    }
  }
  /*+**********************************************************************/
  /**
   * Interrupts all workers and joins them. Tasks susceptible to an interrupt
   * will preempt their work. Blocks until the last thread surrendered.
   */
  public void interruptAndJoinAll() throws InterruptedException {
    for(Thread t : threads) {
      t.interrupt();
    }
    for(Thread t : threads) {
      t.join();
    }
  }
  /*+**********************************************************************/
  private final class Runner implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable task;
        try {
          task = workQueue.take();
        } catch (InterruptedException e) {
          // canonical handling despite exiting right away
          Thread.currentThread().interrupt(); 
          return;
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          // production code to use a logging framework
          e.printStackTrace();
        }
      }
    }
  }
}
Harald
źródło
0

Uważam, że istnieje dość elegancki sposób rozwiązania tego problemu poprzez użycie java.util.concurrent.Semaphorei delegowanie zachowania Executor.newFixedThreadPool. Nowa usługa wykonawcza wykona nowe zadanie tylko wtedy, gdy istnieje do tego wątek. Blokowaniem zarządza Semaphore z liczbą zezwoleń równą liczbie wątków. Po zakończeniu zadania zwraca zezwolenie.

public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

private final ExecutorService executor;
private final Semaphore blockExecution;

public FixedThreadBlockingExecutorService(int nTreads) {
    this.executor = Executors.newFixedThreadPool(nTreads);
    blockExecution = new Semaphore(nTreads);
}

@Override
public void shutdown() {
    executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
    return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
    return executor.isShutdown();
}

@Override
public boolean isTerminated() {
    return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
    blockExecution.acquireUninterruptibly();
    executor.execute(() -> {
        try {
            command.run();
        } finally {
            blockExecution.release();
        }
    });
}
Radosław Kachel
źródło
Zaimplementowałem BoundedExecutor opisaną w Java Concurrency in Practice i doszedłem do wniosku, że semafor musi zostać zainicjowany z flagą fairness ustawioną na true, aby zapewnić, że zezwolenia semafora są oferowane w żądaniach kolejności. Patrz docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . szczegóły
Prahalad Deshpande
0

W przeszłości miałem tę samą potrzebę: rodzaj kolejki blokującej o stałym rozmiarze dla każdego klienta wspieranej przez wspólną pulę wątków. Skończyło się na tym, że napisałem własny ThreadPoolExecutor:

UserThreadPoolExecutor (kolejka blokująca (na klienta) + pula wątków (wspólna dla wszystkich klientów))

Zobacz: https://github.com/d4rxh4wx/UserThreadPoolExecutor

Każdy UserThreadPoolExecutor otrzymuje maksymalną liczbę wątków z udostępnionego ThreadPoolExecutor

Każdy UserThreadPoolExecutor może:

  • przesłać zadanie do modułu wykonawczego puli wątków udostępnionych, jeśli jego przydział nie zostanie osiągnięty. Jeśli limit zostanie osiągnięty, zadanie jest umieszczane w kolejce (niekonsumpcyjne blokowanie czeka na procesor). Po zakończeniu jednego z przesłanych zadań przydział jest zmniejszany, co pozwala na przesłanie kolejnego zadania oczekującego do ThreadPoolExecutor
  • poczekaj na zakończenie pozostałych zadań
d4rxh4wx
źródło
0

Znalazłem tę zasadę odrzucania w kliencie wyszukiwania elastycznego. Blokuje wątek wywołujący w kolejce blokującej. Kod poniżej

 static class ForceQueuePolicy implements XRejectedExecutionHandler 
 {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
        {
            try 
            {
                executor.getQueue().put(r);
            } 
            catch (InterruptedException e) 
            {
                //should never happen since we never wait
                throw new EsRejectedExecutionException(e);
            }
        }

        @Override
        public long rejected() 
        {
            return 0;
        }
}
Vladislav
źródło
0

Niedawno musiałem osiągnąć coś podobnego, ale na ScheduledExecutorService.

Musiałem również upewnić się, że obsłużę opóźnienie przekazywane do metody i upewnij się, że zadanie zostanie przesłane do wykonania w czasie, którego oczekuje wywołujący, lub po prostu nie powiedzie się, rzucając w ten sposób plik RejectedExecutionException.

Inne metody z ScheduledThreadPoolExecutordo wykonywania lub wysyłania wewnętrznego wywołania zadania, #schedulektóre nadal będą wywoływać zastąpione metody.

import java.util.concurrent.*;

public class BlockingScheduler extends ScheduledThreadPoolExecutor {
    private final Semaphore maxQueueSize;

    public BlockingScheduler(int corePoolSize,
                             ThreadFactory threadFactory,
                             int maxQueueSize) {
        super(corePoolSize, threadFactory, new AbortPolicy());
        this.maxQueueSize = new Semaphore(maxQueueSize);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
        return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
        return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long period,
                                                     TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable t) {
        super.afterExecute(runnable, t);
        try {
            if (t == null && runnable instanceof Future<?>) {
                try {
                    ((Future<?>) runnable).get();
                } catch (CancellationException | ExecutionException e) {
                    t = e;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                System.err.println(t);
            }
        } finally {
            releaseQueueUsage();
        }
    }

    private long beforeSchedule(Runnable runnable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(runnable, this);
            return 0;
        }
    }

    private long beforeSchedule(Callable callable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
            return 0;
        }
    }

    private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
        final long beforeAcquireTimeStamp = System.currentTimeMillis();
        maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
        final long afterAcquireTimeStamp = System.currentTimeMillis();
        return afterAcquireTimeStamp - beforeAcquireTimeStamp;
    }

    private void releaseQueueUsage() {
        maxQueueSize.release();
    }
}

Mam tutaj kod, docenię wszelkie uwagi. https://github.com/AmitabhAwasthi/BlockingScheduler

Dev Amitabh
źródło
Ta odpowiedź opiera się całkowicie na treści linków zewnętrznych. Gdyby kiedykolwiek stały się nieważne, twoja odpowiedź byłaby bezużyteczna. Dlatego edytuj swoją odpowiedź i dodaj przynajmniej podsumowanie tego, co można tam znaleźć. Dziękuję Ci!
Fabio mówi Przywróć Monikę
@fabio: dzięki za wskazanie. Dodałem tam kod, aby teraz miał większy sens dla czytelników. Doceniam twój komentarz :)
Dev Amitabh
0

Nie zawsze podoba mi się CallerRunsPolicy, zwłaszcza, że ​​pozwala ona odrzuconym zadaniom na „pominięcie kolejki” i wykonanie ich przed zadaniami, które zostały przesłane wcześniej. Ponadto wykonanie zadania w wątku wywołującym może zająć znacznie więcej czasu niż oczekiwanie na udostępnienie pierwszego gniazda.

Rozwiązałem ten problem za pomocą niestandardowego RejectedExecutionHandler, który po prostu blokuje wątek wywołujący na chwilę, a następnie próbuje ponownie przesłać zadanie:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

Tej klasy można po prostu użyć w programie wykonawczym puli wątków jako RejectedExecutinHandler, jak każda inna, na przykład:

executorPool = new ThreadPoolExecutor(1, 1, 10,
                                      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                      new BlockWhenQueueFull());

Jedynym minusem, jaki widzę, jest to, że wątek wywołujący może zostać zablokowany nieco dłużej niż jest to konieczne (do 250 ms). Co więcej, ponieważ ten wykonawca jest skutecznie wywoływany rekurencyjnie, bardzo długie oczekiwanie na udostępnienie wątku (godziny) może spowodować przepełnienie stosu.

Niemniej jednak osobiście podoba mi się ta metoda. Jest kompaktowy, łatwy do zrozumienia i dobrze działa.

TinkerTank
źródło
1
Jak sam mówisz: może to spowodować przepełnienie stosu. Nie jest to coś, co chciałbym mieć w kodzie produkcyjnym.
Harald
Każdy powinien podejmować własne decyzje. Nie stanowi to problemu dla mojego obciążenia pracą. Zadania są wykonywane w ciągu kilku sekund, a nie godzin, które byłyby potrzebne do wysadzenia stosu. Co więcej, to samo można powiedzieć o praktycznie każdym algorytmie rekurencyjnym. Czy to powód, aby nigdy nie używać algorytmu rekurencyjnego w produkcji?
TinkerTank