Próbuję zrozumieć, dlaczego następujący program Java daje OutOfMemoryError
, a odpowiedni program bez .parallel()
.
System.out.println(Stream
.iterate(1, i -> i+1)
.parallel()
.flatMap(n -> Stream.iterate(n, i -> i+n))
.mapToInt(Integer::intValue)
.limit(100_000_000)
.sum()
);
Mam dwa pytania:
Jakie jest zamierzone wyjście tego programu?
Bez
.parallel()
tego wydaje się, że to po prostu wyprowadza,sum(1+2+3+...)
co oznacza, że po prostu „zacina się” przy pierwszym strumieniu w flatMapie, co ma sens.Równolegle nie wiem, czy zachodzi oczekiwane zachowanie, ale domyślam się, że w jakiś sposób przeplata on pierwsze
n
strumienie, gdzien
jest liczba równoległych pracowników. Może również nieznacznie różnić się w zależności od zachowania polegającego na dzieleniu / buforowaniu.Co powoduje brak pamięci? W szczególności staram się zrozumieć, w jaki sposób te strumienie są wdrażane pod maską.
Zgaduję, że coś blokuje strumień, więc nigdy się nie kończy i jest w stanie pozbyć się wygenerowanych wartości, ale nie bardzo wiem, w jakiej kolejności rzeczy są oceniane i gdzie zachodzi buforowanie.
Edycja: Jeśli jest to istotne, używam Java 11.
Editt 2: Najwyraźniej to samo dzieje się nawet w przypadku prostego programu IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()
, więc może to mieć związek z lenistwem limit
zamiast flatMap
.
źródło
Odpowiedzi:
Mówicie „ ale nie do końca wiem, w jakiej kolejności rzeczy są oceniane i gdzie zachodzi buforowanie ”, właśnie o to chodzi w równoległych strumieniach. Kolejność oceny jest nieokreślona.
Krytycznym aspektem twojego przykładu jest
.limit(100_000_000)
. Oznacza to, że implementacja nie może po prostu sumować dowolnych wartości, ale musi sumować pierwsze 100 000 000 liczb. Zauważ, że w implementacji referencyjnej.unordered().limit(100_000_000)
nie zmienia wyniku, co wskazuje, że nie ma specjalnej implementacji dla przypadku nieuporządkowanego, ale jest to szczegół implementacji.Teraz, gdy wątki robocze przetwarzają elementy, nie mogą ich po prostu podsumować, ponieważ muszą wiedzieć, które elementy mogą zużywać, co zależy od liczby elementów poprzedzających określone obciążenie pracą. Ponieważ ten strumień nie zna rozmiarów, można o tym wiedzieć tylko wtedy, gdy przetworzono elementy prefiksu, co nigdy się nie zdarza w przypadku strumieni nieskończonych. Tak więc wątki robocze nadal buforują, ta informacja staje się dostępna.
Zasadniczo, gdy wątek roboczy wie, że przetwarza skrajnie lewy¹ fragment roboczy, może natychmiast zsumować elementy, policzyć je i zasygnalizować koniec, gdy osiągnie limit. Strumień może zostać zakończony, ale zależy to od wielu czynników.
W twoim przypadku prawdopodobnym scenariuszem jest to, że inne wątki robocze szybciej przydzielają bufory, niż liczy lewe zadanie. W tym scenariuszu subtelne zmiany w czasie mogą powodować, że strumień czasami wraca z wartością.
Kiedy spowalniamy wszystkie wątki robocze z wyjątkiem tego, które przetwarzają najbardziej lewy fragment, możemy spowodować zakończenie strumienia (przynajmniej w większości przebiegów):
¹ Stosuję się do sugestii Stuarta Marksa, aby podczas mówienia o kolejności spotkań zamiast kolejności przetwarzania stosować kolejność od lewej do prawej.
źródło
Files.lines(…)
? Został znacznie poprawiony w Javie 9.BufferedReader.lines()
w niektórych przypadkach nadal będzie się pojawiał (nie jest to domyślny system plików, specjalny zestaw znaków lub rozmiar większy niżInteger.MAX_FILES
). Jeśli jedno z nich ma zastosowanie, niestandardowe rozwiązanie może pomóc. To byłoby warte nowego pytania i odpowiedzi…Integer.MAX_VALUE
, oczywiście…Mój najlepszy przypuszczenie, że dodanie
parallel()
zmienia zachowanie wewnętrznejflatMap()
, która już miała problemy są oceniane przed leniwie .Występujący
OutOfMemoryError
błąd został zgłoszony w [JDK-8202307] Pobieranie java.lang.OutOfMemoryError: Przestrzeń sterty Java podczas wywoływania Stream.iterator (). Next () w strumieniu, który używa nieskończonego / bardzo dużego strumienia w płaskiej mapie . Jeśli spojrzysz na bilet, otrzymujesz mniej więcej ten sam ślad stosu. Bilet został zamknięty, ponieważ nie zostanie naprawiony z następującego powodu:źródło
Przyczyną OOME nie jest to, że strumień jest nieskończony, ale fakt, że tak nie jest .
To znaczy, jeśli skomentujesz to
.limit(...)
, nigdy nie zabraknie pamięci - ale oczywiście nigdy się nie skończy.Po podzieleniu strumień może śledzić liczbę elementów tylko wtedy, gdy są one zgromadzone w każdym wątku (wygląda tak, jak w rzeczywistości akumulator
Spliterators$ArraySpliterator#array
).Wygląda na to, że możesz go odtworzyć bez
flatMap
, po prostu uruchom następujące polecenie-Xmx128m
:Jednak po skomentowaniu
limit()
, powinien działać dobrze, dopóki nie zdecydujesz się oszczędzić laptopa.Oprócz faktycznych szczegółów implementacji, oto, co myślę, że się dzieje:
Za
limit
pomocąsum
reduktora chce się zsumować pierwsze X elementów, aby żaden wątek nie mógł wysyłać sum częściowych. Każdy „plasterek” (wątek) będzie musiał gromadzić elementy i przepuszczać je. Bez limitu nie ma takiego ograniczenia, więc każdy „wycinek” po prostu obliczy częściową sumę otrzymanych elementów (na zawsze), zakładając, że ostatecznie wyemituje wynik.źródło
parallel()
użyjeForkJoinPool
wewnętrznie, aby osiągnąć równoległość.Spliterator
Zostaną wykorzystane w celu przypisania pracy do każdegoForkJoin
zadania, myślę, że możemy wezwać jednostkę pracy tutaj jako „Split”.Integer.sum()
, używany przezIntStream.sum
reduktor. Zobaczysz, że wywołania wersji bez limitu działają przez cały czas, podczas gdy wersja limitowana nigdy nie będzie mogła wywoływać jej przed OOM.