Mam duży plik zawierający listę pozycji.
Chciałbym utworzyć partię pozycji, wykonać żądanie HTTP z tą partią (wszystkie pozycje są potrzebne jako parametry w żądaniu HTTP). Mogę to zrobić bardzo łatwo za pomocą for
pętli, ale jako miłośnik Java 8 chcę spróbować napisać to za pomocą frameworka Stream Java 8 (i czerpać korzyści z leniwego przetwarzania).
Przykład:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
Chcę zrobić coś długiego
lazyFileStream.group(500).map(processBatch).collect(toList())
Jaki byłby najlepszy sposób na zrobienie tego?
java
java-8
batch-processing
java-stream
Andy Dang
źródło
źródło
flatMap
(+ dodatkowej płaskiej mapy, aby ponownie zwinąć strumienie)? Nie sądzę, aby coś takiego istniało jako wygodna metoda w standardowej bibliotece. Albo będziesz musiał znaleźć bibliotekę innej firmy, albo napisać własną w oparciu o spliteratory i / lub kolekcjoner emitujący strumień strumieniStream.generate
zreader::readLine
ilimit
, ale problem polega na tym, że strumienie nie działają dobrze z wyjątkami. Prawdopodobnie nie jest to również możliwe do zrównoleglenia. Myślę, żefor
pętla jest nadal najlepszą opcją.Odpowiedzi:
Uwaga! To rozwiązanie odczytuje cały plik przed uruchomieniem forEach.
Możesz to zrobić za pomocą jOOλ , biblioteki, która rozszerza strumienie Java 8 dla przypadków użycia jednowątkowych, sekwencyjnych strumieni:
Seq.seq(lazyFileStream) // Seq<String> .zipWithIndex() // Seq<Tuple2<String, Long>> .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>> .forEach((index, batch) -> { process(batch); });
Za kulisami
zipWithIndex()
jest po prostu:static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) { final Iterator<T> it = stream.iterator(); class ZipWithIndex implements Iterator<Tuple2<T, Long>> { long index; @Override public boolean hasNext() { return it.hasNext(); } @Override public Tuple2<T, Long> next() { return tuple(it.next(), index++); } } return seq(new ZipWithIndex()); }
... mając na uwadze
groupBy()
wygodę API dla:default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) { return collect(Collectors.groupingBy(classifier)); }
(Zastrzeżenie: pracuję dla firmy stojącej za jOOλ)
źródło
Map
(w przeciwieństwie na przykład do rozwiązania Ben Manesa)Aby uzyskać kompletność, oto rozwiązanie Guava .
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
W pytaniu kolekcja jest dostępna, więc strumień nie jest potrzebny i można go zapisać jako,
Iterables.partition(data, batchSize).forEach(this::process);
źródło
Lists.partition
to kolejna odmiana, o której powinienem był wspomnieć.Stream
do pamięci przed przetworzeniem odpowiedniej partiibatchSize
elementy na iterację.Możliwa jest również implementacja czystej Java-8:
int BATCH = 500; IntStream.range(0, (data.size()+BATCH-1)/BATCH) .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH))) .forEach(batch -> process(batch));
Zauważ, że w przeciwieństwie do JOOl może dobrze działać równolegle (pod warunkiem, że
data
jest to lista o swobodnym dostępie).źródło
List
(patrzdata.size()
,data.get()
w pytaniu). Odpowiadam na zadane pytanie. Jeśli masz inne pytanie, zadaj je zamiast tego (chociaż myślę, że pytanie dotyczące strumienia również zostało już zadane).Rozwiązanie Pure Java 8 :
Możemy stworzyć niestandardowy kolektor, aby zrobić to elegancko, który zajmuje a
batch size
i a,Consumer
aby przetworzyć każdą partię:import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.*; import java.util.stream.Collector; import static java.util.Objects.requireNonNull; /** * Collects elements in the stream and calls the supplied batch processor * after the configured batch size is reached. * * In case of a parallel stream, the batch processor may be called with * elements less than the batch size. * * The elements are not kept in memory, and the final result will be an * empty list. * * @param <T> Type of the elements being collected */ class BatchCollector<T> implements Collector<T, List<T>, List<T>> { private final int batchSize; private final Consumer<List<T>> batchProcessor; /** * Constructs the batch collector * * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process */ BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) { batchProcessor = requireNonNull(batchProcessor); this.batchSize = batchSize; this.batchProcessor = batchProcessor; } public Supplier<List<T>> supplier() { return ArrayList::new; } public BiConsumer<List<T>, T> accumulator() { return (ts, t) -> { ts.add(t); if (ts.size() >= batchSize) { batchProcessor.accept(ts); ts.clear(); } }; } public BinaryOperator<List<T>> combiner() { return (ts, ots) -> { // process each parallel list without checking for batch size // avoids adding all elements of one to another // can be modified if a strict batching mode is required batchProcessor.accept(ts); batchProcessor.accept(ots); return Collections.emptyList(); }; } public Function<List<T>, List<T>> finisher() { return ts -> { batchProcessor.accept(ts); return Collections.emptyList(); }; } public Set<Characteristics> characteristics() { return Collections.emptySet(); } }
Opcjonalnie utwórz pomocniczą klasę narzędziową:
import java.util.List; import java.util.function.Consumer; import java.util.stream.Collector; public class StreamUtils { /** * Creates a new batch collector * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process * @param <T> the type of elements being processed * @return a batch collector instance */ public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) { return new BatchCollector<T>(batchSize, batchProcessor); } }
Przykładowe użycie:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> output = new ArrayList<>(); int batchSize = 3; Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs); input.stream() .collect(StreamUtils.batchCollector(batchSize, batchProcessor));
Opublikowałem również swój kod na GitHub, jeśli ktoś chce rzucić okiem:
Link do Github
źródło
Napisałem niestandardowy Spliterator dla takich scenariuszy. Wypełni listy o danym rozmiarze ze strumienia wejściowego. Zaletą tego podejścia jest to, że będzie wykonywać leniwe przetwarzanie i będzie działać z innymi funkcjami strumienia.
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) { return batchSize <= 0 ? Stream.of(stream.collect(Collectors.toList())) : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel()); } private static class BatchSpliterator<E> implements Spliterator<List<E>> { private final Spliterator<E> base; private final int batchSize; public BatchSpliterator(Spliterator<E> base, int batchSize) { this.base = base; this.batchSize = batchSize; } @Override public boolean tryAdvance(Consumer<? super List<E>> action) { final List<E> batch = new ArrayList<>(batchSize); for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++) ; if (batch.isEmpty()) return false; action.accept(batch); return true; } @Override public Spliterator<List<E>> trySplit() { if (base.estimateSize() <= batchSize) return null; final Spliterator<E> splitBase = this.base.trySplit(); return splitBase == null ? null : new BatchSpliterator<>(splitBase, batchSize); } @Override public long estimateSize() { final double baseSize = base.estimateSize(); return baseSize == 0 ? 0 : (long) Math.ceil(baseSize / (double) batchSize); } @Override public int characteristics() { return base.characteristics(); } }
źródło
SUBSIZED
podziałem zwracanym z,trySplit
może mieć więcej elementów niż przed podziałem (jeśli podział następuje w środku partii).Spliterators
jest poprawne, to czytrySplit
należy zawsze podzielić dane na dwie mniej więcej równe części, aby wynik nigdy nie był większy niż oryginał?if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Mieliśmy podobny problem do rozwiązania. Chcieliśmy wziąć strumień, który był większy niż pamięć systemowa (iterując po wszystkich obiektach w bazie danych) i jak najlepiej losowo uporządkować kolejność - pomyśleliśmy, że byłoby dobrze zbuforować 10000 elementów i je losować.
Celem była funkcja, która przyjmowała strumień.
Spośród proponowanych tutaj rozwiązań wydaje się, że istnieje szereg opcji:
Początkowo naszym instynktem było użycie niestandardowego kolektora, ale oznaczało to rezygnację z przesyłania strumieniowego. Powyższe niestandardowe rozwiązanie kolektora jest bardzo dobre i prawie go użyliśmy.
Oto rozwiązanie, które oszukuje, wykorzystując fakt, że
Stream
s może dać ci,Iterator
którego możesz użyć jako włazu ewakuacyjnego, abyś mógł zrobić coś więcej, czego strumienie nie obsługują.Iterator
Jest przekształcany z powrotem do strumienia za pomocą innej, Java 8StreamSupport
czary./** * An iterator which returns batches of items taken from another iterator */ public class BatchingIterator<T> implements Iterator<List<T>> { /** * Given a stream, convert it to a stream of batches no greater than the * batchSize. * @param originalStream to convert * @param batchSize maximum size of a batch * @param <T> type of items in the stream * @return a stream of batches taken sequentially from the original stream */ public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) { return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); } private static <T> Stream<T> asStream(Iterator<T> iterator) { return StreamSupport.stream( Spliterators.spliteratorUnknownSize(iterator,ORDERED), false); } private int batchSize; private List<T> currentBatch; private Iterator<T> sourceIterator; public BatchingIterator(Iterator<T> sourceIterator, int batchSize) { this.batchSize = batchSize; this.sourceIterator = sourceIterator; } @Override public boolean hasNext() { prepareNextBatch(); return currentBatch!=null && !currentBatch.isEmpty(); } @Override public List<T> next() { return currentBatch; } private void prepareNextBatch() { currentBatch = new ArrayList<>(batchSize); while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { currentBatch.add(sourceIterator.next()); } } }
Prosty przykład użycia tego wyglądałby następująco:
@Test public void getsBatches() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) .forEach(System.out::println); }
Powyższe wydruki
W naszym przypadku chcieliśmy przetasować partie, a następnie zachować je jako strumień - wyglądało to tak:
@Test public void howScramblingCouldBeDone() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one .map(list -> { Collections.shuffle(list); return list; }) .flatMap(List::stream) .forEach(System.out::println); }
Wyprowadza coś takiego (jest losowy, więc za każdym razem inny)
Sekretem jest to, że zawsze istnieje strumień, więc możesz albo działać na strumieniu partii, albo zrobić coś z każdą partią, a następnie z
flatMap
powrotem do strumienia. Jeszcze lepiej, wszystkie powyższe tylko działa jako ostateczneforEach
lubcollect
czy inne wyrazy kończące PULL dane przez strumień.Okazuje się, że
iterator
jest to szczególny rodzaj operacji kończącej na strumieniu i nie powoduje on uruchomienia całego strumienia i zapamiętania go! Podziękowania dla facetów z Java 8 za genialny projekt!źródło
List
nie możesz odroczyć iteracji elementów wewnątrz partii, ponieważ konsument może chcieć pominąć całą partię, a jeśli nie zużyjesz elementy to nie przeskakują zbyt daleko. (Zaimplementowałem jeden z nich w C #, chociaż było to znacznie łatwiejsze.)Możesz także użyć RxJava :
lub
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
lub
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
źródło
Możesz też rzucić okiem na cyklop-reaguj , jestem autorem tej biblioteki. Implementuje interfejs jOOλ (i przez rozszerzenie JDK 8 Streams), ale w przeciwieństwie do JDK 8 Parallel Streams skupia się na operacjach asynchronicznych (takich jak potencjalne blokowanie asynchronicznych wywołań we / wy). JDK Parallel Streams, z kolei skupia się na równoległości danych dla operacji związanych z procesorem. Działa poprzez zarządzanie agregatami zadań opartych na Future pod maską, ale przedstawia standardowe rozszerzone API dla użytkowników końcowych.
Ten przykładowy kod może pomóc w rozpoczęciu
LazyFutureStream.parallelCommonBuilder() .react(data) .grouped(BATCH_SIZE) .map(this::process) .run();
W tym miejscu znajduje się samouczek dotyczący grupowania
I bardziej ogólny samouczek tutaj
Aby użyć własnej puli wątków (która prawdopodobnie jest bardziej odpowiednia do blokowania we / wy), możesz rozpocząć przetwarzanie za pomocą
LazyReact reactor = new LazyReact(40); reactor.react(data) .grouped(BATCH_SIZE) .map(this::process) .run();
źródło
Przykład w czystej Javie 8, który działa również z równoległymi strumieniami.
Jak używać:
Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed(); CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));
Deklaracja i implementacja metody:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor) { List<ElementType> newBatch = new ArrayList<>(batchSize); stream.forEach(element -> { List<ElementType> fullBatch; synchronized (newBatch) { if (newBatch.size() < batchSize) { newBatch.add(element); return; } else { fullBatch = new ArrayList<>(newBatch); newBatch.clear(); newBatch.add(element); } } batchProcessor.accept(fullBatch); }); if (newBatch.size() > 0) batchProcessor.accept(new ArrayList<>(newBatch)); }
źródło
Z całą uczciwością spójrz na eleganckie rozwiązanie Vavr :
Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
źródło
Prosty przykład z użyciem Spliteratora
// read file into stream, try-with-resources try (Stream<String> stream = Files.lines(Paths.get(fileName))) { //skip header Spliterator<String> split = stream.skip(1).spliterator(); Chunker<String> chunker = new Chunker<String>(); while(true) { boolean more = split.tryAdvance(chunker::doSomething); if (!more) { break; } } } catch (IOException e) { e.printStackTrace(); } } static class Chunker<T> { int ct = 0; public void doSomething(T line) { System.out.println(ct++ + " " + line.toString()); if (ct % 100 == 0) { System.out.println("====================chunk====================="); } } }
Odpowiedź Bruce'a jest bardziej wyczerpująca, ale szukałem czegoś szybkiego i brudnego do przetworzenia wielu plików.
źródło
jest to czysta java rozwiązanie, które jest oceniane leniwie.
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){ List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable currentBatch.add(new ArrayList<T>(batchSize)); return Stream.concat(stream .sequential() .map(new Function<T, List<T>>(){ public List<T> apply(T t){ currentBatch.get(0).add(t); return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null; } }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0)) .limit(1) ).filter(Objects::nonNull); }
źródło
Możesz użyć apache.commons:
ListUtils.partition(ListOfLines, 500).stream() .map(partition -> processBatch(partition) .collect(Collectors.toList());
Część partycjonowania jest wykonywana bez lenistwa, ale po podzieleniu listy na partycje uzyskujesz korzyści z pracy ze strumieniami (np. Użyj strumieni równoległych, dodaj filtry itp.). Inne odpowiedzi sugerowały bardziej rozbudowane rozwiązania, ale czasami czytelność i łatwość konserwacji są ważniejsze (a czasami nie są :-))
źródło
Można to łatwo zrobić za pomocą Reaktora :
źródło
Za pomocą
Java 8
icom.google.common.collect.Lists
możesz zrobić coś takiego:public class BatchProcessingUtil { public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) { List<List<T>> batches = Lists.partition(data, batchSize); return batches.stream() .map(processFunction) // Send each batch to the process function .flatMap(Collection::stream) // flat results to gather them in 1 stream .collect(Collectors.toList()); } }
Tutaj
T
jest typ pozycji na liście wejściowej iU
typ pozycji na liście wyjściowejMożesz go używać w ten sposób:
List<String> userKeys = [... list of user keys] List<Users> users = BatchProcessingUtil.process( userKeys, 10, // Batch Size partialKeys -> service.getUsers(partialKeys) );
źródło