Czy istnieje usługa ExecutorService, która używa bieżącego wątku?

94

Poszukuję zgodnego sposobu konfigurowania korzystania z puli wątków lub nie. W idealnym przypadku nie powinno to mieć żadnego wpływu na resztę kodu. Mógłbym użyć puli wątków z 1 wątkiem, ale nie jest to do końca to, czego chcę. Jakieś pomysły?

ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);

// es.execute / es.submit / new ExecutorCompletionService(es) etc
Michael Rutherfurd
źródło

Odpowiedzi:

70

Oto naprawdę prosta Executor(nie ExecutorService, pamiętajcie) implementacja, która używa tylko bieżącego wątku. Kradzież z „Java Concurrency in Practice” (lektura niezbędna).

public class CurrentThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService jest bardziej rozbudowanym interfejsem, ale można go obsługiwać w ten sam sposób.

przemyślenie
źródło
4
+1: Jak mówisz, ExecutorService może być obsługiwany w ten sam sposób, być może przez podklasę AbstractExecutorService.
Paul Cager,
@Paul Tak, AbstractExecutorServicewygląda na to, że należy iść.
przemyślenie
15
W Javie8 możesz to zredukować do samegoRunnable::run
Jon Freedman
@Juude zawsze będzie działać w wątku, który wywołuje moduł wykonawczy.
Gustav Karlsson
Czy nie chodzi o wykonanie tego samego wątku, aby móc zaplanować więcej zadań z poziomu metody execute ()? Ta odpowiedź nie wystarczy. Nie mogę znaleźć odpowiedzi, która by to satysfakcjonowała.
haelix
82

Możesz użyć guawy MoreExecutors.newDirectExecutorService()lub MoreExecutors.directExecutor()jeśli nie potrzebujesz ExecutorService.

Jeśli dołączenie guawy jest zbyt ciężkie, możesz zaimplementować coś prawie tak dobrego:

public final class SameThreadExecutorService extends ThreadPoolExecutor {
  private final CountDownLatch signal = new CountDownLatch(1);

  private SameThreadExecutorService() {
    super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  @Override public void shutdown() {
    super.shutdown();
    signal.countDown();
  }

  public static ExecutorService getInstance() {
    return SingletonHolder.instance;
  }

  private static class SingletonHolder {
    static ExecutorService instance = createInstance();    
  }

  private static ExecutorService createInstance() {
    final SameThreadExecutorService instance
        = new SameThreadExecutorService();

    // The executor has one worker thread. Give it a Runnable that waits
    // until the executor service is shut down.
    // All other submitted tasks will use the RejectedExecutionHandler
    // which runs tasks using the  caller's thread.
    instance.submit(new Runnable() {
        @Override public void run() {
          boolean interrupted = false;
          try {
            while (true) {
              try {
                instance.signal.await();
                break;
              } catch (InterruptedException e) {
                interrupted = true;
              }
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }
        }});
    return Executors.unconfigurableScheduledExecutorService(instance);
  }
}
NamshubWriter
źródło
1
W przypadku systemu Android zwraca Executors.unconfigurableExecutorService (instancja);
Maragues
jeśli wszystko, czego używamy, to bieżący wątek , po co prymitywy synchronizacji? dlaczego zatrzask?
haelix
@haelix zatrzask jest potrzebny, ponieważ nawet jeśli praca jest wykonywana w tym samym wątku, co ten, który dodał pracę, każdy wątek może zamknąć executor.
NamshubWriter
64

Styl Java 8:

Executor e = Runnable::run;

lpandzic
źródło
8
Absolutnie brudne. Kocham to.
Rogue
Co w tym brzydkiego? To eleganckie :)
lpandzic
1
To najlepszy rodzaj brudnego @Ipandzic, jest niezwykły i zwięzły.
Rogue
12

Napisałem na ExecutorServicepodstawie AbstractExecutorService.

/**
 * Executes all submitted tasks directly in the same thread as the caller.
 */
public class SameThreadExecutorService extends AbstractExecutorService {

    //volatile because can be viewed by other threads
    private volatile boolean terminated;

    @Override
    public void shutdown() {
        terminated = true;
    }

    @Override
    public boolean isShutdown() {
        return terminated;
    }

    @Override
    public boolean isTerminated() {
        return terminated;
    }

    @Override
    public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
        shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
        return terminated;
    }

    @Override
    public List<Runnable> shutdownNow() {
        return Collections.emptyList();
    }

    @Override
    public void execute(Runnable theCommand) {
        theCommand.run();
    }
}
Eric Obermühlner
źródło
zakończone pole nie jest chronione z synchronizacją.
Daneel Yaitskov
1
Pole @ DaneelS.Yaitskov terminatednie będzie korzystać z dostępu zsynchronizowanego w oparciu o kod, który jest tutaj. Operacje na polach 32-bitowych są w Javie niepodzielne.
Christopher Schultz
Przypuszczam, że metoda isTerminated () w powyższym nie jest do końca poprawna, ponieważ isTerminated () ma zwracać wartość true tylko wtedy, gdy nie ma aktualnie wykonywanych zadań. Guawa śledzi liczbę zadań w innej zmiennej, prawdopodobnie dlatego chronią obie zmienne za pomocą blokady.
Jeremy K
7

Możesz użyć RejectedExecutionHandler, aby uruchomić zadanie w bieżącym wątku.

public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        r.run();
    }
});

Potrzebujesz tylko jednego z nich.

Peter Lawrey
źródło
Sprytny! Jak bezpieczne jest to (szczere pytanie)? Czy istnieją sposoby na odrzucenie zadania, w którym faktycznie nie chciałbyś go wykonać w bieżącym wątku? Czy zadania są odrzucane, jeśli ExecutorService jest zamykany lub przerywany?
przemyślenie
Ponieważ maksymalny rozmiar to 0, każde zadanie jest odrzucane. Jednak odrzucone zachowanie ma działać w bieżącym wątku. Problem byłby tylko wtedy, gdyby zadanie NIE zostało odrzucone.
Peter Lawrey,
8
uwaga, istnieje już implementacja tej polityki, nie ma potrzeby definiowania własnej java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy.
jtahlborn
7
Nie jest już możliwe utworzenie ThreadPoolExecutor z maksymalnym rozmiarem puli 0. Sądzę, że byłoby możliwe odtworzenie zachowania przy użyciu blockingQueue o rozmiarze 0, ale żadna domyślna implementacja nie wydaje się na to zezwalać.
Axelle Ziegler
to nie skompiluje się z powodu {code} if (corePoolSize <0 || maximumPoolSize <= 0 || maximumPoolSize <corePoolSize || keepAliveTime <0) {code} w java.util.ThreadPoolExecutor (przynajmniej openJdk 7)
Bogdan
7

Musiałem użyć tego samego "CurrentThreadExecutorService" do celów testowych i chociaż wszystkie sugerowane rozwiązania były fajne (szczególnie ta, w której wspomina się o guawie ), wymyśliłem coś podobnego do tego, co zasugerował tutaj Peter Lawrey .

Jak wspomniano przez Axelle Ziegler tutaj , niestety rozwiązanie Piotra nie będą faktycznie pracować z powodu kontroli wprowadzonego ThreadPoolExecutorna maximumPoolSizeparametrze konstruktora (czyli maximumPoolSizenie może być <=0).

Aby to obejść, wykonałem następujące czynności:

private static ExecutorService currentThreadExecutorService() {
    CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
        @Override
        public void execute(Runnable command) {
            callerRunsPolicy.rejectedExecution(command, this);
        }
    };
}
fabriziocucci
źródło