Różnica między redukcją a foldLeft / fold w programowaniu funkcjonalnym (szczególnie w interfejsach API Scala i Scala)?

Odpowiedzi:

260

redukcja vs foldLewy

Dużą różnicą, o której nie wspomniano w żadnej innej odpowiedzi dotyczącej przepełnienia stosu odnoszącej się wyraźnie do tego tematu, jest to, że reducenależy nadać monoid przemienny , tj. Operację, która jest zarówno przemienna, jak i asocjacyjna. Oznacza to, że operacja może być zrównoleglona.

To rozróżnienie jest bardzo ważne w przypadku Big Data / MPP / przetwarzania rozproszonego i całego powodu, dla którego w reduceogóle istnieje. Zbiór może zostać posiekany i reducemoże działać na każdym kawałku, a następnie reducemoże operować na wynikach każdego kawałka - w rzeczywistości poziom fragmentacji nie musi zatrzymywać się o jeden poziom głęboko. Moglibyśmy też posiekać każdy kawałek. Dlatego sumowanie liczb całkowitych na liście to O (log N), jeśli dana jest nieskończona liczba procesorów.

Jeśli spojrzysz tylko na podpisy, nie ma powodu reducedo istnienia, ponieważ możesz osiągnąć wszystko, co możesz, korzystając reducez pliku foldLeft. Funkcjonalność foldLeftjest większa niż funkcjonalność reduce.

Ale nie możesz zrównoleglać a foldLeft, więc jego czas wykonania jest zawsze O (N) (nawet jeśli karmisz w przemiennym monoidzie). Dzieje się tak, ponieważ zakłada się, że operacja nie jest przemiennym monoidem, a zatem skumulowana wartość zostanie obliczona przez serię sekwencyjnych agregacji.

foldLeftnie zakłada przemienności ani skojarzenia. To asocjatywność daje możliwość dzielenia kolekcji, a przemienność sprawia, że ​​kumulacja jest łatwa, ponieważ porządek nie jest ważny (więc nie ma znaczenia, w jakiej kolejności agregować każdy z wyników z każdego fragmentu). Ściśle mówiąc, przemienność nie jest konieczna do zrównoleglenia, na przykład algorytmów sortowania rozproszonego, po prostu ułatwia logikę, ponieważ nie musisz nadawać porcjom kolejności.

Jeśli spojrzysz na dokumentację Spark dotyczącą reducetego, konkretnie mówi „... przemienny i asocjacyjny operator binarny”

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

Oto dowód, że reduceNIE jest to tylko specjalny przypadekfoldLeft

scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par

scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds

scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds

zmniejszyć vs spasować

Teraz jest trochę bliżej korzeni FP / matematycznych i trochę trudniej wyjaśnić. Reduce jest formalnie zdefiniowane jako część paradygmatu MapReduce, który zajmuje się kolekcjami bez porządku (multisets), Fold jest formalnie zdefiniowany w kategoriach rekursji (patrz katamorfizm), a tym samym przyjmuje strukturę / sekwencję dla kolekcji.

W foldScalding nie ma metody, ponieważ w (ścisłym) modelu programowania Map Reduce nie możemy zdefiniować, foldponieważ fragmenty nie mają kolejności i foldwymagają tylko asocjatywności, a nie przemienności.

Mówiąc prościej, reducedziała bez kolejności kumulacji, foldwymaga kolejności kumulacji i to właśnie ta kolejność kumulacji wymaga wartości zerowej, a NIE istnienie wartości zerowej, która je odróżnia. Ściśle mówiąc, reduce powinno działać na pustej kolekcji, ponieważ jej wartość zerową można wywnioskować, biorąc dowolną wartość, xa następnie rozwiązując x op y = x, ale to nie działa w przypadku operacji nieprzemiennej, ponieważ może istnieć odrębna wartość zerowa lewej i prawej strony (tj x op y != y op x.). Oczywiście Scala nie zadaje sobie trudu, aby dowiedzieć się, jaka jest ta wartość zerowa, ponieważ wymagałoby to zrobienia jakiejś matematyki (która prawdopodobnie jest nieobliczalna), więc po prostu zgłasza wyjątek.

Wydaje się (jak to często bywa w etymologii), że to pierwotne znaczenie matematyczne zostało utracone, ponieważ jedyną oczywistą różnicą w programowaniu jest podpis. W rezultacie reducestał się synonimem foldzamiast zachowywać jego oryginalne znaczenie z MapReduce. Teraz te terminy są często używane zamiennie i zachowują się tak samo w większości implementacji (ignorując puste kolekcje). Dziwność jest potęgowana przez osobliwości, jak w Spark, którymi teraz zajmiemy się.

Więc Spark nie mają fold, ale kolejność, w jakiej wyniki cząstkowe (po jednym dla każdej partycji) są połączone (w momencie pisania) jest taka sama kolejność, w której zadania są zakończone - a więc zakaz deterministyczny. Dzięki @CafeFeed za wskazanie foldzastosowań runJob, które po przeczytaniu kodu zdałem sobie sprawę, że jest to niedeterministyczne. Dalsze zamieszanie jest spowodowane tym, że Spark ma treeReduceale nie treeFold.

Wniosek

Istnieje różnica między, reducea foldnawet po zastosowaniu do niepustych sekwencji. Ten pierwszy jest zdefiniowany jako część paradygmatu programowania MapReduce na kolekcjach o dowolnej kolejności ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) i należy założyć, że oprócz tego, że operatory są przemienne, asocjacyjne, aby dać deterministyczne wyniki. Ta ostatnia jest zdefiniowana w kategoriach katomorfizmów i wymaga, aby zbiory miały pojęcie sekwencji (lub były definiowane rekurencyjnie, jak listy połączone), a zatem nie wymagają operatorów przemiennych.

W praktyce ze względu na nie matematyczną naturę programowania reducei foldtendencję do zachowywania się w ten sam sposób, albo poprawnie (jak w Scali), albo niepoprawnie (jak w Spark).

Dodatkowo: Moja opinia na temat interfejsu API Spark

Moim zdaniem można by uniknąć zamieszania, gdyby użycie tego terminu foldzostało całkowicie porzucone w Spark. Przynajmniej Spark ma notatkę w swojej dokumentacji:

Zachowuje się to nieco inaczej niż operacje zwijania implementowane dla kolekcji nierozproszonych w językach funkcjonalnych, takich jak Scala.

samthebest
źródło
2
Dlatego foldLeftzawiera Leftw nazwie i dlaczego istnieje również metoda o nazwie fold.
kiritsuku
1
@Cloudtech To zbieg okoliczności jego implementacji jednowątkowej, a nie w jej specyfikacji. Na moim 4-rdzeniowym komputerze, jeśli spróbuję dodać .par, za (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)każdym razem otrzymuję inne wyniki.
samthebest
2
@AlexDean w kontekście informatyki, nie, tak naprawdę nie potrzebuje tożsamości, ponieważ puste kolekcje mają tendencję do rzucania wyjątków. Ale matematycznie jest bardziej elegancki (i byłby bardziej elegancki, gdyby robiły to kolekcje), jeśli element tożsamości jest zwracany, gdy kolekcja jest pusta. W matematyce „rzuć wyjątek” nie istnieje.
samthebest
3
@samthebest: Czy jesteś pewien przemienności? github.com/apache/spark/blob/… mówi: „W przypadku funkcji, które nie są przemienne, wynik może różnić się od zawinięcia zastosowanego do nierozpowszechnionej kolekcji”.
Make42
1
@ Make42 Zgadza się, można napisać własny reallyFoldalfonsa choć, jak: rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f), to nie musiałby f dojazdy.
samthebest
10

Jeśli się nie mylę, mimo że Spark API tego nie wymaga, fold wymaga również, aby f było przemienne. Ponieważ kolejność, w jakiej partycje będą agregowane, nie jest gwarantowana. Na przykład w poniższym kodzie sortowany jest tylko pierwszy wydruk:

import org.apache.spark.{SparkConf, SparkContext}

object FoldExample extends App{

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Simple Application")
  implicit val sc = new SparkContext(conf)

  val range = ('a' to 'z').map(_.toString)
  val rdd = sc.parallelize(range)

  println(range.reduce(_ + _))
  println(rdd.reduce(_ + _))
  println(rdd.fold("")(_ + _))
}  

Wydruk:

ABCDEFGHIJKLMNOPQRSTU VWXYZ

abcghituvjklmwxyzqrsdefnop

defghinopjklmqrstuvabcwxyz

Mishael Rosenthal
źródło
Po kilku tam iz powrotem uważamy, że masz rację. Kolejność łączenia jest następująca: kto pierwszy, ten lepszy. Jeśli sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)kilka razy uruchomisz z 2+ rdzeniami, myślę, że zobaczysz, że generuje to losową kolejność (według partycji). Odpowiednio zaktualizowałem moją odpowiedź.
samthebest
3

foldw Apache Spark nie jest tym samym, co foldw kolekcjach nierozproszonych. W rzeczywistości wymaga funkcji przemiennej, aby uzyskać deterministyczne wyniki:

Zachowuje się to nieco inaczej niż operacje zwijania implementowane dla kolekcji nierozproszonych w językach funkcjonalnych, takich jak Scala. Tę operację zawinięcia można zastosować do przegród indywidualnie, a następnie złożyć te wyniki w wynik końcowy, zamiast stosować zawinięcie do każdego elementu sekwencyjnie w określonej kolejności. W przypadku funkcji, które nie są przemienne, wynik może różnić się od zagięcia zastosowanego do kolekcji nierozdzielonej.

To zostało pokazane przez Mishael Rosenthal i sugeruje Make42 w swoim komentarzu .

Sugerowano, że obserwowane zachowanie jest związane z tym, HashPartitionerkiedy w rzeczywistości parallelizenie tasuje i nie używa HashPartitioner.

import org.apache.spark.sql.SparkSession

/* Note: standalone (non-local) mode */
val master = "spark://...:7077"  

val spark = SparkSession.builder.master(master).getOrCreate()

/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

Wyjaśnione:

Strukturafold dla RDD

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  var jobResult: T
  val cleanOp: (T, T) => T
  val foldPartition = Iterator[T] => T
  val mergeResult: (Int, T) => Unit
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

jest taka sama jak strukturareduce dla RDD:

def reduce(f: (T, T) => T): T = withScope {
  val cleanF: (T, T) => T
  val reducePartition: Iterator[T] => Option[T]
  var jobResult: Option[T]
  val mergeResult =  (Int, Option[T]) => Unit
  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

gdzie runJobjest wykonywany z pominięciem kolejności podziału i wymaga funkcji przemiennej.

foldPartitioni reducePartitionsą równoważne pod względem kolejności przetwarzania i skutecznie (przez dziedziczenie i delegowanie) wdrażane przez reduceLefti w foldLeftdniu TraversableOnce.

Wniosek: foldna RDD nie może zależeć od kolejności fragmentów i wymaga przemienności i asocjatywności .

user6022341
źródło
Muszę przyznać, że etymologia jest zagmatwana, a literaturze programowania brakuje formalnych definicji. Myślę, że to na pewno powiedzieć, że foldna RDDs rzeczywiście jest naprawdę tak samo jak reduce, ale ten nie przestrzega korzeniowych różnice matematyczne (zaktualizowałem moją odpowiedź będzie jeszcze bardziej jasne). Chociaż nie zgadzam się, że naprawdę potrzebujemy przemienności, pod warunkiem, że jest się pewnym, że cokolwiek robi ich partjoner, zachowuje porządek.
samthebest
Nieokreślona kolejność zawinięć nie jest związana z partycjonowaniem. Jest to bezpośrednia konsekwencja implementacji runJob.
AH! Przepraszam, nie mogłem zrozumieć, o co ci chodzi, ale po przeczytaniu runJobkodu widzę, że rzeczywiście dokonuje on łączenia zgodnie z zakończeniem zadania, a NIE kolejnością partycji. To ten kluczowy szczegół sprawia, że ​​wszystko się układa. Ponownie zredagowałem moją odpowiedź i poprawiłem w ten sposób błąd, który wskazałeś. Czy mógłbyś usunąć swoją nagrodę, skoro jesteśmy teraz w porozumieniu?
samthebest
Nie mogę edytować ani usuwać - nie ma takiej opcji. Mogę przyznać, ale myślę, że sama uwaga dostaniesz sporo punktów, czy się mylę? Jeśli potwierdzisz, że chcesz, żebym nagrodził, zrobię to w ciągu najbliższych 24 godzin. Dzięki za poprawki i przepraszam za metodę, ale wyglądało na to, że zignorowałeś wszystkie ostrzeżenia, to wielka sprawa, a odpowiedź była cytowana w każdym miejscu.
1
Co powiesz na to, że nagrodzisz to @Mishael Rosenthal, skoro był pierwszym, który jasno wyraził obawy. Nie interesują mnie punkty, po prostu lubię używać SO do SEO i organizacji.
samthebest
2

Inną różnicą w Scalding jest użycie łączników w Hadoop.

Wyobraź sobie, że twoja operacja jest przemiennym monoidem, z redukcją zostanie zastosowana po stronie mapy również zamiast tasowania / sortowania wszystkich danych do reduktorów. W przypadku foldLeft tak nie jest.

pipe.groupBy('product) {
   _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
   // reduce is .mapReduceMap in disguise
}

pipe.groupBy('product) {
   _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}

Zawsze dobrze jest zdefiniować swoje operacje jako monoidalne w Scalding.

morazow
źródło