W równoległych nieskończonych strumieniach Java zabrakło pamięci

16

Próbuję zrozumieć, dlaczego następujący program Java daje OutOfMemoryError, a odpowiedni program bez .parallel().

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

Mam dwa pytania:

  1. Jakie jest zamierzone wyjście tego programu?

    Bez .parallel()tego wydaje się, że to po prostu wyprowadza, sum(1+2+3+...)co oznacza, że ​​po prostu „zacina się” przy pierwszym strumieniu w flatMapie, co ma sens.

    Równolegle nie wiem, czy zachodzi oczekiwane zachowanie, ale domyślam się, że w jakiś sposób przeplata on pierwsze nstrumienie, gdzie njest liczba równoległych pracowników. Może również nieznacznie różnić się w zależności od zachowania polegającego na dzieleniu / buforowaniu.

  2. Co powoduje brak pamięci? W szczególności staram się zrozumieć, w jaki sposób te strumienie są wdrażane pod maską.

    Zgaduję, że coś blokuje strumień, więc nigdy się nie kończy i jest w stanie pozbyć się wygenerowanych wartości, ale nie bardzo wiem, w jakiej kolejności rzeczy są oceniane i gdzie zachodzi buforowanie.

Edycja: Jeśli jest to istotne, używam Java 11.

Editt 2: Najwyraźniej to samo dzieje się nawet w przypadku prostego programu IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(), więc może to mieć związek z lenistwem limitzamiast flatMap.

Thomas Ahle
źródło
parallel () wewnętrznie korzysta z ForkJoinPool. Chyba ForkJoin Framework jest w Javie z Java 7
aravind

Odpowiedzi:

9

Mówicie „ ale nie do końca wiem, w jakiej kolejności rzeczy są oceniane i gdzie zachodzi buforowanie ”, właśnie o to chodzi w równoległych strumieniach. Kolejność oceny jest nieokreślona.

Krytycznym aspektem twojego przykładu jest .limit(100_000_000). Oznacza to, że implementacja nie może po prostu sumować dowolnych wartości, ale musi sumować pierwsze 100 000 000 liczb. Zauważ, że w implementacji referencyjnej .unordered().limit(100_000_000)nie zmienia wyniku, co wskazuje, że nie ma specjalnej implementacji dla przypadku nieuporządkowanego, ale jest to szczegół implementacji.

Teraz, gdy wątki robocze przetwarzają elementy, nie mogą ich po prostu podsumować, ponieważ muszą wiedzieć, które elementy mogą zużywać, co zależy od liczby elementów poprzedzających określone obciążenie pracą. Ponieważ ten strumień nie zna rozmiarów, można o tym wiedzieć tylko wtedy, gdy przetworzono elementy prefiksu, co nigdy się nie zdarza w przypadku strumieni nieskończonych. Tak więc wątki robocze nadal buforują, ta informacja staje się dostępna.

Zasadniczo, gdy wątek roboczy wie, że przetwarza skrajnie lewy¹ fragment roboczy, może natychmiast zsumować elementy, policzyć je i zasygnalizować koniec, gdy osiągnie limit. Strumień może zostać zakończony, ale zależy to od wielu czynników.

W twoim przypadku prawdopodobnym scenariuszem jest to, że inne wątki robocze szybciej przydzielają bufory, niż liczy lewe zadanie. W tym scenariuszu subtelne zmiany w czasie mogą powodować, że strumień czasami wraca z wartością.

Kiedy spowalniamy wszystkie wątki robocze z wyjątkiem tego, które przetwarzają najbardziej lewy fragment, możemy spowodować zakończenie strumienia (przynajmniej w większości przebiegów):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ Stosuję się do sugestii Stuarta Marksa, aby podczas mówienia o kolejności spotkań zamiast kolejności przetwarzania stosować kolejność od lewej do prawej.

Holger
źródło
Bardzo ładna odpowiedź! Zastanawiam się, czy istnieje ryzyko, że wszystkie wątki zaczną uruchamiać operacje flatMap i żaden z nich nie zostanie przydzielony do opróżnienia buforów (sumowanie)? W moim rzeczywistym przypadku nieskończone strumienie są zamiast tego plikami zbyt dużymi, aby utrzymać je w pamięci. Zastanawiam się, jak mogę przepisać strumień, aby zmniejszyć zużycie pamięci?
Thomas Ahle
1
Używasz Files.lines(…)? Został znacznie poprawiony w Javie 9.
Holger
1
Tak właśnie działa w Javie 8. W nowszych wersjach środowiska JRE BufferedReader.lines()w niektórych przypadkach nadal będzie się pojawiał (nie jest to domyślny system plików, specjalny zestaw znaków lub rozmiar większy niż Integer.MAX_FILES). Jeśli jedno z nich ma zastosowanie, niestandardowe rozwiązanie może pomóc. To byłoby warte nowego pytania i odpowiedzi…
Holger
1
Integer.MAX_VALUE, oczywiście…
Holger
1
Co to jest strumień zewnętrzny, strumień plików? Czy ma przewidywalny rozmiar?
Holger
5

Mój najlepszy przypuszczenie, że dodanie parallel()zmienia zachowanie wewnętrznej flatMap(), która już miała problemy są oceniane przed leniwie .

Występujący OutOfMemoryErrorbłąd został zgłoszony w [JDK-8202307] Pobieranie java.lang.OutOfMemoryError: Przestrzeń sterty Java podczas wywoływania Stream.iterator (). Next () w strumieniu, który używa nieskończonego / bardzo dużego strumienia w płaskiej mapie . Jeśli spojrzysz na bilet, otrzymujesz mniej więcej ten sam ślad stosu. Bilet został zamknięty, ponieważ nie zostanie naprawiony z następującego powodu:

iterator()I spliterator()metody są „luki ratunkowe” być stosowane, gdy nie jest możliwe zastosowanie innych operacji. Mają pewne ograniczenia, ponieważ przekształcają model wypychania implementacji strumienia w model ściągania. Takie przejście wymaga buforowania w niektórych przypadkach, na przykład gdy element jest (płaski) odwzorowany na dwa lub więcej elementów . Znacznie skomplikowałoby to implementację strumienia, prawdopodobnie kosztem typowych przypadków, w celu wsparcia pojęcia przeciwciśnienia w celu poinformowania, ile elementów należy przeciągnąć przez zagnieżdżone warstwy produkcji elementów.

Karol Dowbecki
źródło
To jest bardzo ciekawe! Sensowne jest, że przejście push / pull wymaga buforowania, które może zużyć pamięć. Jednak w moim przypadku wydaje się, że użycie tylko push powinno działać dobrze i po prostu odrzucić pozostałe elementy, gdy się pojawią? A może mówisz, że flapmap powoduje utworzenie iteratora?
Thomas Ahle
3

Przyczyną OOME nie jest to, że strumień jest nieskończony, ale fakt, że tak nie jest .

To znaczy, jeśli skomentujesz to .limit(...), nigdy nie zabraknie pamięci - ale oczywiście nigdy się nie skończy.

Po podzieleniu strumień może śledzić liczbę elementów tylko wtedy, gdy są one zgromadzone w każdym wątku (wygląda tak, jak w rzeczywistości akumulator Spliterators$ArraySpliterator#array).

Wygląda na to, że możesz go odtworzyć bez flatMap, po prostu uruchom następujące polecenie -Xmx128m:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

Jednak po skomentowaniu limit(), powinien działać dobrze, dopóki nie zdecydujesz się oszczędzić laptopa.

Oprócz faktycznych szczegółów implementacji, oto, co myślę, że się dzieje:

Za limitpomocą sumreduktora chce się zsumować pierwsze X elementów, aby żaden wątek nie mógł wysyłać sum częściowych. Każdy „plasterek” (wątek) będzie musiał gromadzić elementy i przepuszczać je. Bez limitu nie ma takiego ograniczenia, więc każdy „wycinek” po prostu obliczy częściową sumę otrzymanych elementów (na zawsze), zakładając, że ostatecznie wyemituje wynik.

Costi Ciudatu
źródło
Co masz na myśli mówiąc „po podziale”? Czy limit jakoś go dzieli?
Thomas Ahle
@ThomasAhle parallel()użyje ForkJoinPoolwewnętrznie, aby osiągnąć równoległość. SpliteratorZostaną wykorzystane w celu przypisania pracy do każdego ForkJoinzadania, myślę, że możemy wezwać jednostkę pracy tutaj jako „Split”.
Karol Dowbecki
Ale dlaczego dzieje się tak tylko z limitem?
Thomas Ahle
@ThomasAhle Edytowałem odpowiedź za pomocą moich dwóch centów.
Costi Ciudatu
1
@ThomasAhle ustawia punkt przerwania Integer.sum(), używany przez IntStream.sumreduktor. Zobaczysz, że wywołania wersji bez limitu działają przez cały czas, podczas gdy wersja limitowana nigdy nie będzie mogła wywoływać jej przed OOM.
Costi Ciudatu