Czy potrafisz ponownie wyważyć niezrównoważony Spliterator o nieznanej wielkości?

12

Chcę użyć Streamdo równoległego przetwarzania heterogenicznego zestawu zdalnie przechowywanych plików JSON o nieznanej liczbie (liczba plików nie jest znana z góry). Rozmiar plików może się znacznie różnić, od 1 rekordu JSON na plik do 100 000 rekordów w niektórych innych plikach. Rekord JSON w tym przypadku oznacza samodzielne JSON obiekt reprezentowany jako jeden wiersz w pliku.

Naprawdę chcę do tego używać strumieni, więc wdrożyłem to Spliterator:

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

Problem, który mam, polega na tym, że podczas gdy strumień pięknie zrównuje się na początku, w końcu największy plik jest przetwarzany w jednym wątku. Uważam, że bliższa przyczyna jest dobrze udokumentowana: spliterator jest „niezrównoważony”.

Mówiąc bardziej konkretnie, wydaje się, że trySplitmetoda nie jest wywoływana po pewnym momencie Stream.forEachcyklu życia, więc dodatkowa logika do dystrybucji małych partii na końcu trySplitjest rzadko wykonywana.

Zauważ, że wszystkie spliteratory zwrócone z trySplit współużytkują ten sam pathsiterator. Pomyślałem, że to naprawdę sprytny sposób na zrównoważenie pracy we wszystkich rozdzielaczach, ale to nie wystarczyło do osiągnięcia pełnej równoległości.

Chciałbym, aby równoległe przetwarzanie przebiegało najpierw między plikami, a następnie, gdy niewiele dużych plików pozostało rozdzielających się, chcę przeprowadzić równoległość między fragmentami pozostałych plików. Taki był cel elsebloku pod koniec trySplit.

Czy istnieje prosty / prosty / kanoniczny sposób rozwiązania tego problemu?

Alex R.
źródło
2
Potrzebujesz oszacowania rozmiaru. Może być całkowicie fałszywy, o ile w przybliżeniu odzwierciedla stosunek niezrównoważonego podziału. W przeciwnym razie strumień nie wie, że podziały są niezrównoważone i zatrzyma się po utworzeniu określonej liczby porcji.
Holger,
@Holger, czy możesz rozwinąć tę kwestię: „zatrzyma się po utworzeniu pewnej liczby fragmentów” lub skierujesz mnie na źródło JDK? Jaka jest liczba fragmentów, w których się zatrzymuje?
Alex R
Kod jest nieistotny, ponieważ pokazywałby zbyt wiele nieistotnych szczegółów implementacji, które mogłyby ulec zmianie w dowolnym momencie. Istotne jest to, że implementacja próbuje wywoływać split wystarczająco często, aby każdy wątek roboczy (dostosowany do liczby rdzeni procesora) miał coś do zrobienia. Aby zrekompensować nieprzewidywalne różnice w czasie obliczeń, najprawdopodobniej wytworzy jeszcze więcej fragmentów niż wątki robocze, aby umożliwić kradzież pracy i wykorzystać szacowane rozmiary jako heurystyczne (np. Aby zdecydować, który sub spliterator ma dalej dzielić). Zobacz także stackoverflow.com/a/48174508/2711488
Holger
Zrobiłem kilka eksperymentów, aby spróbować zrozumieć twój komentarz. Heurystyka wydaje się dość prymitywna. Wygląda na to, Long.MAX_VALUEże powrót powoduje nadmierne i niepotrzebne dzielenie, podczas gdy wszelkie szacunki inne niż Long.MAX_VALUEpowodują zatrzymanie dalszego podziału, zabijając równoległość. Zwrócenie zestawu dokładnych szacunków nie prowadzi do żadnych inteligentnych optymalizacji.
Alex R
Nie twierdzę, że strategia implementacji była bardzo inteligentna, ale przynajmniej działa w niektórych scenariuszach z szacowanymi rozmiarami (w przeciwnym razie było o wiele więcej raportów o błędach). Wygląda na to, że podczas eksperymentów wystąpiły pewne błędy. Na przykład w kodzie pytania rozszerzasz, AbstractSpliteratorale zastępujesz, trySplit()co jest złym zestawem do niczego innego Long.MAX_VALUE, ponieważ nie dostosowujesz oszacowania rozmiaru trySplit(). Następnie trySplit()oszacowanie rozmiaru powinno zostać zmniejszone o liczbę elementów, które zostały rozdzielone.
Holger

Odpowiedzi:

0

Twój trySplitpowinien dzieli wyjściowe równej wielkości, niezależnie od wielkości plików bazowych. Powinieneś traktować wszystkie pliki jako pojedynczą jednostkę i za ArrayListkażdym razem wypełniać splicator z powrotem z tą samą liczbą obiektów JSON. Liczba obiektów powinna być taka, aby przetworzenie jednego podziału trwało od 1 do 10 milisekund: mniej niż 1 ms i zaczynasz zbliżać się do kosztów przekazania partii do wątku roboczego, wyższego i zaczynasz ryzykować nierównomierne obciążenie procesora z powodu zadania, które są zbyt gruboziarniste.

Spliterator nie jest zobowiązany do zgłaszania oszacowania rozmiaru, a już robisz to poprawnie: twoje oszacowanie jest Long.MAX_VALUE, co jest specjalną wartością oznaczającą „bez ograniczeń”. Jeśli jednak masz wiele plików z jednym obiektem JSON, co skutkuje partiami o rozmiarze 1, pogorszy to twoją wydajność na dwa sposoby: obciążenie związane z otwieraniem, czytaniem i zamykaniem pliku może stać się wąskim gardłem, a jeśli uda ci się uciec że koszt przekazania wątku może być znaczny w porównaniu z kosztem przetworzenia jednego elementu, ponownie powodując wąskie gardło.

Pięć lat temu rozwiązałem podobny problem, możesz rzucić okiem na moje rozwiązanie .

Marko Topolnik
źródło
Tak, „nie jesteś zobowiązany do zgłaszania oszacowania rozmiaru” i Long.MAX_VALUEpoprawnie opisujesz nieznany rozmiar, ale to nie pomaga, gdy faktyczna implementacja Stream działa słabo. Nawet użycie wyniku ThreadLocalRandom.current().nextInt(100, 100_000)jako oszacowanego rozmiaru daje lepsze wyniki.
Holger
Sprawdził się dobrze w moich przypadkach użycia, w których koszt obliczeniowy każdego elementu był znaczny. Z łatwością osiągnąłem 98% całkowitego zużycia procesora i przepustowości skalowanej prawie liniowo z równoległością. Zasadniczo ważne jest, aby uzyskać odpowiedni rozmiar partii, aby przetwarzanie trwało od 1 do 10 milisekund. Jest to znacznie powyżej kosztów przekazania wątku i nie jest zbyt długie, aby powodować problemy z ziarnistością zadań. Pod koniec tego postu opublikowałem wyniki testów porównawczych .
Marko Topolnik
Twoje rozwiązanie dzieli się na taki, ArraySpliteratorktóry ma szacunkowy rozmiar (nawet dokładny rozmiar). Tak więc implementacja Stream zobaczy rozmiar tablicy vs Long.MAX_VALUE, weź to pod uwagę za niezrównoważone i podziel „większy” spliterator (ignorując to Long.MAX_VALUEoznacza „nieznany”), dopóki nie będzie mógł dalej podzielić. Następnie, jeśli nie ma wystarczającej liczby fragmentów, podzieli dzielniki oparte na macierzy, wykorzystując ich znane rozmiary. Tak, działa to bardzo dobrze, ale nie jest sprzeczne z moim stwierdzeniem, że potrzebujesz oszacowania rozmiaru, niezależnie od tego, jak słaba jest.
Holger
OK, więc wydaje się to nieporozumieniem --- ponieważ nie potrzebujesz oszacowania rozmiaru na wejściu. Tylko na poszczególnych podziałach i zawsze możesz to mieć.
Marko Topolnik
Cóż, mój pierwszy komentarz brzmiał: „ Potrzebujesz oszacowania rozmiaru. Może to być całkowicie nieprawdziwe, o ile z grubsza odzwierciedla stosunek twojego niezrównoważonego podziału. ” Kluczową kwestią tutaj było to, że kod OP tworzy kolejny spliterator zawierający pojedynczy element, ale wciąż zgłasza nieznany rozmiar. To sprawia, że ​​implementacja Stream jest bezradna. Zrobiłaby to dowolna liczba szacunkowa dla nowego spliteratora, która byłaby znacznie mniejsza Long.MAX_VALUE.
Holger
0

Po wielu eksperymentach nadal nie byłem w stanie uzyskać dodatkowej równoległości, grając z oszacowaniami wielkości. Zasadniczo dowolna wartość inna niż Long.MAX_VALUEpowoduje, że spliterator kończy się zbyt wcześnie (i bez podziału), podczas gdy z drugiej strony Long.MAX_VALUEoszacowanie spowoduje, że będzie wywoływany trySplitbez końca, dopóki nie powróci null.

Rozwiązaniem, które znalazłem, jest wewnętrzne dzielenie się zasobami między rozdzielaczami i umożliwienie im przywrócenia równowagi między sobą.

Kod roboczy:

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;
        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;
        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;

    private final Function<S3ObjectSummary, InputStream> fileOpener;

    private final Function<InputStream, LINE> lineReader;

    private final Deque<S3ObjectSummary> unopenedFiles;

    private final Deque<InputStreamHandler> openedFiles;

    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;

    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput=sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        for(int i=0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
Alex R.
źródło