Chcę użyć Stream
do równoległego przetwarzania heterogenicznego zestawu zdalnie przechowywanych plików JSON o nieznanej liczbie (liczba plików nie jest znana z góry). Rozmiar plików może się znacznie różnić, od 1 rekordu JSON na plik do 100 000 rekordów w niektórych innych plikach. Rekord JSON w tym przypadku oznacza samodzielne JSON obiekt reprezentowany jako jeden wiersz w pliku.
Naprawdę chcę do tego używać strumieni, więc wdrożyłem to Spliterator
:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
Problem, który mam, polega na tym, że podczas gdy strumień pięknie zrównuje się na początku, w końcu największy plik jest przetwarzany w jednym wątku. Uważam, że bliższa przyczyna jest dobrze udokumentowana: spliterator jest „niezrównoważony”.
Mówiąc bardziej konkretnie, wydaje się, że trySplit
metoda nie jest wywoływana po pewnym momencie Stream.forEach
cyklu życia, więc dodatkowa logika do dystrybucji małych partii na końcu trySplit
jest rzadko wykonywana.
Zauważ, że wszystkie spliteratory zwrócone z trySplit współużytkują ten sam paths
iterator. Pomyślałem, że to naprawdę sprytny sposób na zrównoważenie pracy we wszystkich rozdzielaczach, ale to nie wystarczyło do osiągnięcia pełnej równoległości.
Chciałbym, aby równoległe przetwarzanie przebiegało najpierw między plikami, a następnie, gdy niewiele dużych plików pozostało rozdzielających się, chcę przeprowadzić równoległość między fragmentami pozostałych plików. Taki był cel else
bloku pod koniec trySplit
.
Czy istnieje prosty / prosty / kanoniczny sposób rozwiązania tego problemu?
źródło
Long.MAX_VALUE
że powrót powoduje nadmierne i niepotrzebne dzielenie, podczas gdy wszelkie szacunki inne niżLong.MAX_VALUE
powodują zatrzymanie dalszego podziału, zabijając równoległość. Zwrócenie zestawu dokładnych szacunków nie prowadzi do żadnych inteligentnych optymalizacji.AbstractSpliterator
ale zastępujesz,trySplit()
co jest złym zestawem do niczego innegoLong.MAX_VALUE
, ponieważ nie dostosowujesz oszacowania rozmiarutrySplit()
. NastępnietrySplit()
oszacowanie rozmiaru powinno zostać zmniejszone o liczbę elementów, które zostały rozdzielone.Odpowiedzi:
Twój
trySplit
powinien dzieli wyjściowe równej wielkości, niezależnie od wielkości plików bazowych. Powinieneś traktować wszystkie pliki jako pojedynczą jednostkę i zaArrayList
każdym razem wypełniać splicator z powrotem z tą samą liczbą obiektów JSON. Liczba obiektów powinna być taka, aby przetworzenie jednego podziału trwało od 1 do 10 milisekund: mniej niż 1 ms i zaczynasz zbliżać się do kosztów przekazania partii do wątku roboczego, wyższego i zaczynasz ryzykować nierównomierne obciążenie procesora z powodu zadania, które są zbyt gruboziarniste.Spliterator nie jest zobowiązany do zgłaszania oszacowania rozmiaru, a już robisz to poprawnie: twoje oszacowanie jest
Long.MAX_VALUE
, co jest specjalną wartością oznaczającą „bez ograniczeń”. Jeśli jednak masz wiele plików z jednym obiektem JSON, co skutkuje partiami o rozmiarze 1, pogorszy to twoją wydajność na dwa sposoby: obciążenie związane z otwieraniem, czytaniem i zamykaniem pliku może stać się wąskim gardłem, a jeśli uda ci się uciec że koszt przekazania wątku może być znaczny w porównaniu z kosztem przetworzenia jednego elementu, ponownie powodując wąskie gardło.Pięć lat temu rozwiązałem podobny problem, możesz rzucić okiem na moje rozwiązanie .
źródło
Long.MAX_VALUE
poprawnie opisujesz nieznany rozmiar, ale to nie pomaga, gdy faktyczna implementacja Stream działa słabo. Nawet użycie wynikuThreadLocalRandom.current().nextInt(100, 100_000)
jako oszacowanego rozmiaru daje lepsze wyniki.ArraySpliterator
który ma szacunkowy rozmiar (nawet dokładny rozmiar). Tak więc implementacja Stream zobaczy rozmiar tablicy vsLong.MAX_VALUE
, weź to pod uwagę za niezrównoważone i podziel „większy” spliterator (ignorując toLong.MAX_VALUE
oznacza „nieznany”), dopóki nie będzie mógł dalej podzielić. Następnie, jeśli nie ma wystarczającej liczby fragmentów, podzieli dzielniki oparte na macierzy, wykorzystując ich znane rozmiary. Tak, działa to bardzo dobrze, ale nie jest sprzeczne z moim stwierdzeniem, że potrzebujesz oszacowania rozmiaru, niezależnie od tego, jak słaba jest.Long.MAX_VALUE
.Po wielu eksperymentach nadal nie byłem w stanie uzyskać dodatkowej równoległości, grając z oszacowaniami wielkości. Zasadniczo dowolna wartość inna niż
Long.MAX_VALUE
powoduje, że spliterator kończy się zbyt wcześnie (i bez podziału), podczas gdy z drugiej stronyLong.MAX_VALUE
oszacowanie spowoduje, że będzie wywoływanytrySplit
bez końca, dopóki nie powrócinull
.Rozwiązaniem, które znalazłem, jest wewnętrzne dzielenie się zasobami między rozdzielaczami i umożliwienie im przywrócenia równowagi między sobą.
Kod roboczy:
źródło