Skopiuj strumień, aby uniknąć stwierdzenia, że ​​„strumień był już obsługiwany lub zamknięty”

121

Chciałbym zduplikować strumień Java 8, abym mógł sobie z tym poradzić dwa razy. Mogę collectjako listę i uzyskać z tego nowe strumienie;

// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff

Ale wydaje mi się, że powinien być bardziej wydajny / elegancki sposób.

Czy istnieje sposób na skopiowanie strumienia bez przekształcania go w zbiór?

W rzeczywistości pracuję ze strumieniem Eithers, więc chcę przetworzyć lewą projekcję w jeden sposób, zanim przejdę do prawej projekcji i zajmiemy się tym w inny sposób. Coś w tym rodzaju (z którym jak dotąd jestem zmuszony używać toListsztuczki).

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());

Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );

Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
Toby
źródło
Czy mógłbyś bardziej szczegółowo omówić „przetwarzanie w jedną stronę”… czy konsumujesz obiekty? Mapujesz je? PartitionBy () i groupingBy () mogą doprowadzić Cię bezpośrednio do ponad 2 list, ale możesz skorzystać z mapowania najpierw lub po prostu mając rozwidlenie decyzyjne w forEach ().
AjahnCharles
W niektórych przypadkach przekształcenie go w kolekcję nie może być opcją, jeśli mamy do czynienia z nieskończonym strumieniem. Alternatywę dla zapamiętywania możesz znaleźć tutaj: dzone.com/articles/how-to-replay-java-streams
Miguel Gamboa

Odpowiedzi:

88

Myślę, że twoje założenia dotyczące wydajności są wsteczne. Ten ogromny zwrot z tytułu wydajności uzyskasz, jeśli zamierzasz wykorzystać dane tylko raz, ponieważ nie musisz ich przechowywać, a strumienie zapewniają potężne optymalizacje „fuzji pętli”, które pozwalają efektywnie przepływać całe dane przez potok.

Jeśli chcesz ponownie wykorzystać te same dane, z definicji musisz albo wygenerować je dwukrotnie (deterministycznie), albo zapisać. Jeśli zdarzy się, że znajduje się już w kolekcji, świetnie; następnie powtórzenie go dwukrotnie jest tanie.

Eksperymentowaliśmy w projekcie z „rozwidlonymi strumieniami”. Odkryliśmy, że wspieranie tego pociąga za sobą rzeczywiste koszty; obciążało to zwykły przypadek (użyj raz) kosztem rzadkiego przypadku. Dużym problemem było rozwiązanie tego, „co się dzieje, gdy dwa potoki nie zużywają danych w tym samym tempie”. Teraz i tak wrócisz do buforowania. To była cecha, która najwyraźniej nie miała swojej wagi.

Jeśli chcesz wielokrotnie operować na tych samych danych, zapisz je lub zorganizuj swoje operacje jako konsumenci i wykonaj następujące czynności:

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });

Możesz również zajrzeć do biblioteki RxJava, ponieważ jej model przetwarzania lepiej nadaje się do tego rodzaju „rozwidlania strumienia”.

Brian Goetz
źródło
1
Być może nie powinienem był używać „wydajności”, zastanawiam się, dlaczego miałbym zawracać sobie głowę strumieniami (i nie przechowywać niczego), jeśli wszystko, co robię, to natychmiastowe przechowywanie danych ( toList), aby móc je przetworzyć ( Eitherprzypadek jako przykład)?
Toby
11
Strumienie są wyraziste i wydajne . Są wyraziste, ponieważ umożliwiają konfigurowanie złożonych operacji agregujących bez wielu przypadkowych szczegółów (np. Wyników pośrednich) w sposobie czytania kodu. Są również wydajne, ponieważ (na ogół) wykonują pojedynczy przebieg danych i nie zapełniają kontenerów wyników pośrednich. Te dwie właściwości sprawiają, że są one atrakcyjnym modelem programowania w wielu sytuacjach. Oczywiście nie wszystkie modele programowania pasują do wszystkich problemów; nadal musisz zdecydować, czy używasz odpowiedniego narzędzia do pracy.
Brian Goetz
1
Jednak brak możliwości ponownego wykorzystania strumienia powoduje sytuacje, w których programista jest zmuszony przechowywać wyniki pośrednie (gromadzenie) w celu przetworzenia strumienia na dwa różne sposoby. Implikacja, że ​​strumień jest generowany więcej niż raz (chyba że go zbierzesz) wydaje się jasna - ponieważ w przeciwnym razie nie potrzebowałbyś metody zbierania.
Niall Connaughton
@NiallConnaughton Nie jestem pewien, czy chcesz, o co chodzi. Jeśli chcesz dwukrotnie ją przemierzyć, ktoś musi ją przechować lub regenerować. Czy sugerujesz, że biblioteka powinna ją buforować na wypadek, gdyby ktoś potrzebował jej dwa razy? To byłoby głupie.
Brian Goetz
Nie sugerując, że biblioteka powinna go buforować, ale mówiąc, że mając strumienie jako jednorazowe, zmusza ludzi, którzy chcą ponownie użyć strumienia początkowego (tj: współdzielić logikę deklaratywną używaną do jej zdefiniowania) do zbudowania wielu strumieni pochodnych w celu zbierania strumień nasion lub mieć dostęp do fabryki dostawców, która utworzy duplikat strumienia początkowego. Obie opcje mają swoje bolączki. Ta odpowiedź zawiera znacznie więcej szczegółów na ten temat: stackoverflow.com/a/28513908/114200 .
Niall Connaughton
73

Możesz użyć zmiennej lokalnej z a, Supplieraby skonfigurować wspólne części potoku strumienia.

Z http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :

Ponowne wykorzystanie strumieni

Nie można ponownie wykorzystać strumieni Java 8. Jak tylko zadzwonisz do dowolnej operacji na terminalu, strumień jest zamknięty:

Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

Calling `noneMatch` after `anyMatch` on the same stream results in the following exception:
java.lang.IllegalStateException: stream has already been operated upon or closed
at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at 
java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)

Aby przezwyciężyć to ograniczenie, musimy utworzyć nowy łańcuch strumieni dla każdej operacji terminalowej, którą chcemy wykonać, np. Moglibyśmy stworzyć dostawcę strumienia, aby skonstruować nowy strumień ze wszystkimi już skonfigurowanymi operacjami pośrednimi:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

Każde wywołanie to get()tworzy nowy strumień, na którym zapisujemy wywołanie żądanej operacji terminalowej.

user4975679
źródło
2
ładne i eleganckie rozwiązanie. znacznie bardziej java8-ish niż najbardziej popularne rozwiązanie.
dylaniato
Tylko uwaga na temat używania, Supplierjeśli Streamjest zbudowany w sposób „kosztowny”, płacisz ten koszt za każde połączenie doSupplier.get() . tj. jeśli zapytanie do bazy danych ... to zapytanie jest wykonywane za każdym razem
Julien
Nie możesz podążać za tym wzorcem po mapTo, chociaż używasz IntStream. Okazało się, że muszę przekonwertować go z powrotem na Set<Integer>using collect(Collectors.toSet())... i wykonać na tym kilka operacji. Chciałem max()i gdyby konkretna wartość była ustawiona jako dwie operacje ...filter(d -> d == -1).count() == 1;
JGFMK
16

Użyj a, Supplieraby utworzyć strumień dla każdej operacji kończenia.

Supplier<Stream<Integer>> streamSupplier = () -> list.stream();

Zawsze, gdy potrzebujesz strumienia z tej kolekcji, użyj, streamSupplier.get()aby uzyskać nowy strumień.

Przykłady:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);
Barany
źródło
Głosujcie za was, ponieważ jako pierwsi wskazaliście tu Dostawców.
EnzoBnl
9

Wdrożyliśmy duplicate()metodę dla strumieni w jOOλ , bibliotece Open Source, którą stworzyliśmy, aby usprawnić testowanie integracji dla jOOQ . Zasadniczo możesz po prostu napisać:

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();

Wewnętrznie istnieje bufor przechowujący wszystkie wartości, które zostały zużyte z jednego strumienia, ale nie z drugiego. Jest to prawdopodobnie tak wydajne, jak to tylko możliwe, jeśli twoje dwa strumienie są zużywane mniej więcej w tym samym tempie i jeśli możesz żyć z brakiem bezpieczeństwa wątków .

Oto jak działa algorytm:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final List<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

Więcej kodu źródłowego tutaj

Tuple2jest prawdopodobnie podobny do Twojego Pairtypu, ale Seqma Streampewne ulepszenia.

Lukas Eder
źródło
2
To rozwiązanie nie jest bezpieczne dla wątków: nie można przekazać jednego strumienia do innego wątku. Naprawdę nie widzę scenariusza, w którym oba strumienie mogą być zużywane z równą szybkością w jednym wątku, a faktycznie potrzebujesz dwóch różnych strumieni. Jeśli chcesz uzyskać dwa wyniki z tego samego strumienia, znacznie lepiej byłoby użyć łączonych kolektorów (które już masz w JOOL).
Tagir Valeev
@TagirValeev: Masz rację co do bezpieczeństwa wątków, słuszna uwaga. Jak można to zrobić, łącząc kolekcjonerów?
Lukas Eder
1
To znaczy, jeśli ktoś chce użyć tego samego strumienia dwa razy w ten sposób Tuple2<Seq<A>>, Seq<A>> t = duplicate(stream); long count = t.collect(counting()); List<A> list = t.collect(toList());, lepiej to zrobić Tuple2<Long, List<A>> t = stream.collect(Tuple.collectors(counting(), toList()));. Użycie Collectors.mapping/reducingjednego może wyrazić inne operacje strumieniowe jako kolektory i przetwarzać elementy w zupełnie inny sposób, tworząc pojedynczą wynikową krotkę. Ogólnie rzecz biorąc, możesz zrobić wiele rzeczy, zużywając strumień raz bez powielania i będzie to przyjazne dla równoległych.
Tagir Valeev
2
W takim przypadku nadal będziesz redukować jeden strumień po drugim. Nie ma więc sensu utrudniać życia, wprowadzając wyrafinowany iterator, który i tak zbierze cały strumień do listy pod maską. Możesz po prostu zebrać do listy jawnie, a następnie utworzyć z niej dwa strumienie, jak mówi OP (to ta sama liczba linii kodu). Cóż, możesz mieć pewną poprawę tylko wtedy, gdy pierwsza redukcja to zwarcie, ale tak nie jest w przypadku OP.
Tagir Valeev
1
@maaartinus: Dzięki, dobry wskaźnik. Utworzyłem problem dotyczący testu porównawczego. Użyłem go do offer()/ poll()API, ale ArrayDequemoże zrobić to samo.
Lukas Eder
7

Możesz utworzyć strumień elementów wykonawczych (na przykład):

results.stream()
    .flatMap(either -> Stream.<Runnable> of(
            () -> failure(either.left()),
            () -> success(either.right())))
    .forEach(Runnable::run);

Gdzie failurei successjakie operacje należy zastosować. Spowoduje to jednak utworzenie wielu obiektów tymczasowych i może nie być bardziej wydajne niż rozpoczęcie od kolekcji i dwukrotne przesyłanie strumieniowe / iterowanie.

asylias
źródło
4

Innym sposobem wielokrotnego obsługiwania elementów jest użycie Stream.peek (Consumer) :

doSomething().stream()
.peek(either -> handleFailure(either.left()))
.foreach(either -> handleSuccess(either.right()));

peek(Consumer) można łączyć łańcuchami tyle razy, ile potrzeba.

doSomething().stream()
.peek(element -> handleFoo(element.foo()))
.peek(element -> handleBar(element.bar()))
.peek(element -> handleBaz(element.baz()))
.foreach(element-> handleQux(element.qux()));
Jaskółka oknówka
źródło
Wygląda na to, że peek nie powinien być do tego używany (patrz softwareengineering.stackexchange.com/a/308979/195787 )
HectorJ
2
@HectorJ Drugi wątek dotyczy modyfikacji elementów. Założyłem, że tak się nie dzieje.
Martin
2

cyclops-react , biblioteka, do której się przyczyniam, ma statyczną metodę, która pozwoli ci zduplikować strumień (i zwróci krotkę strumieni jOOλ).

    Stream<Integer> stream = Stream.of(1,2,3);
    Tuple2<Stream<Integer>,Stream<Integer>> streams =  StreamUtils.duplicate(stream);

Zobacz komentarze, istnieje spadek wydajności, który zostanie naliczony podczas używania duplikatu w istniejącym strumieniu. Bardziej wydajną alternatywą byłoby użycie Streamable: -

Istnieje również (leniwa) klasa Streamable, którą można skonstruować na podstawie Stream, Iterable lub Array i wielokrotnie odtwarzać.

    Streamable<Integer> streamable = Streamable.of(1,2,3);
    streamable.stream().forEach(System.out::println);
    streamable.stream().forEach(System.out::println);

AsStreamable.synchronizedFromStream (stream) - może służyć do tworzenia Streamable, który będzie leniwie wypełniał swoją kolekcję zapasową w sposób, który może być współużytkowany przez wątki. Streamable.fromStream (stream) nie spowoduje żadnego obciążenia związanego z synchronizacją.

John McClean
źródło
2
I oczywiście należy zauważyć, że otrzymane strumienie mają znaczne obciążenie procesora / pamięci i bardzo słabą wydajność równoległą. Również to rozwiązanie nie jest bezpieczne dla wątków (nie można przekazać jednego z wynikowych strumieni do innego wątku i bezpiecznie przetwarzać go równolegle). Byłoby znacznie wydajniejsze i bezpieczniejsze List<Integer> list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())(jak sugeruje OP). Ponadto, proszę wyraźnie ujawnić w odpowiedzi, że jesteś autorem cyklop-strumieni. Przeczytaj to .
Tagir Valeev
Zaktualizowano, aby odzwierciedlić, że jestem autorem. Warto również omówić charakterystykę wykonania każdego z nich. Twoja powyższa ocena jest bardzo trafna w przypadku StreamUtils.duplicate. StreamUtils.duplicate działa poprzez buforowanie danych z jednego strumienia do drugiego, powodując obciążenie procesora i pamięci (w zależności od przypadku użycia). Jednak w przypadku Streamable.of (1, 2, 3) nowy strumień jest tworzony bezpośrednio z tablicy za każdym razem, a charakterystyka wydajności, w tym wydajność równoległa, będzie taka sama, jak w przypadku normalnie tworzonego strumienia.
John McClean,
Istnieje również klasa AsStreamable, która umożliwia utworzenie wystąpienia Streamable z Stream, ale synchronizuje dostęp do kolekcji stanowiącej kopię zapasową Streamable w trakcie jego tworzenia (AsStreamable.synchronizedFromStream). Uczynienie go bardziej odpowiednim do użytku w różnych wątkach (jeśli tego potrzebujesz - wyobrażam sobie 99% czasu, w którym strumienie są tworzone i ponownie wykorzystywane w tym samym wątku).
John McClean,
Cześć Tagir - czy nie powinieneś również ujawniać w swoim komentarzu, że jesteś autorem konkurencyjnej biblioteki?
John McClean,
1
Komentarze nie są odpowiedziami i nie reklamuję tutaj mojej biblioteki, ponieważ moja biblioteka nie ma funkcji kopiowania strumienia (tylko dlatego, że uważam, że jest bezużyteczny), więc nie konkurujemy tutaj. Oczywiście, gdy proponuję rozwiązanie dotyczące mojej biblioteki, zawsze wyraźnie mówię, że jestem autorem.
Tagir Valeev
0

W przypadku tego konkretnego problemu możesz również użyć partycjonowania. Coś jak

     // Partition Eighters into left and right
     List<Either<Pair<A, Throwable>, A>> results = doSomething();
     Map<Boolean, Object> passingFailing = results.collect(Collectors.partitioningBy(s -> s.isLeft()));
     passingFailing.get(true) <- here will be all passing (left values)
     passingFailing.get(false) <- here will be all failing (right values)
Lubomir Varga
źródło
0

Możemy skorzystać z Stream Builder podczas czytania lub iteracji strumienia. Oto dokument programu Stream Builder .

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html

Przypadek użycia

Powiedzmy, że mamy strumień pracowników i musimy użyć tego strumienia, aby zapisać dane pracowników w pliku Excel, a następnie zaktualizować kolekcję / tabelę pracowników [To tylko przypadek użycia, aby pokazać użycie Stream Builder]:

Stream.Builder<Employee> builder = Stream.builder();

employee.forEach( emp -> {
   //store employee data to excel file 
   // and use the same object to build the stream.
   builder.add(emp);
});

//Now this stream can be used to update the employee collection
Stream<Employee> newStream = builder.build();
Lokesh Singal
źródło
0

Miałem podobny problem i mogłem wymyślić trzy różne struktury pośrednie, z których można utworzyć kopię strumienia: a List, tablicę i Stream.Builder. Napisałem mały program porównawczy, który sugerował, że z punktu widzenia wydajności Listbył o około 30% wolniejszy niż pozostałe dwa, które były dość podobne.

Jedyną wadą konwersji na tablicę jest to, że jest to trudne, jeśli typ elementu jest typem ogólnym (który w moim przypadku był); dlatego wolę używać plikuStream.Builder .

Skończyło się na napisaniu małej funkcji, która tworzy Collector:

private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
    return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
        b2.build().forEach(b1);
        return b1;
    }, Stream.Builder::build);
}

Mogę wtedy zrobić kopię dowolnego strumienia str, robiąc to, str.collect(copyCollector())co wydaje się być zgodne z idiomatycznym użyciem strumieni.

Jeremy Hicks
źródło