Wydajność Spark dla Scala vs Python

178

Wolę Pythona od Scali. Ale ponieważ Spark jest natywnie napisany w Scali, spodziewałem się, że mój kod będzie działał szybciej w Scali niż wersja Python z oczywistych powodów.

Mając to założenie, pomyślałem, że powinienem nauczyć się i napisać wersję Scala jakiegoś bardzo powszechnego kodu do wstępnego przetwarzania dla około 1 GB danych. Dane są pobierane z zawodów SpringLeaf na Kaggle . Żeby dać przegląd danych (zawiera 1936 wymiarów i 145232 wiersze). Dane składają się z różnych typów, np. Int, float, string, boolean. Używam 6 rdzeni z 8 do przetwarzania Spark; dlatego użyłem minPartitions=6tak, aby każdy rdzeń miał coś do przetworzenia.

Kod Scala

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Kod w Pythonie

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala Performance Etap 0 (38 minut), Etap 1 (18 sekund) wprowadź opis obrazu tutaj

Python Performance Stage 0 (11 min), Stage 1 (7 s) wprowadź opis obrazu tutaj

Oba generują różne wykresy wizualizacji DAG (dzięki czemu oba obrazy pokazują różne funkcje etapu 0 dla Scala ( map) i Python ( reduceByKey))

Ale zasadniczo oba kody próbują przekształcić dane do (id_wymiaru, ciąg listy wartości) RDD i zapisać na dysku. Dane wyjściowe zostaną wykorzystane do obliczenia różnych statystyk dla każdego wymiaru.

Jeśli chodzi o wydajność, kod Scala dla takich rzeczywistych danych wydaje się działać 4 razy wolniej niż wersja Pythona. Dobra wiadomość jest dla mnie taka, że ​​dało mi to dobrą motywację do pozostania przy Pythonie. Zła wiadomość jest taka, że ​​nie całkiem rozumiem, dlaczego?

Mrityunjay
źródło
8
Być może jest to zależne od kodu i aplikacji, ponieważ otrzymuję inny wynik, że python iskra apache jest wolniejszy niż scala, gdy sumuje się miliard wyrazów wzoru Leibniza dla π
Paul
3
Interesujące pytanie! Przy okazji zajrzyj również tutaj: emptypipes.org/2015/01/17/python-vs-scala-vs-spark Im więcej masz rdzeni, tym mniej widać różnice między językami.
Markon,
Czy rozważałeś zaakceptowanie istniejącej odpowiedzi?
10465355 mówi Przywróć Monikę

Odpowiedzi:

358

Oryginalną odpowiedź dotyczącą kodu można znaleźć poniżej.


Przede wszystkim musisz rozróżnić różne typy API, z których każdy ma własne względy wydajności.

RDD API

(czyste struktury Pythona z orkiestracją opartą na JVM)

Jest to komponent, na który najbardziej wpłynie wydajność kodu Pythona i szczegóły implementacji PySpark. Chociaż wydajność Pythona raczej nie będzie stanowić problemu, jest przynajmniej kilka czynników, które należy wziąć pod uwagę:

  • Narzut komunikacji JVM. Praktycznie wszystkie dane przychodzące i wychodzące z modułu wykonawczego Pythona muszą być przekazywane przez gniazdo i proces roboczy JVM. Chociaż jest to stosunkowo wydajna komunikacja lokalna, nadal nie jest bezpłatna.
  • Wykonywacze oparte na procesach (Python) a programy wykonawcze oparte na wątkach (pojedyncza maszyna JVM, wiele wątków) (Scala). Każdy moduł wykonawczy Pythona działa we własnym procesie. Jako efekt uboczny zapewnia silniejszą izolację niż jej odpowiednik JVM i pewną kontrolę nad cyklem życia modułu wykonawczego, ale potencjalnie znacznie większe zużycie pamięci:

    • ślad pamięci interpretera
    • ślad załadowanych bibliotek
    • mniej wydajne nadawanie (każdy proces wymaga własnej kopii transmisji)
  • Wydajność samego kodu Pythona. Ogólnie rzecz biorąc, Scala jest szybsza niż Python, ale będzie się różnić w zależności od zadania. Ponadto masz wiele opcji, w tym JIT, takie jak Numba , rozszerzenia C ( Cython ) lub specjalistyczne biblioteki, takie jak Theano . Wreszcie, jeśli nie używasz ML / MLlib (lub po prostu stosu NumPy) , rozważ użycie PyPy jako alternatywnego interpretera. Zobacz SPARK-3094 .

  • Konfiguracja PySpark zapewnia spark.python.worker.reuseopcję, której można użyć do wyboru między rozwidlaniem procesu Pythona dla każdego zadania a ponownym wykorzystaniem istniejącego procesu. Ta ostatnia opcja wydaje się być przydatna do uniknięcia kosztownego zbierania śmieci (jest to bardziej wrażenie niż wynik systematycznych testów), podczas gdy pierwsza (domyślna) jest optymalna w przypadku drogich transmisji i importu.
  • Zliczanie odwołań, używane jako pierwsza metoda wyrzucania elementów bezużytecznych w CPythonie, działa całkiem dobrze z typowymi obciążeniami Spark (przetwarzanie strumieniowe, brak cykli referencyjnych) i zmniejsza ryzyko długich przerw w GC.

MLlib

(mieszane wykonanie Pythona i JVM)

Podstawowe zagadnienia są prawie takie same jak wcześniej, z kilkoma dodatkowymi kwestiami. Podczas gdy podstawowe struktury używane w MLlib są zwykłymi obiektami RDD Pythona, wszystkie algorytmy są wykonywane bezpośrednio przy użyciu Scali.

Oznacza to dodatkowy koszt konwersji obiektów Pythona na obiekty Scala i odwrotnie, zwiększone zużycie pamięci i kilka dodatkowych ograniczeń, które omówimy później.

W chwili obecnej (Spark 2.x) interfejs API oparty na RDD jest w trybie konserwacji i ma zostać usunięty w Spark 3.0 .

DataFrame API i Spark ML

(Wykonanie JVM z kodem Python ograniczone do sterownika)

To prawdopodobnie najlepszy wybór do standardowych zadań przetwarzania danych. Ponieważ kod Pythona jest głównie ograniczony do wysokopoziomowych operacji logicznych na sterowniku, nie powinno być różnicy w wydajności między Pythonem i Scalą.

Jedynym wyjątkiem jest użycie wierszy UDF Python, które są znacznie mniej wydajne niż ich odpowiedniki w Scali. Chociaż istnieje pewna szansa na ulepszenia (nastąpił znaczny rozwój w Spark 2.0.0), największym ograniczeniem jest pełne przejście w obie strony między wewnętrzną reprezentacją (JVM) a interpreterem Pythona. Jeśli to możliwe, powinieneś preferować kompozycję wbudowanych wyrażeń ( przykład . Zachowanie UDF w Pythonie zostało ulepszone w Spark 2.0.0, ale nadal jest nieoptymalne w porównaniu z wykonaniem natywnym.

Może to ulec poprawie w przyszłości i znacznie się poprawiło wraz z wprowadzeniem wektoryzowanych UDF (SPARK-21190 i dalsze rozszerzenia) , które wykorzystują Arrow Streaming do wydajnej wymiany danych z zerową deserializacją. W przypadku większości aplikacji można po prostu zignorować dodatkowe koszty.

Upewnij się również, że unikasz niepotrzebnego przesyłania danych między DataFramesa RDDs. Wymaga to kosztownej serializacji i deserializacji, nie wspominając o transferze danych do iz interpretera Pythona.

Warto zauważyć, że wywołania Py4J mają dość duże opóźnienia. Obejmuje to proste połączenia, takie jak:

from pyspark.sql.functions import col

col("foo")

Zwykle nie powinno to mieć znaczenia (narzut jest stały i nie zależy od ilości danych), ale w przypadku miękkich aplikacji czasu rzeczywistego można rozważyć buforowanie / ponowne użycie opakowań Java.

GraphX ​​i Spark DataSets

Na razie (Spark 1.6 2.1) żaden z nich nie zapewnia PySpark API, więc można powiedzieć, że PySpark jest nieskończenie gorszy niż Scala.

GraphX

W praktyce rozwój GraphX ​​zatrzymał się prawie całkowicie, a projekt jest obecnie w trybie konserwacji, a powiązane zgłoszenia JIRA są zamknięte, ponieważ nie zostaną naprawione . Biblioteka GraphFrames udostępnia alternatywną bibliotekę przetwarzania wykresów z powiązaniami języka Python.

Zestaw danych

Mówiąc subiektywnie, Datasetsw Pythonie nie ma zbyt wiele miejsca na statyczne wpisywanie danych, a nawet jeśli obecna implementacja Scali jest zbyt uproszczona i nie zapewnia takich samych korzyści wydajnościowych jak DataFrame.

Streaming

Z tego, co do tej pory widziałem, zdecydowanie polecam używanie Scali zamiast Pythona. Może się to zmienić w przyszłości, jeśli PySpark otrzyma wsparcie dla strumieni ustrukturyzowanych, ale obecnie API Scala wydaje się być znacznie bardziej niezawodne, wszechstronne i wydajne. Moje doświadczenie jest dość ograniczone.

Strukturalne przesyłanie strumieniowe w Spark 2.x wydaje się zmniejszać przepaść między językami, ale na razie jest jeszcze w początkowej fazie. Niemniej jednak API oparte na RDD jest już określane jako „starsze przesyłanie strumieniowe” w dokumentacji Databricks (data dostępu 2017-03-03), więc rozsądne jest oczekiwanie dalszych wysiłków na rzecz unifikacji.

Zagadnienia dotyczące braku wydajności

Parzystość funkcji

Nie wszystkie funkcje platformy Spark są udostępniane za pośrednictwem interfejsu PySpark API. Sprawdź, czy potrzebne części są już zaimplementowane i spróbuj zrozumieć możliwe ograniczenia.

Jest to szczególnie ważne, gdy używasz MLlib i podobnych kontekstów mieszanych (zobacz Wywoływanie funkcji Java / Scala z zadania ). Aby być uczciwym, niektóre części API PySpark, na przykład mllib.linalg, zapewniają bardziej wszechstronny zestaw metod niż Scala.

Projekt API

PySpark API ściśle odzwierciedla jego odpowiednik w Scali i jako taki nie jest dokładnie Pythonic. Oznacza to, że mapowanie między językami jest dość łatwe, ale jednocześnie kod Pythona może być znacznie trudniejszy do zrozumienia.

Złożona architektura

Przepływ danych w PySpark jest stosunkowo złożony w porównaniu z czystym wykonaniem JVM. O wiele trudniej jest uzasadnić programy PySpark lub debugować. Co więcej, przynajmniej podstawowa znajomość Scali i JVM w ogóle jest koniecznością.

Spark 2.x i nowszy

Ciągłe przejście w kierunku DatasetAPI, z zamrożonym API RDD, przynosi zarówno możliwości, jak i wyzwania dla użytkowników Pythona. Podczas gdy wysokopoziomowe części API są znacznie łatwiejsze do ujawnienia w Pythonie, bardziej zaawansowane funkcje są prawie niemożliwe do bezpośredniego użycia .

Ponadto natywne funkcje Pythona nadal są obywatelami drugiej kategorii w świecie SQL. Miejmy nadzieję, że poprawi się to w przyszłości dzięki serializacji Apache Arrow ( obecne wysiłki dotyczą danych,collection ale serde UDF jest celem długoterminowym ).

W przypadku projektów silnie zależnych od bazy kodu Pythona, alternatywne alternatywy w czystym Pythonie (takie jak Dask lub Ray ) mogą być interesującą alternatywą.

Nie musi to być jedno kontra drugie

Interfejs API Spark DataFrame (SQL, Dataset) zapewnia elegancki sposób integracji kodu Scala / Java w aplikacji PySpark. Można użyć DataFramesdo udostępnienia danych natywnemu kodowi maszyny JVM i odczytania wyników. Wyjaśniłem niektóre opcje gdzie indziej , a działający przykład pętli Python-Scala znajdziesz w artykule Jak używać klasy Scala w Pyspark .

Można go dodatkowo rozszerzyć, wprowadzając typy zdefiniowane przez użytkownika (zobacz Jak zdefiniować schemat dla typu niestandardowego w Spark SQL? ).


Co jest nie tak z kodem podanym w pytaniu

(Zastrzeżenie: punkt widzenia Pythonisty. Najprawdopodobniej przegapiłem kilka sztuczek Scala)

Po pierwsze, w Twoim kodzie jest jedna część, która w ogóle nie ma sensu. Jeśli masz już (key, value)pary utworzone za pomocą zipWithIndexlub enumeratejaki jest sens w tworzeniu łańcucha tylko po to, aby go potem podzielić? flatMapnie działa rekurencyjnie, więc możesz po prostu utworzyć krotki i pominąć mapjakiekolwiek śledzenie .

Inną częścią, która wydaje mi się problematyczna, jest reduceByKey. Ogólnie rzecz biorąc, reduceByKeyjest przydatne, jeśli zastosowanie funkcji agregującej może zmniejszyć ilość danych, które muszą być przetasowane. Ponieważ po prostu łączysz łańcuchy, nie ma tu nic do zyskania. Ignorując rzeczy niskiego poziomu, takie jak liczba odniesień, ilość danych, które musisz przesłać, jest dokładnie taka sama jak w przypadku groupByKey.

Normalnie nie rozwodziłbym się nad tym, ale o ile wiem, jest to wąskie gardło w twoim kodzie Scala. Łączenie ciągów znaków w JVM jest dość kosztowną operacją (zobacz na przykład: Czy konkatenacja ciągów w scali jest tak kosztowna, jak w Javie? ). Oznacza to, że coś takiego, _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) co jest odpowiednikiem tego input4.reduceByKey(valsConcat)w kodzie, nie jest dobrym pomysłem.

Jeśli chcesz tego uniknąć groupByKey, możesz spróbować użyć aggregateByKeyz StringBuilder. Coś podobnego do tego powinno załatwić sprawę:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

ale wątpię, czy jest to warte całego zamieszania.

Mając na uwadze powyższe, przepisałem Twój kod w następujący sposób:

Scala :

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python :

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Wyniki

W local[6]trybie (procesor Intel (R) Xeon (R) E3-1245 V2 @ 3,40 GHz) z 4 GB pamięci na moduł wykonawczy (n = 3):

  • Scala - średnia: 250,00s, odch.stod .: 12.49
  • Python - średnia: 246,66s, stdev: 1,15

Jestem prawie pewien, że większość tego czasu spędza na tasowaniu, serializowaniu, deserializacji i innych zadaniach drugorzędnych. Dla zabawy, oto naiwny jednowątkowy kod w Pythonie, który wykonuje to samo zadanie na tej maszynie w mniej niż minutę:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
zero323
źródło
23
Jedna z najbardziej jasnych, wyczerpujących i przydatnych odpowiedzi, z jakimi od jakiegoś czasu się spotkałem. Dzięki!
etov
Jaki z ciebie wspaniały facet!
DennisLi
-4

Rozszerzenie do powyższych odpowiedzi -

Scala okazuje się szybsza na wiele sposobów w porównaniu do Pythona, ale jest kilka ważnych powodów, dla których Python staje się bardziej popularny niż scala, zobaczmy kilka z nich -

Python dla Apache Spark jest dość łatwy do nauczenia i używania. Jednak to nie jedyny powód, dla którego Pyspark jest lepszym wyborem niż Scala. Jest więcej.

Python API for Spark może działać wolniej w klastrze, ale ostatecznie naukowcy zajmujący się danymi mogą z nim zrobić znacznie więcej niż Scala. Brakuje złożoności Scali. Interfejs jest prosty i wszechstronny.

Mówienie o czytelności kodu, konserwacji i znajomości interfejsu API Pythona dla Apache Spark jest znacznie lepsze niż Scala.

Python zawiera kilka bibliotek związanych z uczeniem maszynowym i przetwarzaniem języka naturalnego. Pomaga to w analizie danych, a także zawiera statystyki, które są bardzo dojrzałe i sprawdzone w czasie. Na przykład numpy, pandas, scikit-learn, seaborn i matplotlib.

Uwaga: większość analityków danych stosuje podejście hybrydowe, w którym wykorzystuje to, co najlepsze z obu interfejsów API.

Wreszcie, społeczność Scala często okazuje się znacznie mniej pomocna dla programistów. To sprawia, że ​​Python jest bardzo cenną nauką. Jeśli masz wystarczające doświadczenie z jakimkolwiek językiem programowania statycznie typowanym, takim jak Java, możesz przestać martwić się, że nie będziesz w ogóle używać Scali.


źródło