Apache Spark: map vs mapPartitions?

140

Jaka jest różnica między RDD map a mapPartitionsmetodą? I flatMapzachowuje się jak maplub jak mapPartitions? Dzięki.

(edytuj) tj. jaka jest różnica (semantycznie lub pod względem wykonania) między

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

I:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Nicholas White
źródło
5
Po przeczytaniu poniższej odpowiedzi możesz rzucić okiem na [to doświadczenie] udostępnione przez kogoś, kto faktycznie z niego korzystał. ( Bzhangusc.wordpress.com/2014/06/19/ ... ) bzhangusc.wordpress.com/2014/06/19 /…
Abhidemon

Odpowiedzi:

126

Jaka jest różnica między mapą RDD a metodą mapPartitions?

Mapa metod konwertuje każdy element źródłowego RDD na pojedynczy element wynikowego RDD poprzez zastosowanie funkcji. mapPartitions konwertuje każdą partycję źródłowego RDD na wiele elementów wyniku (prawdopodobnie żaden).

I czy flatMap zachowuje się jak map lub jak mapPartitions?

Ponadto flatMap nie działa na jednym elemencie (as map) i tworzy wiele elementów wyniku (as mapPartitions).

Alexey Romanov
źródło
3
Dzięki - czy więc mapa powoduje tasowanie (lub w inny sposób zmienia liczbę partycji)? Czy przenosi dane między węzłami? Używałem mapPartitions, aby uniknąć przenoszenia danych między węzłami, ale nie byłem pewien, czy zrobi to flapMap.
Nicholas White,
Jeśli spojrzysz na źródło - github.com/apache/incubator-spark/blob/ ... i github.com/apache/incubator-spark/blob/ ... - oba mapi flatMapmają dokładnie takie same partycje jak rodzic.
Alexey Romanov
13
Uwaga: prezentacja wygłoszona przez prelegenta na San Francisco Spark Summit w 2013 r. (Goo.gl/JZXDCR) podkreśla, że ​​zadania z wysokim narzutem na rekord działają lepiej z mapPartition niż z transformacją mapy. Zgodnie z prezentacją wynika to z wysokich kosztów założenia nowego zadania.
Mikel Urkia
1
Widzę coś przeciwnego - nawet przy bardzo małych operacjach szybsze jest wywołanie mapPartitions i iteracja niż wywołanie mapy. Zakładam, że to tylko narzut związany z uruchomieniem silnika językowego, który przetworzy zadanie mapy. (Jestem w R, co może mieć większe narzuty związane z uruchamianiem). Jeśli wykonywałbyś wiele operacji, to mapPartitions wydaje się być trochę szybsze - zakładam, że dzieje się tak, ponieważ czyta RDD tylko raz. Nawet jeśli RDD jest buforowany w pamięci RAM, oszczędza to wiele narzutów związanych z konwersją typów.
Bob
3
mapw zasadzie pobiera twoją funkcję fi przekazuje ją do iter.map(f). Zasadniczo jest to wygodna metoda, która zawija mapPartitions. Zdziwiłbym się, gdyby istniała przewaga wydajności w obu przypadkach dla zadania transformacji czystego stylu mapy (tj. W przypadku, gdy funkcja jest identyczna), jeśli trzeba utworzyć jakieś obiekty do przetwarzania, jeśli te obiekty mogą być współdzielone, mapPartitionsbyłoby to korzystne.
NightWolf
141

Chochlik. WSKAZÓWKA :

Ilekroć masz inicjalizację ciężką, którą należy wykonać raz dla wielu RDDelementów, a nie raz na RDDelement, i jeśli tej inicjalizacji, takiej jak tworzenie obiektów z biblioteki innej firmy, nie można serializować (tak, aby Spark mógł przesyłać ją przez klaster do węzły robocze), użyj mapPartitions()zamiast map(). mapPartitions()przewiduje, że inicjalizacja jest wykonywana raz na zadanie robocze / wątek / partycję zamiast RDDna przykład raz dla każdego elementu danych : patrz poniżej.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. nie flatMapzachowują się jak mapy czy jak mapPartitions?

Tak. proszę zobaczyć przykład 2 z flatmap... nie wymaga wyjaśnień.

Q1. Jaka jest różnica między RDD mapimapPartitions

mapdziała z funkcją używaną na poziomie elementu, podczas gdy mapPartitionswykonuje funkcję na poziomie podziału.

Przykładowy scenariusz : jeśli mamy 100K elementów w określonejRDDpartycji, to uruchomimy funkcję używaną przez transformację odwzorowania 100K razy, gdy używamymap.

I odwrotnie, jeśli użyjemy, mapPartitionswtedy wywołamy określoną funkcję tylko raz, ale przekażemy wszystkie 100K rekordów i odzyskamy wszystkie odpowiedzi w jednym wywołaniu funkcji.

Nastąpi wzrost wydajności, ponieważ mapwiele razy działa na określonej funkcji, zwłaszcza jeśli za każdym razem funkcja robi coś kosztownego, czego nie musiałaby robić, gdybyśmy przekazali wszystkie elementy naraz (w przypadku mappartitions).

mapa

Stosuje funkcję transformacji do każdego elementu RDD i zwraca wynik jako nowy RDD.

Warianty aukcji

def map [U: ClassTag] (f: T => U): RDD [U]

Przykład:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Jest to wyspecjalizowana mapa wywoływana tylko raz dla każdej partycji. Cała zawartość odpowiednich partycji jest dostępna jako sekwencyjny strumień wartości za pośrednictwem argumentu wejściowego (Iterarator [T]). Funkcja niestandardowa musi zwracać kolejny Iterator [U]. Połączone iteratory wyników są automatycznie konwertowane na nowy RDD. Należy zauważyć, że w następującym wyniku brakuje krotek (3,4) i (6,7) z powodu wybranego przez nas podziału.

preservesPartitioningwskazuje, czy funkcja wejściowa zachowuje partycjoner, co powinno być, falsechyba że jest to para RDD, a funkcja wejściowa nie modyfikuje klawiszy.

Warianty aukcji

def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], zachowujePartycjonowanie: Boolean = false): RDD [U]

Przykład 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Przykład 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

Powyższy program można również napisać przy użyciu flatMap w następujący sposób.

Przykład 2 z użyciem flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Wniosek:

mapPartitionstransformacja jest szybsza niż mapponieważ wywołuje twoją funkcję raz / partycję, a nie raz / element.

Dalsza lektura: foreach Vs foreachPartments Kiedy używać Co?

Ram Ghadiyaram
źródło
4
Wiem, że możesz użyć maplub mapPartitionsosiągnąć ten sam wynik (zobacz dwa przykłady w pytaniu); to pytanie dotyczy tego, dlaczego wybrałbyś jedną z opcji. Komentarze w drugiej odpowiedzi są naprawdę przydatne! Ponadto, nie wspominając, że mapi flatMapprzejść falsedo preservesPartitioning, i jakie są konsekwencje, które są.
Nicholas White
2
funkcja wykonywana za każdym razem w porównaniu z funkcją wykonywaną raz dla partycji była połączeniem, którego brakowało. Dostęp do więcej niż jednego rekordu danych na raz za pomocą mapPartition jest nieocenioną rzeczą. doceniam odpowiedź
średniki i taśma klejąca
1
Czy istnieje scenariusz, w którym mapjest lepszy niż mapPartitions? Jeśli mapPartitionsjest tak dobra, dlaczego nie jest to domyślna implementacja mapy?
ruhong
1
@oneleggedmule: oba są przeznaczone dla różnych wymagań, których musimy mądrze używać, jeśli tworzysz instancje zasobów, takich jak połączenia db (jak pokazano w powyższym przykładzie), które są kosztowne, wtedy mappartitions jest właściwym podejściem, ponieważ jedno połączenie na partycję. również saveAsTextFile wewnętrznie używane partycje map patrz
Ram Ghadiyaram
@oneleggedmule Z mojego punktu widzenia map () jest łatwiejsza do zrozumienia i nauczenia, a także jest powszechną metodą w wielu różnych językach. Może być łatwiejszy w użyciu niż mapPartitions (), jeśli ktoś nie jest zaznajomiony z tą specyficzną metodą Spark na początku. Jeśli nie ma różnicy w wydajności, wolę używać map ().
Raymond Chen
15

Mapa :

  1. Przetwarza jeden wiersz na raz, bardzo podobnie do metody map () MapReduce.
  2. Wracasz z transformacji po każdym wierszu.

MapPartitions

  1. Za jednym razem przetwarza całą partycję.
  2. Z funkcji można powrócić tylko raz po przetworzeniu całej partycji.
  3. Wszystkie wyniki pośrednie muszą być przechowywane w pamięci do czasu przetworzenia całej partycji.
  4. Udostępnia funkcje setup () map () i cleanup () w MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

KrazyGautam
źródło
odnośnie 2 - jeśli wykonujesz transformacje iterator-do-iterator i nie materializujesz iteratora do jakiejś kolekcji, nie będziesz musiał przechowywać całej partycji w pamięci, w rzeczywistości w ten sposób Spark będzie mógł rozlać części partycji na dysk.
ilcord
4
Nie musisz przechowywać całej partycji w pamięci, ale wynik. Nie możesz zwrócić wyniku, dopóki nie przetworzysz całej partycji
KrazyGautam