AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
Kiedy to napisałem, założyłem, że wątki zostaną odrodzone tylko wywołanie mapy, ponieważ równolegle jest umieszczane po mapie. Ale niektóre wiersze w pliku otrzymywały różne numery rekordów dla każdego wykonania.
Przeczytałem oficjalną dokumentację dotyczącą strumieni Java i kilka stron internetowych, aby zrozumieć, jak działają strumienie pod maską.
Kilka pytań:
Równoległy strumień Java działa w oparciu o SplitIterator , który jest implementowany przez każdą kolekcję, taką jak ArrayList, LinkedList itp. Kiedy konstruujemy równoległy strumień z tych kolekcji, odpowiedni podzielony iterator zostanie użyty do podzielenia i iteracji kolekcji. To wyjaśnia, dlaczego równoległość występowała na poziomie oryginalnego źródła wejściowego (wierszy pliku), a nie na podstawie wyniku mapy (tj. Zapis pojo). Czy moje rozumowanie jest prawidłowe?
W moim przypadku wejściem jest plikowy strumień IO. Który iterator podzielony zostanie użyty?
Nie ma znaczenia, gdzie umieszczamy
parallel()
w rurociągu. Oryginalne źródło wejściowe będzie zawsze podzielone i zostaną zastosowane pozostałe operacje pośrednie.W takim przypadku Java nie powinna umożliwiać użytkownikom wykonywania operacji równoległych w dowolnym miejscu potoku, z wyjątkiem oryginalnego źródła. Ponieważ daje złe zrozumienie dla tych, którzy nie wiedzą, jak działa strumień Java. Wiem, że
parallel()
operacja byłaby zdefiniowana dla typu obiektu Stream, więc działa w ten sposób. Ale lepiej jest podać alternatywne rozwiązanie.W powyższym fragmencie kodu próbuję dodać numer wiersza do każdego rekordu w pliku wejściowym, więc należy go zamówić. Chcę jednak zastosować
doSomeOperation()
równolegle, ponieważ jest to logika ciężka. Jednym ze sposobów na osiągnięcie tego jest napisanie własnego dostosowanego podzielonego iteratora. Czy jest jakiś inny sposób?
źródło
parallel()
jest niczym innym jak ogólnym żądaniem modyfikatora, które jest stosowane do bazowego obiektu strumienia. Pamiętaj, że istnieje tylko jeden strumień źródłowy, jeśli nie zastosujesz końcowych operacji do potoku, tzn. Dopóki nic nie zostanie „wykonane”. To powiedziawszy, w zasadzie kwestionujesz opcje projektowania Java. Który opiera się na opiniach i naprawdę nie możemy w tym pomóc.Stream
bezpośrednio w interfejsie, a ze względu na fajne kaskadowanie każda operacja jest zwracanaStream
ponownie. Wyobraź sobie, że ktoś chce ci dać,Stream
ale już zastosował kilka podobnych operacjimap
. Ty, jako użytkownik, nadal chcesz mieć możliwość decydowania, czy chcesz uruchomić go równolegle, czy nie. Dlatego musi być możliwe, abyparallel()
nadal dzwonić , mimo że strumień już istnieje.flatMap
lub wykonasz niebezpieczne metody wątków lub podobne.Path
znajduje się w lokalnym systemie plików i używasz najnowszego JDK, spliterator będzie miał lepsze możliwości przetwarzania równoległego niż grupowanie wielokrotności 1024. Ale zrównoważone dzielenie może nawet przynieść efekt przeciwny do zamierzonego w niektórychfindFirst
scenariuszach…Odpowiedzi:
Cały strumień ma charakter równoległy lub sekwencyjny. Nie wybieramy podzbioru operacji do uruchomienia sekwencyjnego lub równoległego.
Jak wspomniałeś, równoległe strumienie używają podzielonych iteratorów. Oczywiście jest to dzielenie danych na partycje przed rozpoczęciem operacji.
Patrząc na źródło, widzę, że używa
java.nio.file.FileChannelLinesSpliterator
Dobrze. Możesz nawet dzwonić
parallel()
isequential()
wiele razy. Ten przywołany jako ostatni wygra. Kiedy dzwonimyparallel()
, ustawiamy to dla zwracanego strumienia; i jak stwierdzono powyżej, wszystkie operacje są uruchamiane sekwencyjnie lub równolegle.To staje się kwestią opinii. Myślę, że Zabuza daje dobry powód, aby poprzeć wybór projektantów JDK.
To zależy od twoich operacji
findFirst()
jest to Twoja prawdziwa operacja terminalowa, nie musisz nawet martwić się o równoległe wykonywanie, ponieważ i tak nie będzie wielu połączeńdoSomething()
(findFirst()
jest to zwarcie)..parallel()
w rzeczywistości może powodować przetwarzanie więcej niż jednego elementu, podczas gdyfindFirst()
w strumieniu sekwencyjnym zapobiegałoby to.Jeśli twoja terminalowa operacja nie tworzy dużej ilości danych, być może możesz stworzyć swoje
Record
obiekty za pomocą sekwencyjnego strumienia, a następnie przetworzyć wynik równolegle:Jeśli Twój potok załadowałby dużo danych do pamięci (co może być powodem, którego używasz
Files.lines()
), być może będziesz potrzebować niestandardowego iteratora podziału. Zanim jednak tam przejdę, przyjrzę się innym opcjom (na przykład zapisywanie wierszy z kolumną identyfikacyjną - to tylko moja opinia).Próbowałbym również przetwarzać rekordy w mniejszych partiach, takich jak to:
Wykonuje się to
doSomeOperation()
równolegle bez ładowania wszystkich danych do pamięci. Pamiętaj jednak, żebatchSize
trzeba to przemyśleć.źródło
Spliterator
implementacja nie byłaby bardziej skomplikowana, umożliwiając jednocześnie bardziej wydajne przetwarzanie równoległe…parallelStream
operacji wewnętrznych ma ustalony narzut, aby zainicjować operację i poczekać na wynik końcowy, ograniczając się do równoległościbatchSize
. Po pierwsze, potrzebujesz wielu dostępnych obecnie rdzeni procesora, aby uniknąć bezczynności wątków. Następnie liczba powinna być wystarczająco wysoka, aby zrekompensować stały narzut, ale im wyższa liczba, tym większa pauza nałożona przez operację sekwencyjnego odczytu przed rozpoczęciem przetwarzania równoległego.Stream.generate
wytwarza nieuporządkowany strumień, który nie działa z przypadkami zamierzonego użycia POfindFirst()
. W przeciwieństwie do tego pojedynczy równoległy strumień z rozdzielaczem, który zwraca fragmenty w działaniu,trySplit
działa prosto i pozwala wątkom roboczym przetworzyć kolejny fragment bez oczekiwania na zakończenie poprzedniego.findFirst()
operacja przetworzy tylko niewielką liczbę elementów. Pierwsze dopasowanie może nadal wystąpić po przetworzeniu 90% wszystkich elementów. Ponadto, mając dziesięć milionów linii, nawet znalezienie dopasowania po 10% nadal wymaga przetworzenia miliona linii.Oryginalny projekt Stream zawierał pomysł na obsługę kolejnych etapów potoku z różnymi ustawieniami wykonywania równoległego, ale pomysł ten został porzucony. Interfejs API może wynikać z tego czasu, ale z drugiej strony projekt interfejsu API, który zmusza osobę wywołującą do podjęcia pojedynczej jednoznacznej decyzji o równoległym lub sekwencyjnym wykonaniu, byłby znacznie bardziej skomplikowany.
Rzeczywisty
Spliterator
używany przezFiles.lines(…)
zależy od implementacji. W Javie 8 (Oracle lub OpenJDK) zawsze otrzymujesz to samo, co zBufferedReader.lines()
. W ostatnich pakietów JDK, jeśliPath
należy do domyślnego systemu plików i charset jest jednym z obsługiwanych do tej funkcji, można uzyskać strumień z dedykowanąSpliterator
realizacji,java.nio.file.FileChannelLinesSpliterator
. Jeśli warunki wstępne nie są spełnione, otrzymujesz to samo, co zBufferedReader.lines()
, który wciąż jest oparty naIterator
zaimplementowanym wewnątrzBufferedReader
i jest zawinięty przezSpliterators.spliteratorUnknownSize
.Twoje konkretne zadanie najlepiej realizować za pomocą niestandardowego,
Spliterator
który może wykonać numerację linii bezpośrednio u źródła, przed przetwarzaniem równoległym, aby umożliwić późniejsze przetwarzanie równoległe bez ograniczeń.źródło
Poniżej przedstawiono prostą demonstrację zastosowania równoległego. Dane wyjściowe z podglądu wyraźnie pokazują różnicę między tymi dwoma przykładami. Uwaga:
map
połączenie jest po prostu rzucane, aby dodać inną metodę przedparallel
.źródło