Równoległy strumień Java - kolejność wywoływania metody parallel () [zamknięte]

11
AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
     .parallel()           
     .filter(record -> doSomeOperation())
     .findFirst()

Kiedy to napisałem, założyłem, że wątki zostaną odrodzone tylko wywołanie mapy, ponieważ równolegle jest umieszczane po mapie. Ale niektóre wiersze w pliku otrzymywały różne numery rekordów dla każdego wykonania.

Przeczytałem oficjalną dokumentację dotyczącą strumieni Java i kilka stron internetowych, aby zrozumieć, jak działają strumienie pod maską.

Kilka pytań:

  • Równoległy strumień Java działa w oparciu o SplitIterator , który jest implementowany przez każdą kolekcję, taką jak ArrayList, LinkedList itp. Kiedy konstruujemy równoległy strumień z tych kolekcji, odpowiedni podzielony iterator zostanie użyty do podzielenia i iteracji kolekcji. To wyjaśnia, dlaczego równoległość występowała na poziomie oryginalnego źródła wejściowego (wierszy pliku), a nie na podstawie wyniku mapy (tj. Zapis pojo). Czy moje rozumowanie jest prawidłowe?

  • W moim przypadku wejściem jest plikowy strumień IO. Który iterator podzielony zostanie użyty?

  • Nie ma znaczenia, gdzie umieszczamy parallel()w rurociągu. Oryginalne źródło wejściowe będzie zawsze podzielone i zostaną zastosowane pozostałe operacje pośrednie.

    W takim przypadku Java nie powinna umożliwiać użytkownikom wykonywania operacji równoległych w dowolnym miejscu potoku, z wyjątkiem oryginalnego źródła. Ponieważ daje złe zrozumienie dla tych, którzy nie wiedzą, jak działa strumień Java. Wiem, że parallel()operacja byłaby zdefiniowana dla typu obiektu Stream, więc działa w ten sposób. Ale lepiej jest podać alternatywne rozwiązanie.

  • W powyższym fragmencie kodu próbuję dodać numer wiersza do każdego rekordu w pliku wejściowym, więc należy go zamówić. Chcę jednak zastosować doSomeOperation()równolegle, ponieważ jest to logika ciężka. Jednym ze sposobów na osiągnięcie tego jest napisanie własnego dostosowanego podzielonego iteratora. Czy jest jakiś inny sposób?

poszukiwacz
źródło
2
Ma to więcej wspólnego z tym, jak twórcy Java postanowili zaprojektować interfejs. Umieszczasz swoje żądania w potoku, a wszystko, co nie jest ostateczną operacją, zostanie zebrane jako pierwsze. parallel()jest niczym innym jak ogólnym żądaniem modyfikatora, które jest stosowane do bazowego obiektu strumienia. Pamiętaj, że istnieje tylko jeden strumień źródłowy, jeśli nie zastosujesz końcowych operacji do potoku, tzn. Dopóki nic nie zostanie „wykonane”. To powiedziawszy, w zasadzie kwestionujesz opcje projektowania Java. Który opiera się na opiniach i naprawdę nie możemy w tym pomóc.
Zabuzard
1
Całkowicie rozumiem twój punkt i zamieszanie, ale nie sądzę, że są o wiele lepsze rozwiązania. Ta metoda jest oferowana Streambezpośrednio w interfejsie, a ze względu na fajne kaskadowanie każda operacja jest zwracana Streamponownie. Wyobraź sobie, że ktoś chce ci dać, Streamale już zastosował kilka podobnych operacji map. Ty, jako użytkownik, nadal chcesz mieć możliwość decydowania, czy chcesz uruchomić go równolegle, czy nie. Dlatego musi być możliwe, aby parallel()nadal dzwonić , mimo że strumień już istnieje.
Zabuzard
1
Ponadto wolałbym zapytać, dlaczego chcesz sekwencyjnie wykonywać część strumienia, a następnie przejść na tryb równoległy. Jeśli strumień jest już wystarczająco duży, aby kwalifikować się do wykonania równoległego, prawdopodobnie dotyczy to również wszystkiego wcześniej w potoku. Dlaczego więc nie zastosować równoległego wykonywania również dla tej części? Rozumiem, że istnieją przypadki krawędziowe, takie jak jeśli dramatycznie zwiększysz rozmiar za pomocą flatMaplub wykonasz niebezpieczne metody wątków lub podobne.
Zabuzard
1
@Zabuza Nie kwestionuję wyboru projektu w Javie, ale po prostu zgłaszam swoje obawy. Każdy podstawowy użytkownik strumienia Java może mieć takie same zamieszanie, chyba że rozumie działanie strumienia. Całkowicie zgadzam się z twoim drugim komentarzem. Właśnie podkreśliłem jedno możliwe rozwiązanie, które może mieć swoje wady, jak wspomniałeś. Ale możemy zobaczyć, czy można to rozwiązać w inny sposób. Jeśli chodzi o twój trzeci komentarz, wspomniałem już o moim przypadku użycia w ostatnim punkcie mojego opisu
odkrywca
1
@Eugene, gdy Pathznajduje się w lokalnym systemie plików i używasz najnowszego JDK, spliterator będzie miał lepsze możliwości przetwarzania równoległego niż grupowanie wielokrotności 1024. Ale zrównoważone dzielenie może nawet przynieść efekt przeciwny do zamierzonego w niektórych findFirstscenariuszach…
Holger

Odpowiedzi:

8

To wyjaśnia, dlaczego równoległość występowała na poziomie oryginalnego źródła wejściowego (wierszy pliku), a nie na podstawie wyniku mapy (tj. Zapis pojo).

Cały strumień ma charakter równoległy lub sekwencyjny. Nie wybieramy podzbioru operacji do uruchomienia sekwencyjnego lub równoległego.

Po zainicjowaniu operacji terminalowej potok strumienia jest wykonywany sekwencyjnie lub równolegle w zależności od orientacji strumienia, na którym jest wywoływany. [...] Gdy inicjowana jest operacja terminalowa, potok strumienia jest wykonywany sekwencyjnie lub równolegle, w zależności od trybu strumienia, na który jest wywoływany. to samo źródło

Jak wspomniałeś, równoległe strumienie używają podzielonych iteratorów. Oczywiście jest to dzielenie danych na partycje przed rozpoczęciem operacji.


W moim przypadku wejściem jest plikowy strumień IO. Który iterator podzielony zostanie użyty?

Patrząc na źródło, widzę, że używa java.nio.file.FileChannelLinesSpliterator


Nie ma znaczenia, gdzie umieszczamy równolegle () w potoku. Oryginalne źródło wejściowe będzie zawsze podzielone i zostaną zastosowane pozostałe operacje pośrednie.

Dobrze. Możesz nawet dzwonić parallel()i sequential()wiele razy. Ten przywołany jako ostatni wygra. Kiedy dzwonimy parallel(), ustawiamy to dla zwracanego strumienia; i jak stwierdzono powyżej, wszystkie operacje są uruchamiane sekwencyjnie lub równolegle.


W takim przypadku Java nie powinna pozwalać użytkownikom na umieszczanie operacji równoległych w dowolnym miejscu potoku, z wyjątkiem oryginalnego źródła ...

To staje się kwestią opinii. Myślę, że Zabuza daje dobry powód, aby poprzeć wybór projektantów JDK.


Jednym ze sposobów na osiągnięcie tego jest napisanie własnego dostosowanego podzielonego iteratora. Czy jest jakiś inny sposób?

To zależy od twoich operacji

  • Jeśli findFirst()jest to Twoja prawdziwa operacja terminalowa, nie musisz nawet martwić się o równoległe wykonywanie, ponieważ i tak nie będzie wielu połączeń doSomething()( findFirst()jest to zwarcie). .parallel()w rzeczywistości może powodować przetwarzanie więcej niż jednego elementu, podczas gdy findFirst()w strumieniu sekwencyjnym zapobiegałoby to.
  • Jeśli twoja terminalowa operacja nie tworzy dużej ilości danych, być może możesz stworzyć swoje Recordobiekty za pomocą sekwencyjnego strumienia, a następnie przetworzyć wynik równolegle:

    List<Record> smallData = Files.lines(inputFile.toPath(), 
                                         StandardCharsets.UTF_8)
      .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
      .collect(Collectors.toList())
      .parallelStream()     
      .filter(record -> doSomeOperation())
      .collect(Collectors.toList());
  • Jeśli Twój potok załadowałby dużo danych do pamięci (co może być powodem, którego używasz Files.lines()), być może będziesz potrzebować niestandardowego iteratora podziału. Zanim jednak tam przejdę, przyjrzę się innym opcjom (na przykład zapisywanie wierszy z kolumną identyfikacyjną - to tylko moja opinia).
    Próbowałbym również przetwarzać rekordy w mniejszych partiach, takich jak to:

    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;
    
    try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), 
            StandardCharsets.UTF_8);) {
        Supplier<List<Record>> batchSupplier = () -> {
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String nextLine;
                try {
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    //hanlde exception
                    throw new RuntimeException(e);
                }
    
                if(null == nextLine) 
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            }
            System.out.println("next batch");
    
            return batch;
        };
    
        Stream.generate(batchSupplier)
            .takeWhile(list -> list.size() >= batchSize)
            .map(list -> list.parallelStream()
                             .filter(record -> doSomeOperation())
                             .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    }

    Wykonuje się to doSomeOperation()równolegle bez ładowania wszystkich danych do pamięci. Pamiętaj jednak, że batchSizetrzeba to przemyśleć.

ernest_k
źródło
1
Dziękuję za wyjaśnienie. Dobrze jest wiedzieć o trzecim rozwiązaniu, które wyróżniłeś. Przyjrzę się, ponieważ nie korzystałem z takeWhile i dostawcy.
odkrywca
2
Niestandardowa Spliteratorimplementacja nie byłaby bardziej skomplikowana, umożliwiając jednocześnie bardziej wydajne przetwarzanie równoległe…
Holger
1
Każda z twoich parallelStreamoperacji wewnętrznych ma ustalony narzut, aby zainicjować operację i poczekać na wynik końcowy, ograniczając się do równoległości batchSize. Po pierwsze, potrzebujesz wielu dostępnych obecnie rdzeni procesora, aby uniknąć bezczynności wątków. Następnie liczba powinna być wystarczająco wysoka, aby zrekompensować stały narzut, ale im wyższa liczba, tym większa pauza nałożona przez operację sekwencyjnego odczytu przed rozpoczęciem przetwarzania równoległego.
Holger
1
Zamiana równoległego strumienia zewnętrznego spowodowałaby złe interferencje z wewnętrznym w bieżącej implementacji, poza tym, że Stream.generatewytwarza nieuporządkowany strumień, który nie działa z przypadkami zamierzonego użycia PO findFirst(). W przeciwieństwie do tego pojedynczy równoległy strumień z rozdzielaczem, który zwraca fragmenty w działaniu, trySplitdziała prosto i pozwala wątkom roboczym przetworzyć kolejny fragment bez oczekiwania na zakończenie poprzedniego.
Holger
2
Nie ma powodu zakładać, że findFirst()operacja przetworzy tylko niewielką liczbę elementów. Pierwsze dopasowanie może nadal wystąpić po przetworzeniu 90% wszystkich elementów. Ponadto, mając dziesięć milionów linii, nawet znalezienie dopasowania po 10% nadal wymaga przetworzenia miliona linii.
Holger
7

Oryginalny projekt Stream zawierał pomysł na obsługę kolejnych etapów potoku z różnymi ustawieniami wykonywania równoległego, ale pomysł ten został porzucony. Interfejs API może wynikać z tego czasu, ale z drugiej strony projekt interfejsu API, który zmusza osobę wywołującą do podjęcia pojedynczej jednoznacznej decyzji o równoległym lub sekwencyjnym wykonaniu, byłby znacznie bardziej skomplikowany.

Rzeczywisty Spliteratorużywany przez Files.lines(…)zależy od implementacji. W Javie 8 (Oracle lub OpenJDK) zawsze otrzymujesz to samo, co z BufferedReader.lines(). W ostatnich pakietów JDK, jeśli Pathnależy do domyślnego systemu plików i charset jest jednym z obsługiwanych do tej funkcji, można uzyskać strumień z dedykowaną Spliteratorrealizacji, java.nio.file.FileChannelLinesSpliterator. Jeśli warunki wstępne nie są spełnione, otrzymujesz to samo, co z BufferedReader.lines(), który wciąż jest oparty na Iteratorzaimplementowanym wewnątrz BufferedReaderi jest zawinięty przez Spliterators.spliteratorUnknownSize.

Twoje konkretne zadanie najlepiej realizować za pomocą niestandardowego, Spliteratorktóry może wykonać numerację linii bezpośrednio u źródła, przed przetwarzaniem równoległym, aby umożliwić późniejsze przetwarzanie równoległe bez ograniczeń.

public static Stream<Record> records(Path p) throws IOException {
    LineNoSpliterator sp = new LineNoSpliterator(p);
    return StreamSupport.stream(sp, false).onClose(sp);
}

private static class LineNoSpliterator implements Spliterator<Record>, Runnable {
    int chunkSize = 100;
    SeekableByteChannel channel;
    LineNumberReader reader;

    LineNoSpliterator(Path path) throws IOException {
        channel = Files.newByteChannel(path, StandardOpenOption.READ);
        reader=new LineNumberReader(Channels.newReader(channel,StandardCharsets.UTF_8));
    }

    @Override
    public void run() {
        try(Closeable c1 = reader; Closeable c2 = channel) {}
        catch(IOException ex) { throw new UncheckedIOException(ex); }
        finally { reader = null; channel = null; }
    }

    @Override
    public boolean tryAdvance(Consumer<? super Record> action) {
        try {
            String line = reader.readLine();
            if(line == null) return false;
            action.accept(new Record(reader.getLineNumber(), line));
            return true;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    @Override
    public Spliterator<Record> trySplit() {
        Record[] chunks = new Record[chunkSize];
        int read;
        for(read = 0; read < chunks.length; read++) {
            int pos = read;
            if(!tryAdvance(r -> chunks[pos] = r)) break;
        }
        return Spliterators.spliterator(chunks, 0, read, characteristics());
    }

    @Override
    public long estimateSize() {
        try {
            return (channel.size() - channel.position()) / 60;
        } catch (IOException ex) {
            return 0;
        }
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | DISTINCT;
    }
}
Holger
źródło
0

Poniżej przedstawiono prostą demonstrację zastosowania równoległego. Dane wyjściowe z podglądu wyraźnie pokazują różnicę między tymi dwoma przykładami. Uwaga: mappołączenie jest po prostu rzucane, aby dodać inną metodę przed parallel.

IntStream.rangeClosed (1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).sum();
System.out.println();
IntStream.rangeClosed(1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).parallel().sum();
WJS
źródło