Spark: Dlaczego Python znacznie przewyższa Scalę w moim przypadku użycia?

16

Aby porównać wydajność Spark przy użyciu Pythona i Scali, stworzyłem to samo zadanie w obu językach i porównałem środowisko wykonawcze. Spodziewałem się, że obie prace zajmą mniej więcej tyle samo czasu, ale praca w Pythonie trwała tylko 27min, podczas gdy praca w Scali trwała 37min(prawie 40% dłużej!). Zaimplementowałem również tę samą pracę w Javie i to 37minutesteż zajęło . Jak to możliwe, że Python jest o wiele szybszy?

Minimalny możliwy do zweryfikowania przykład:

Zadanie Python:

# Configuration
conf = pyspark.SparkConf()
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)

# 960 Files from a public dataset in 2 batches
input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

# Count occurances of a certain string
logData = sc.textFile(input_files)
logData2 = sc.textFile(input_files2)
a = logData.filter(lambda value: value.startswith('WARC-Type: response')).count()
b = logData2.filter(lambda value: value.startswith('WARC-Type: response')).count()

print(a, b)

Praca Scala:

// Configuration
config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

// 960 Files from a public dataset in 2 batches 
val input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
val input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

// Count occurances of a certain string
val logData1 = sc.textFile(input_files)
val logData2 = sc.textFile(input_files2)
val num1 = logData1.filter(line => line.startsWith("WARC-Type: response")).count()
val num2 = logData2.filter(line => line.startsWith("WARC-Type: response")).count()

println(s"Lines with a: $num1, Lines with b: $num2")

Po prostu patrząc na kod wydają się identyczne. Spojrzałem na DAG i nie dostarczyli oni żadnych spostrzeżeń (a przynajmniej brakuje mi know-how, aby znaleźć na ich podstawie wyjaśnienie).

Byłbym naprawdę wdzięczny za wszelkie wskazówki.

maestromusica
źródło
Komentarze nie są przeznaczone do rozszerzonej dyskusji; ta rozmowa została przeniesiona do czatu .
Samuel Liew
1
Rozpocząłbym analizę, zanim o cokolwiek zapytałbym, odmierzając czas w odpowiednich blokach i instrukcjach, aby sprawdzić, czy istnieje określone miejsce, w którym wersja Pythona jest szybsza. Wtedy być może uda Ci się wyostrzyć pytanie „dlaczego ta instrukcja python jest szybsza”.
Terry Jan Reedy

Odpowiedzi:

11

Twoje podstawowe założenie, że Scala lub Java powinny być szybsze do tego konkretnego zadania, jest po prostu błędne. Możesz to łatwo zweryfikować za pomocą minimalnych aplikacji lokalnych. Scala One:

import scala.io.Source
import java.time.{Duration, Instant}

object App {
  def main(args: Array[String]) {
    val Array(filename, string) = args

    val start = Instant.now()

    Source
      .fromFile(filename)
      .getLines
      .filter(line => line.startsWith(string))
      .length

    val stop = Instant.now()
    val duration = Duration.between(start, stop).toMillis
    println(s"${start},${stop},${duration}")
  }
}

Python pierwszy

import datetime
import sys

if __name__ == "__main__":
    _, filename, string = sys.argv
    start = datetime.datetime.now()
    with open(filename) as fr:
        # Not idiomatic or the most efficient but that's what
        # PySpark will use
        sum(1 for _ in filter(lambda line: line.startswith(string), fr))

    end = datetime.datetime.now()
    duration = round((end - start).total_seconds() * 1000)
    print(f"{start},{end},{duration}")

Wyniki (300 powtórzeń każda, Python 3.7.6, Scala 02.11.12) w sprawie Posts.xmlz hermeneutics.stackexchange.com dane zrzucić z mieszanką i dopasowywania wzorców non pasujące:

wykresy czasowe w milisekundach dla powyższych programów

  • Python 273,50 (258,84, 288,16)
  • Scala 634,13 (533,81, 734,45)

Jak widzisz, Python jest nie tylko systematycznie szybszy, ale także bardziej spójny (mniejszy spread).

Komunikat „zabierz” to - nie wierz w bezpodstawny FUD - języki mogą być szybsze lub wolniejsze w określonych zadaniach lub w określonych środowiskach (na przykład tutaj Scala może zostać uderzona przez uruchomienie JVM i / lub GC i / lub JIT), ale jeśli twierdzisz, że jak „XYZ jest X4 szybszy” lub „XYZ jest wolny w porównaniu do ZYX (..) Około, 10 razy wolniejszy” zwykle oznacza to, że ktoś napisał naprawdę zły kod do testowania.

Edytuj :

Aby zaradzić niektórym obawom wyrażonym w komentarzach:

  • W kodzie OP dane są przekazywane głównie w jednym kierunku (JVM -> Python) i nie jest wymagana żadna prawdziwa serializacja (ta konkretna ścieżka po prostu przechodzi przez testowanie bez zmian i dekodowanie na UTF-8 po drugiej stronie). Jest to tak tanie, jak to tylko możliwe, jeśli chodzi o „serializację”.
  • To, co jest przekazywane, to tylko jedna liczba całkowita według partycji, więc wpływ w tym kierunku jest znikomy.
  • Komunikacja odbywa się za pośrednictwem lokalnych gniazd (cała komunikacja w ramach procesu roboczego poza początkowym połączeniem i uwierzytelnianiem odbywa się za pomocą zwróconego deskryptora plikulocal_connect_and_auth i jest niczym innym jak plikiem powiązanym z gniazdem ). Ponownie, tak tanie, jak to możliwe, jeśli chodzi o komunikację między procesami.
  • Biorąc pod uwagę różnicę w wydajności pierwotnej pokazaną powyżej (znacznie wyższą niż to, co widzisz w swoim programie), istnieje duży margines dla kosztów ogólnych wymienionych powyżej.
  • Ten przypadek różni się całkowicie od przypadków, w których proste lub złożone obiekty muszą być przekazywane do iz interpretera Pythona w formie, która jest dostępna dla obu stron jako zrzuty kompatybilne z zalewami (najbardziej znaczącymi przykładami są UDF w starym stylu, niektóre części starego MLLib).

Edycja 2 :

Ponieważ jasper-m był zaniepokojony kosztem uruchomienia tutaj, łatwo można udowodnić, że Python nadal ma znaczącą przewagę nad Scalą, nawet jeśli rozmiar wejściowy zostanie znacznie zwiększony.

Oto wyniki dla 2003360 linii / 5.6G (to samo wejście, po prostu zduplikowane wiele razy, 30 powtórzeń), który to sposób przekracza wszystko, czego można oczekiwać w jednym zadaniu Spark.

wprowadź opis zdjęcia tutaj

  • Python 22809.57 (21466.26, 24152.87)
  • Scala 27315.28 (24367.24, 30263.31)

Uwaga: nie nakładające się przedziały ufności.

Edycja 3 :

Aby odpowiedzieć na inny komentarz Jasper-M:

Większość przetwarzania nadal odbywa się w JVM w przypadku Spark.

Jest to po prostu niepoprawne w tym konkretnym przypadku:

  • Zadanie, o którym mowa, to zadanie mapy z pojedynczą globalną redukcją przy użyciu RDD PySpark.
  • PySpark RDD (inaczej niż powiedzmy DataFrame) implementuje ogromną funkcjonalność natywnie w Pythonie, z wyjątkami wejścia, wyjścia i komunikacji między węzłami.
  • Ponieważ jest to jednoetapowe zadanie, a końcowy wynik jest wystarczająco mały, aby go zignorować, głównym obowiązkiem JVM (jeśli ktoś ma nitpick, jest to implementowane głównie w Javie, a nie Scali) jest wywoływanie formatu wejściowego Hadoop i przepychanie danych przez gniazdo plik do Pythona.
  • Część do odczytu jest identyczna dla JVM i API Pythona, więc można ją uznać za stały narzut. Nie kwalifikuje się również jako większość przetwarzania , nawet w przypadku tak prostej pracy jak ta.
użytkownik10938362
źródło
3
doskonałe podejście do problemu. Dziękujemy za udostępnienie tego
Alexandros Biratsis,
1
@egordoe Alexandros powiedział „nie ma tu wywołania UDF”, nie „Python nie jest wywoływany” - to robi różnicę. Narzut związany z serializacją jest ważny tam, gdzie dane są wymieniane między systemami (tj. Gdy chcesz przekazać dane do UDF iz powrotem).
user10938362
1
@egordoe Wyraźnie mylisz dwie rzeczy - narzut serializacji, który jest problemem, gdy nietrywialne obiekty są przekazywane tam iz powrotem. I koszty komunikacji. Narzuty serializacji są niewielkie lub nie ma ich wcale, ponieważ po prostu przechodzisz i dekodujesz bajtowanie, a dzieje się to głównie w kierunku, ponieważ z powrotem dostajesz jedną liczbę całkowitą na partycję. Komunikacja budzi pewne obawy, ale przekazywanie danych przez lokalne gniazda jest wydajne, ponieważ naprawdę się dzieje, jeśli chodzi o komunikację między procesami. Jeśli nie jest to jasne, polecam przeczytać źródło - nie jest trudne i będzie pouczające.
user10938362
1
Ponadto metody serializacji nie są po prostu równe. Ponieważ przypadek Spark pokazuje, że dobre metody serializacji mogą obniżyć koszty do poziomu, na którym nie ma już znaczenia (patrz Pandas UDF ze strzałką), a kiedy to nastąpi, inne czynniki mogą dominować (patrz na przykład porównania wydajności funkcji okna Scala i ich odpowiedników z Pandami UDF - Python wygrywa tam znacznie wyższą marżą niż w tym pytaniu).
user10938362
1
Masz na myśli @ Jasper-M? Poszczególne zadania Spark są zwykle wystarczająco małe, aby obciążyć je porównywalnym obciążeniem. Nie zrozum mnie źle, ale jeśli masz jakiś kontrprzykład, który unieważnia to lub całe pytanie, opublikuj je. Zauważyłem już, że działania wtórne w pewnym stopniu przyczyniają się do tej wartości, ale nie dominują kosztów. Wszyscy jesteśmy inżynierami (pewnego rodzaju) tutaj - porozmawiajmy o liczbach i kodzie, a nie o przekonaniach, prawda?
user10938362
4

Zadanie Scala trwa dłużej, ponieważ ma błędną konfigurację, dlatego zadania Python i Scala zostały wyposażone w nierówne zasoby.

W kodzie są dwa błędy:

val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sc.hadoopConfiguration.set("spark.executor.instances", "4") // LINE #4
sc.hadoopConfiguration.set("spark.executor.cores", "8") // LINE #5
  1. LINIA 1. Po wykonaniu linii konfiguracja zasobów zadania Spark jest już ustalona i naprawiona. Od tego momentu nic nie można zmienić. Ani liczba wykonawców, ani liczba rdzeni na wykonawcę.
  2. LINIA 4-5. sc.hadoopConfigurationjest złym miejscem do ustawienia dowolnej konfiguracji Spark. Powinien być ustawiony w configinstancji, którą przekazujesz new SparkContext(config).

[DODANO] Mając powyższe na uwadze, proponuję zmienić kod zadania Scala na

config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

i ponownie go przetestuj. Założę się, że wersja Scali będzie teraz X razy szybsza.

egordoe
źródło
Zweryfikowałem, że oba zadania wykonują równolegle 32 zadania, więc nie sądzę, żeby to był winowajca?
maestromusica
dzięki za edycję, postaram się go teraz
przetestować
Cześć @maestromusica, musi to być coś w konfiguracji zasobów, ponieważ z natury rzeczy Python może nie przewyższyć Scali w tym konkretnym przypadku użycia. Innym powodem mogą być pewne nieskorelowane czynniki losowe, tj. Obciążenie klastra w danym momencie i podobne. Przy okazji, jakiego trybu używasz? samodzielny, lokalny, przędza?
egordoe
Tak, zweryfikowałem, że ta odpowiedź jest niepoprawna. Środowisko wykonawcze jest takie samo. Wydrukowałem również konfigurację w obu przypadkach i jest identyczna.
maestromusica
1
Myślę, że masz rację. Zadałem to pytanie, aby zbadać wszystkie inne możliwości, takie jak błąd w kodzie lub może to, że coś źle zrozumiałem. Dzięki za wkład.
maestromusica