Poszukuję zgodnego sposobu konfigurowania korzystania z puli wątków lub nie. W idealnym przypadku nie powinno to mieć żadnego wpływu na resztę kodu. Mógłbym użyć puli wątków z 1 wątkiem, ale nie jest to do końca to, czego chcę. Jakieś pomysły?
ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);
// es.execute / es.submit / new ExecutorCompletionService(es) etc
java
concurrency
Michael Rutherfurd
źródło
źródło
AbstractExecutorService
wygląda na to, że należy iść.Runnable::run
Możesz użyć guawy
MoreExecutors.newDirectExecutorService()
lubMoreExecutors.directExecutor()
jeśli nie potrzebujeszExecutorService
.Jeśli dołączenie guawy jest zbyt ciężkie, możesz zaimplementować coś prawie tak dobrego:
public final class SameThreadExecutorService extends ThreadPoolExecutor { private final CountDownLatch signal = new CountDownLatch(1); private SameThreadExecutorService() { super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); } @Override public void shutdown() { super.shutdown(); signal.countDown(); } public static ExecutorService getInstance() { return SingletonHolder.instance; } private static class SingletonHolder { static ExecutorService instance = createInstance(); } private static ExecutorService createInstance() { final SameThreadExecutorService instance = new SameThreadExecutorService(); // The executor has one worker thread. Give it a Runnable that waits // until the executor service is shut down. // All other submitted tasks will use the RejectedExecutionHandler // which runs tasks using the caller's thread. instance.submit(new Runnable() { @Override public void run() { boolean interrupted = false; try { while (true) { try { instance.signal.await(); break; } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }}); return Executors.unconfigurableScheduledExecutorService(instance); } }
źródło
Styl Java 8:
Executor e = Runnable::run;
źródło
Napisałem na
ExecutorService
podstawieAbstractExecutorService
./** * Executes all submitted tasks directly in the same thread as the caller. */ public class SameThreadExecutorService extends AbstractExecutorService { //volatile because can be viewed by other threads private volatile boolean terminated; @Override public void shutdown() { terminated = true; } @Override public boolean isShutdown() { return terminated; } @Override public boolean isTerminated() { return terminated; } @Override public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException { shutdown(); // TODO ok to call shutdown? what if the client never called shutdown??? return terminated; } @Override public List<Runnable> shutdownNow() { return Collections.emptyList(); } @Override public void execute(Runnable theCommand) { theCommand.run(); } }
źródło
terminated
nie będzie korzystać z dostępu zsynchronizowanego w oparciu o kod, który jest tutaj. Operacje na polach 32-bitowych są w Javie niepodzielne.Możesz użyć RejectedExecutionHandler, aby uruchomić zadanie w bieżącym wątku.
public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { r.run(); } });
Potrzebujesz tylko jednego z nich.
źródło
java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
.Musiałem użyć tego samego "CurrentThreadExecutorService" do celów testowych i chociaż wszystkie sugerowane rozwiązania były fajne (szczególnie ta, w której wspomina się o guawie ), wymyśliłem coś podobnego do tego, co zasugerował tutaj Peter Lawrey .
Jak wspomniano przez Axelle Ziegler tutaj , niestety rozwiązanie Piotra nie będą faktycznie pracować z powodu kontroli wprowadzonego
ThreadPoolExecutor
namaximumPoolSize
parametrze konstruktora (czylimaximumPoolSize
nie może być<=0
).Aby to obejść, wykonałem następujące czynności:
private static ExecutorService currentThreadExecutorService() { CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) { @Override public void execute(Runnable command) { callerRunsPolicy.rejectedExecution(command, this); } }; }
źródło