Czy możesz podzielić strumień na dwa strumienie?

146

Mam zestaw danych reprezentowany przez strumień Java 8:

Stream<T> stream = ...;

Widzę, jak to przefiltrować, aby uzyskać losowy podzbiór - na przykład

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

Widzę również, jak mogę zredukować ten strumień, aby uzyskać na przykład dwie listy reprezentujące dwie losowe połowy zestawu danych, a następnie przekształcić je z powrotem w strumienie. Ale czy istnieje bezpośredni sposób na wygenerowanie dwóch strumieni z pierwszego? Coś jak

(heads, tails) = stream.[some kind of split based on filter]

Dzięki za wgląd.

user1148758
źródło
Odpowiedź Marka jest o wiele pomocna niż odpowiedź Louisa, ale muszę powiedzieć, że odpowiedź Louisa jest bardziej związana z pierwotnym pytaniem. Pytanie koncentruje się raczej na możliwości konwersji Streamna wiele Streams bez konwersji pośredniej , chociaż myślę, że ludzie, którzy dotarli do tego pytania, faktycznie szukają sposobu, aby to osiągnąć, niezależnie od takiego ograniczenia, co jest odpowiedzią Marka. Może to wynikać z faktu, że pytanie w tytule nie jest takie samo jak w opisie .
devildelta

Odpowiedzi:

9

Nie dokładnie. Nie możesz wyciągnąć dwóch Streamz jednego; to nie ma sensu - jak można iterować po jednym bez konieczności generowania drugiego w tym samym czasie? Strumień można obsługiwać tylko raz.

Jeśli jednak chcesz zrzucić je na listę lub coś, możesz to zrobić

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
Louis Wasserman
źródło
65
Dlaczego to nie ma sensu? Ponieważ strumień jest potokiem, nie ma powodu, dla którego nie mógł utworzyć dwóch producentów oryginalnego strumienia, widziałem, że jest to obsługiwane przez kolektor, który zapewnia dwa strumienie.
Brett Ryan,
36
Nie jest bezpieczny wątkowo. Zła rada przy próbie dodania bezpośrednio do kolekcji, dlatego mamy stream.collect(...)for ze wstępnie zdefiniowanym bezpiecznym wątkiem Collectors, który działa dobrze nawet na kolekcjach bez zabezpieczeń wątków (bez synchronicznej rywalizacji o blokady). Najlepsza odpowiedź od @MarkJeronimus.
YoYo
1
@JoD Bezpieczne dla wątków, jeśli głowy i ogony są bezpieczne dla wątków. Dodatkowo, zakładając użycie nierównoległych strumieni, tylko kolejność nie jest gwarantowana, więc są one bezpieczne wątkowo. Naprawienie problemów ze współbieżnością zależy od programisty, więc ta odpowiedź jest idealnie odpowiednia, jeśli kolekcje są bezpieczne dla wątków.
Nicolas,
1
@Nixon nie nadaje się w obecności lepszego rozwiązania, które tu mamy. Posiadanie takiego kodu może prowadzić do złego precedensu, powodując, że inni będą go używać w niewłaściwy sposób. Nawet jeśli nie są używane równoległe strumienie, dzieli go tylko jeden krok. Dobre praktyki kodowania wymagają, abyśmy nie utrzymywali stanu podczas operacji strumieniowych. Następną rzeczą, którą robimy, jest kodowanie w środowisku takim jak Apache Spark, a te same praktyki naprawdę doprowadziłyby do nieoczekiwanych wyników. To było twórcze rozwiązanie, daję je, takie, które sam mogłem napisać nie tak dawno temu.
YoYo,
1
@JoD To nie jest lepsze rozwiązanie, w rzeczywistości jest bardziej nieefektywne.Ten sposób myślenia ostatecznie kończy się wnioskiem, że wszystkie kolekcje powinny być domyślnie bezpieczne dla wątków, aby zapobiec niezamierzonym konsekwencjom, co jest po prostu błędne.
Nicolas,
301

Kolektor może być używany do tego celu.

  • W przypadku dwóch kategorii użyj opcji Collectors.partitioningBy()factory.

Spowoduje to utworzenie Mapod Booleando Listi umieszczenie elementów na jednej lub drugiej liście w oparciu o Predicate.

Uwaga: ponieważ strumień musi być zużyty w całości, nie może to działać w przypadku nieskończonych strumieni. A ponieważ strumień i tak jest zużyty, ta metoda po prostu umieszcza je na listach zamiast tworzyć nowy strumień z pamięcią. Zawsze możesz przesyłać strumieniowo te listy, jeśli potrzebujesz strumieni jako danych wyjściowych.

Nie ma też potrzeby stosowania iteratora, nawet w podanym przykładzie obejmującym tylko głowy.

  • Podział plików binarnych wygląda następująco:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • Aby uzyskać więcej kategorii, użyj Collectors.groupingBy()fabryki.
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

W przypadku, gdy strumienie nie są Stream, ale są jednym z strumieni pierwotnych, takich jak IntStream, to ta .collect(Collectors)metoda nie jest dostępna. Będziesz musiał to zrobić ręcznie bez fabryki kolektorów. Jego implementacja wygląda następująco:

[Przykład 2.0 od 16.04.2020]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

W tym przykładzie inicjalizuję ArrayLists pełnym rozmiarem początkowej kolekcji (jeśli w ogóle jest to znane). Zapobiega to zdarzeniom zmiany rozmiaru nawet w najgorszym przypadku, ale może potencjalnie pochłonąć 2 * N * T przestrzeni (N = początkowa liczba elementów, T = liczba wątków). Aby poświęcić miejsce na szybkość, możesz to pominąć lub wykorzystać swoje najlepsze przypuszczenia, takie jak oczekiwana najwyższa liczba elementów w jednej partycji (zwykle nieco ponad N / 2 dla zrównoważonego podziału).

Mam nadzieję, że nikogo nie obrażam, używając metody Java 9. W przypadku wersji Java 8 spójrz na historię edycji.

Mark Jeronimus
źródło
2
Piękny. Jednak ostatnie rozwiązanie dla IntStream nie będzie bezpieczne wątkowo w przypadku równoległego strumienia. Rozwiązanie jest dużo prostsze niż myślisz ... stream.boxed().collect(...);! Zrobi to, co reklamowano: przekonwertuj prymityw IntStreamna Stream<Integer>wersję pudełkową .
YoYo
32
Powinna to być akceptowana odpowiedź, ponieważ bezpośrednio rozwiązuje pytanie PO.
ejel
27
Chciałbym, aby przepełnienie stosu umożliwiło społeczności zastąpienie wybranej odpowiedzi, jeśli zostanie znaleziona lepsza.
GuiSim
Nie jestem pewien, czy to odpowiada na pytanie. Pytanie wymaga podzielenia strumienia na strumienie, a nie listy.
AlikElzin-kilaka
1
Funkcja akumulatora jest niepotrzebnie gadatliwa. Zamiast tego (map, x) -> { boolean partition = p.test(x); List<Integer> list = map.get(partition); list.add(x); }możesz po prostu użyć (map, x) -> map.get(p.test(x)).add(x). Ponadto nie widzę żadnego powodu, dla którego collectoperacja nie powinna być bezpieczna dla wątków. Działa dokładnie tak, jak ma działać i bardzo blisko tego Collectors.partitioningBy(p), jak miałoby działać. Ale użyłbym IntPredicatezamiast tego, Predicate<Integer>kiedy nie używam boxed(), aby uniknąć podwójnego boksowania.
Holger
21

Natknąłem się na to pytanie i czuję, że rozwidlony strumień ma kilka przypadków użycia, które mogą okazać się słuszne. Napisałem poniższy kod jako konsument, aby nic nie robił, ale możesz go zastosować do funkcji i wszystkiego, co możesz napotkać.

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

Teraz Twoja implementacja kodu może wyglądać mniej więcej tak:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));
Ludger
źródło
20

Niestety, to, o co prosisz, jest bezpośrednio źle widziane w JavaDoc of Stream :

Strumień powinien być obsługiwany (wywołanie pośredniej lub końcowej operacji strumienia) tylko raz. Wyklucza to na przykład strumienie „rozwidlone”, w których to samo źródło zasila dwa lub więcej rurociągów lub wielokrotne przejścia tego samego strumienia.

Możesz obejść to za pomocą peeklub innych metod, jeśli naprawdę pragniesz tego typu zachowania. W takim przypadku zamiast próbować cofnąć dwa strumienie z tego samego oryginalnego źródła strumienia za pomocą filtru rozwidlającego, należy powielić strumień i odpowiednio przefiltrować każdy z duplikatów.

Możesz jednak zechcieć ponownie rozważyć, czy struktura Streamjest odpowiednia dla twojego przypadku użycia.

Trevor Freeman
źródło
6
Sformułowanie javadoc nie wyklucza podziału na kilka strumieni, o ile pojedynczy element strumienia trafia tylko do jednego z nich
Thorbjørn Ravn Andersen
2
@ ThorbjørnRavnAndersen Nie jestem pewien, czy duplikowanie elementu strumienia jest główną przeszkodą dla rozwidlonego strumienia. Głównym problemem jest to, że operacja rozwidlenia jest zasadniczo operacją terminalową, więc decydując się na fork, w zasadzie tworzysz jakąś kolekcję. Np. Mogę napisać metodę, List<Stream> forkStream(Stream s)ale moje otrzymane strumienie będą przynajmniej częściowo wspierane przez kolekcje, a nie bezpośrednio przez strumień bazowy, w przeciwieństwie do tego, filterco nie jest operacją na strumieniu terminala.
Trevor Freeman,
7
Jest to jeden z powodów, dla których uważam, że strumienie Java są nieco słabsze w porównaniu z github.com/ReactiveX/RxJava/wiki, ponieważ celem strumienia jest zastosowanie operacji na potencjalnie nieskończonym zestawie elementów, a operacje w świecie rzeczywistym często wymagają dzielenia powielanie i łączenie strumieni.
Usman Ismail
8

Jest to sprzeczne z ogólnym mechanizmem Stream. Powiedzmy, że możesz podzielić strumień S0 na Sa i Sb tak, jak chcesz. Wykonanie dowolnej operacji terminalowej, powiedzmy count(), na Sa z konieczności „zużyje” wszystkie elementy w S0. Dlatego Sb stracił źródło danych.

Wydaje mi się, że wcześniej Stream miał tee()metodę, która kopiowała strumień do dwóch. Jest teraz usunięty.

Stream ma jednak metodę peek (), możesz jej użyć do spełnienia swoich wymagań.

ZhongYu
źródło
1
peekjest dokładnie tym, czym było kiedyś tee.
Louis Wasserman,
5

niezupełnie, ale możesz być w stanie osiągnąć to, czego potrzebujesz, przywołując Collectors.groupingBy(). tworzysz nową kolekcję, a następnie możesz utworzyć instancje strumieni w tej nowej kolekcji.

aepurniet
źródło
2

To była najmniej zła odpowiedź, jaką mogłem wymyślić.

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class Test {

    public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
            Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {

        Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
        L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
        R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

        return new ImmutablePair<L, R>(trueResult, falseResult);
    }

    public static void main(String[] args) {

        Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

        Pair<List<Integer>, String> results = splitStream(stream,
                n -> n > 5,
                s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

        System.out.println(results);
    }

}

To pobiera strumień liczb całkowitych i dzieli je na 5. Dla tych większych niż 5 filtruje tylko liczby parzyste i umieszcza je na liście. Reszta łączy je z |.

wyjścia:

 ([6, 8],0|1|2|3|4|5)

Nie jest idealny, ponieważ gromadzi wszystko w kolekcjach pośrednich, przerywając strumień (i ma zbyt wiele argumentów!)

Ian Jones
źródło
1

Natknąłem się na to pytanie, szukając sposobu na odfiltrowanie pewnych elementów ze strumienia i zarejestrowanie ich jako błędów. Więc tak naprawdę nie musiałem tak bardzo dzielić strumienia, ile dołączyć przedwczesną akcję kończącą do predykatu z dyskretną składnią. Oto, co wymyśliłem:

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
    return x -> {
        if (pred.test(x)) {
            return true;
        }
        altAction.accept(x);
        return false;
    };

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}
Sebastian Hans
źródło
0

Krótsza wersja wykorzystująca Lombok

import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.RequiredArgsConstructor;

/**
 * Forks a Stream using a Predicate into postive and negative outcomes.
 */
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
    Predicate<T> predicate;
    Consumer<T> positiveConsumer;
    Consumer<T> negativeConsumer;

    @Override
    public void accept(T t) {
        (predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
    }
}
OneCricketeer
źródło
-3

Co powiesz na:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
    () -> (new Random()).ints(0, 2).boxed();

Stream<Integer> tails =
    randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
    randomIntsStreamSupplier.get().filter(x->x.equals(1));
Mateusz
źródło
1
Ponieważ dostawca jest wywoływany dwukrotnie, otrzymasz dwie różne losowe kolekcje. Myślę, że to OP ma zamiar podzielić kursy od
zdarzeń parzystych