Jaka jest różnica między RDD map
a mapPartitions
metodą? I flatMap
zachowuje się jak map
lub jak mapPartitions
? Dzięki.
(edytuj) tj. jaka jest różnica (semantycznie lub pod względem wykonania) między
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
I:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
performance
scala
apache-spark
rdd
Nicholas White
źródło
źródło
Odpowiedzi:
Mapa metod konwertuje każdy element źródłowego RDD na pojedynczy element wynikowego RDD poprzez zastosowanie funkcji. mapPartitions konwertuje każdą partycję źródłowego RDD na wiele elementów wyniku (prawdopodobnie żaden).
Ponadto flatMap nie działa na jednym elemencie (as
map
) i tworzy wiele elementów wyniku (asmapPartitions
).źródło
map
iflatMap
mają dokładnie takie same partycje jak rodzic.map
w zasadzie pobiera twoją funkcjęf
i przekazuje ją doiter.map(f)
. Zasadniczo jest to wygodna metoda, która zawijamapPartitions
. Zdziwiłbym się, gdyby istniała przewaga wydajności w obu przypadkach dla zadania transformacji czystego stylu mapy (tj. W przypadku, gdy funkcja jest identyczna), jeśli trzeba utworzyć jakieś obiekty do przetwarzania, jeśli te obiekty mogą być współdzielone,mapPartitions
byłoby to korzystne.Chochlik. WSKAZÓWKA :
val newRd = myRdd.mapPartitions(partition => { val connection = new DbConnection /*creates a db connection per partition*/ val newPartition = partition.map(record => { readMatchingFromDB(record, connection) }).toList // consumes the iterator, thus calls readMatchingFromDB connection.close() // close dbconnection here newPartition.iterator // create a new iterator })
Tak. proszę zobaczyć przykład 2 z
flatmap
... nie wymaga wyjaśnień.Przykładowy scenariusz : jeśli mamy 100K elementów w określonej
RDD
partycji, to uruchomimy funkcję używaną przez transformację odwzorowania 100K razy, gdy używamymap
.I odwrotnie, jeśli użyjemy,
mapPartitions
wtedy wywołamy określoną funkcję tylko raz, ale przekażemy wszystkie 100K rekordów i odzyskamy wszystkie odpowiedzi w jednym wywołaniu funkcji.Nastąpi wzrost wydajności, ponieważ
map
wiele razy działa na określonej funkcji, zwłaszcza jeśli za każdym razem funkcja robi coś kosztownego, czego nie musiałaby robić, gdybyśmy przekazali wszystkie elementy naraz (w przypadkumappartitions
).mapa
Przykład:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
Przykład 1
val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Przykład 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) } res.iterator } x.mapPartitions(myfunc).collect // some of the number are not outputted at all. This is because the random number generated for it is zero. res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
Powyższy program można również napisać przy użyciu flatMap w następujący sposób.
Przykład 2 z użyciem flatmap
val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Wniosek:
mapPartitions
transformacja jest szybsza niżmap
ponieważ wywołuje twoją funkcję raz / partycję, a nie raz / element.Dalsza lektura: foreach Vs foreachPartments Kiedy używać Co?
źródło
map
lubmapPartitions
osiągnąć ten sam wynik (zobacz dwa przykłady w pytaniu); to pytanie dotyczy tego, dlaczego wybrałbyś jedną z opcji. Komentarze w drugiej odpowiedzi są naprawdę przydatne! Ponadto, nie wspominając, żemap
iflatMap
przejśćfalse
dopreservesPartitioning
, i jakie są konsekwencje, które są.map
jest lepszy niżmapPartitions
? JeślimapPartitions
jest tak dobra, dlaczego nie jest to domyślna implementacja mapy?Mapa :
MapPartitions
źródło