Czy można określić niestandardową pulę wątków dla równoległego strumienia Java 8 ? Nie mogę tego nigdzie znaleźć.
Wyobraź sobie, że mam aplikację serwera i chciałbym korzystać z równoległych strumieni. Ale aplikacja jest duża i wielowątkowa, więc chcę ją podzielić na części. Nie chcę wolno działającego zadania w jednym module zadań bloku aplikacji z innego modułu.
Jeśli nie mogę użyć różnych pul wątków dla różnych modułów, oznacza to, że nie mogę bezpiecznie używać równoległych strumieni w większości rzeczywistych sytuacji.
Wypróbuj następujący przykład. Niektóre zadania intensywnie wykorzystujące procesor są wykonywane w osobnych wątkach. Zadania wykorzystują równoległe strumienie. Pierwsze zadanie jest zepsute, więc każdy krok zajmuje 1 sekundę (symulowany przez uśpienie wątku). Problem polega na tym, że inne wątki blokują się i czekają na zakończenie przerwanego zadania. To wymyślony przykład, ale wyobraź sobie aplikację serwletu i kogoś, kto przesyła długo działające zadanie do wspólnej puli dołączania wideł.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
Odpowiedzi:
W rzeczywistości istnieje pewna sztuczka, jak wykonać równoległą operację w konkretnej puli sprzężenia wideł. Jeśli wykonasz go jako zadanie w puli łączenia widelca, pozostanie tam i nie użyje wspólnej.
Trik jest oparty na ForkJoinTask.fork, który określa: „Organizuje asynchroniczne wykonanie tego zadania w puli, w której działa bieżące zadanie, jeśli ma zastosowanie, lub za pomocą ForkJoinPool.commonPool (), jeśli nie inForkJoinPool ()”
źródło
ForkJoinPool
lub czy jest to szczegół implementacji? Link do dokumentacji byłby miły.ForkJoinPool
instancja powinna byćshutdown()
wtedy, gdy nie jest już potrzebna, aby uniknąć wycieku nici. (przykład)Strumienie równoległe używają wartości domyślnej,
ForkJoinPool.commonPool
która domyślnie ma o jedenRuntime.getRuntime().availableProcessors()
wątek mniejszą liczbę procesorów , co jest zwracane przez (Oznacza to, że równoległe strumienie używają wszystkich procesorów, ponieważ również używają głównego wątku):Oznacza to również, że jeśli zagnieżdżono równoległe strumienie lub wiele równoległych strumieni uruchomionych jednocześnie, wszystkie one będą miały tę samą pulę. Zaleta: nigdy nie użyjesz więcej niż domyślna (liczba dostępnych procesorów). Wada: możesz nie przypisać „wszystkich procesorów” do każdego inicjowanego równoległego strumienia (jeśli akurat masz więcej niż jeden). (Najwyraźniej możesz użyć ManagedBlocker, aby to obejść).
Aby zmienić sposób wykonywania równoległych strumieni, możesz albo
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
lubSystem.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
dla docelowej równoległości 20 wątków. Jednak to już nie działa po zaktualizowanej łatce https://bugs.openjdk.java.net/browse/JDK-8190974 .Przykład tego ostatniego na moim komputerze, który ma 8 procesorów. Jeśli uruchomię następujący program:
Dane wyjściowe to:
Widać więc, że strumień równoległy przetwarza 8 elementów jednocześnie, tzn. Używa 8 wątków. Jeśli jednak odkomentuję skomentowany wiersz, wynikiem jest:
Tym razem strumień równoległy wykorzystał 20 wątków, a wszystkie 20 elementów w strumieniu zostało przetworzonych jednocześnie.
źródło
commonPool
rzeczywistości ma jeden mniej niżavailableProcessors
, co powoduje całkowitą równoległość równą,availableProcessors
ponieważ wątek wywołujący liczy się jako jeden.ForkJoinTask
. Do naśladowaniaparallel()
get()
jest potrzebne:stream.parallel().forEach(soSomething)).get();
ForkJoinPool.submit(() -> stream.forEach(...))
moje działania Stream będą uruchamiane z podanymForkJoinPool
. Oczekiwałbym, że cała akcja Stream jest wykonywana w ForJoinPool jako JEDNA akcja, ale wewnętrznie nadal korzysta z domyślnego / wspólnego ForkJoinPool. Gdzie widziałeś, że ForkJoinPool.submit () zrobiłby to, co mówisz, że działa?Alternatywnie do sztuczki polegającej na uruchamianiu obliczeń równoległych w swoim własnym forkJoinPool możesz również przekazać tę pulę do metody CompletableFuture.supplyAsync, jak w:
źródło
Oryginalne rozwiązanie (ustawienie wspólnej właściwości równoległości ForkJoinPool) już nie działa. Patrząc na łącza w oryginalnej odpowiedzi, aktualizacja, która je łamie, została ponownie przeniesiona do Javy 8. Jak wspomniano w połączonych wątkach, nie gwarantuje się, że to rozwiązanie będzie działać wiecznie. Na tej podstawie rozwiązaniem jest forkjoinpool.submit z rozwiązaniem .get omówionym w zaakceptowanej odpowiedzi. Myślę, że backport naprawia również niewiarygodność tego rozwiązania.
źródło
ForkJoinPool.commonPool().getParallelism()
w trybie debugowania.unreported exception InterruptedException; must be caught or declared to be thrown
nawet ze wszystkimicatch
wyjątkami w pętli.Możemy zmienić domyślną równoległość za pomocą następującej właściwości:
które można skonfigurować tak, aby korzystało z większej równoległości.
źródło
Aby zmierzyć rzeczywistą liczbę używanych wątków, możesz sprawdzić
Thread.activeCount()
:Może to wytworzyć na 4-rdzeniowym procesorze wyjście takie jak:
Bez
.parallel()
tego daje:źródło
Do tej pory korzystałem z rozwiązań opisanych w odpowiedziach na to pytanie. Teraz stworzyłem małą bibliotekę o nazwie Parallel Stream Support :
Ale jak zauważył @PabloMatiasGomez w komentarzach, istnieją mechanizmy podziału mechanizmu równoległych strumieni, które w dużym stopniu zależą od wielkości wspólnej puli. Zobacz Równoległy strumień z zestawu HashSet nie działa równolegle .
Korzystam z tego rozwiązania tylko, aby mieć osobne pule dla różnych rodzajów pracy, ale nie mogę ustawić wielkości wspólnej puli na 1, nawet jeśli go nie używam.
źródło
Uwaga: Wydaje się, że w JDK 10 wprowadzono poprawkę, która zapewnia, że niestandardowa pula wątków używa oczekiwanej liczby wątków.
Równoległe wykonywanie strumienia w niestandardowym ForkJoinPool powinno być zgodne z równoległością https://bugs.openjdk.java.net/browse/JDK-8190974
źródło
Próbowałem niestandardowego ForkJoinPool w następujący sposób, aby dostosować rozmiar puli:
Oto wynik mówiący, że pula używa więcej wątków niż domyślna 4 .
Ale tak naprawdę jest dziwne , kiedy próbowałem osiągnąć ten sam wynik, używając
ThreadPoolExecutor
następujących metod :ale zawiodłem.
Uruchamia równolegleStream w nowym wątku, a wtedy wszystko inne jest takie samo, co ponownie dowodzi, że
parallelStream
użyje ForkJoinPool do uruchomienia wątków potomnych.źródło
Idź po AbacusUtil . Numer wątku można określić dla strumienia równoległego. Oto przykładowy kod:
Ujawnienie : Jestem programistą AbacusUtil.
źródło
Jeśli nie chcesz polegać na hakach implementacyjnych, zawsze możesz to osiągnąć, wdrażając niestandardowe kolektory, które będą łączyć
map
icollect
semantykę ... i nie będziesz ograniczony do ForkJoinPool:Na szczęście jest to już zrobione tutaj i dostępne w Maven Central: http://github.com/pivovarit/parallel-collectors
Uwaga: Napisałem to i biorę za to odpowiedzialność.
źródło
Jeśli nie masz nic przeciwko korzystaniu z biblioteki innej firmy, dzięki cyklopowi- reaguj możesz mieszać sekwencyjne i równoległe strumienie w tym samym potoku i zapewnić niestandardowe ForkJoinPools. Na przykład
Lub jeśli chcielibyśmy kontynuować przetwarzanie w ramach sekwencyjnego strumienia
[Ujawnienie Jestem wiodącym twórcą Cyclops-reag]
źródło
Jeśli nie potrzebujesz niestandardowej puli wątków, ale chcesz ograniczyć liczbę jednoczesnych zadań, możesz użyć:
(Duplikat pytania o to jest zablokowany, więc proszę, proszę mnie tutaj)
źródło
możesz spróbować wdrożyć ten ForkJoinWorkerThreadFactory i wstrzyknąć go do klasy Fork-Join.
możesz to zrobić za pomocą tego konstruktora puli Fork-Join.
uwagi: - 1. jeśli użyjesz tego, weź pod uwagę, że w oparciu o implementację nowych wątków, wpłynie to na planowanie z JVM, co ogólnie planuje połączenia wątków z rozwidleniem do różnych rdzeni (traktowanych jako wątek obliczeniowy). 2. Nie ma to wpływu na planowanie zadań przez łączenie wideł do wątków. 3. Naprawdę nie zorientowałem się, w jaki sposób równoległy strumień wybiera wątki ze złączenia wideł (nie mogłem znaleźć na nim odpowiedniej dokumentacji), więc spróbuj użyć innej fabryki wątków, aby się upewnić, czy wątki w równoległym strumieniu są wybierane z dostarczonego przez customThreadFactory. 4. commonThreadPool nie będzie korzystał z tego customThreadFactory.
źródło