Jak etapy są dzielone na zadania w Spark?

143

Załóżmy, że w każdym momencie działa tylko jedno zadanie Spark.

Co mam do tej pory

Oto, co rozumiem, co dzieje się w Spark:

  1. Po SparkContextutworzeniu węzła roboczego każdy węzeł roboczy uruchamia moduł wykonawczy. Executory to oddzielne procesy (JVM), które łączą się z powrotem z programem sterownika. Każdy executor ma słoik z programem sterownika. Zamknięcie sterownika powoduje wyłączenie wykonawców. Każdy executor może posiadać kilka partycji.
  2. Po wykonaniu zadania tworzony jest plan wykonania zgodnie z wykresem rodowodowym.
  3. Zadanie wykonania jest podzielone na etapy, w których etapy zawierają tyle samo sąsiednich (na wykresie rodowodowym) transformacji i akcji, ale bez tasowań. Tak więc etapy są oddzielone tasowaniem.

zdjęcie 1

Rozumiem, że

  • Zadanie to polecenie wysyłane ze sterownika do modułu wykonawczego przez serializację obiektu Function.
  • Wykonawca deserializuje (za pomocą pliku jar sterownika) polecenie (zadanie) i wykonuje je na partycji.

ale

Pytania)

Jak podzielić etap na te zadania?

Konkretnie:

  1. Czy zadania są zdeterminowane przez transformacje i działania, czy też może to być wiele przekształceń / działań w zadaniu?
  2. Czy zadania są określane przez partycję (np. Jedno zadanie na etap na partycję).
  3. Czy zadania są określane przez węzły (np. Jedno zadanie na etap na węzeł)?

Co myślę (tylko częściowa odpowiedź, nawet jeśli słuszna)

W https://0x0fff.com/spark-architecture-shuffle tasowanie jest wyjaśnione za pomocą obrazu

wprowadź opis obrazu tutaj

i mam wrażenie, że taka jest zasada

każdy etap jest podzielony na # zadania związane z liczbą partycji, bez względu na liczbę węzłów

W przypadku mojego pierwszego obrazu powiedziałbym, że mam 3 zadania mapy i 3 zadania redukcji.

W przypadku obrazu z 0x0fff powiedziałbym, że jest 8 zadań mapy i 3 zadania redukcji (zakładając, że są tylko trzy pomarańczowe i trzy ciemnozielone pliki).

W każdym razie pytania otwarte

Czy to jest poprawne? Ale nawet jeśli to prawda, nie na wszystkie moje pytania powyżej udzielono odpowiedzi, ponieważ nadal pozostaje otwarte, czy wiele operacji (np. Wiele map) znajduje się w ramach jednego zadania, czy też są one podzielone na jedno zadanie na operację.

Co mówią inni

Co to jest zadanie w Spark? W jaki sposób pracownik Spark wykonuje plik jar? i jak program planujący Apache Spark dzieli pliki na zadania? są podobne, ale nie czułem, że na moje pytanie udzielono jasnej odpowiedzi.

Make42
źródło

Odpowiedzi:

52

Masz tutaj całkiem niezły zarys. Aby odpowiedzieć na Twoje pytania

  • Odrębną task nie musi być uruchomiona dla każdej partycji danych dla każdego stage. Należy wziąć pod uwagę, że każda partycja prawdopodobnie będzie znajdować się w różnych lokalizacjach fizycznych - np. W blokach HDFS lub katalogach / woluminach lokalnego systemu plików.

Należy zauważyć, że przesłanie Stages jest sterowane przez DAG Scheduler. Oznacza to, że etapy, które nie są współzależne, można przesłać do klastra w celu wykonania równolegle: maksymalizuje to możliwości zrównoleglania w klastrze. Jeśli więc operacje w naszym przepływie danych mogą odbywać się jednocześnie, spodziewamy się uruchomienia wielu etapów.

Widzimy to w akcji na poniższym przykładzie zabawki, w którym wykonujemy następujące typy operacji:

  • załaduj dwa źródła danych
  • wykonać pewne operacje na mapach na obu źródłach danych oddzielnie
  • Dołącz do nich
  • wykonać pewne operacje na mapie i filtrowaniu wyniku
  • zapisz wynik

Więc z iloma etapami skończymy?

  • Po 1 etapie do równoległego ładowania dwóch źródeł danych = 2 etapy
  • Trzeci etap reprezentujący jointo jest zależny od pozostałych dwóch etapów
  • Uwaga: wszystkie kolejne operacje działające na połączonych danych mogą być wykonywane na tym samym etapie, ponieważ muszą być wykonywane sekwencyjnie. Nie ma korzyści z uruchomienia dodatkowych etapów, ponieważ nie mogą one rozpocząć pracy przed zakończeniem poprzedniej operacji.

Oto ten program zabawek

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

A oto DAG wyniku

wprowadź opis obrazu tutaj

Teraz: ile zadań ? Liczba zadań powinna być równa

Suma ( Stage* #Partitions in the stage)

javadba
źródło
2
Dzięki! Proszę o rozwinięcie odpowiedzi w odniesieniu do mojego tekstu: 1) Czy moja definicja etapów nie jest wyczerpująca? Wygląda na to, że przegapiłem wymóg, zgodnie z którym etap nie może zawierać operacji, które mogłyby być równoległe. A może mój opis już to ściśle sugeruje? 2) O liczbie zadań do wykonania dla zadania decyduje liczba partycji, ale nie liczba procesorów czy węzłów, natomiast liczba zadań, które można wykonać w tym samym czasie, zależy od liczby procesory, prawda? 3) Zadanie może zawierać wiele operacji?
Make42
1
4) Co miałeś na myśli mówiąc o swoim ostatnim zdaniu? W końcu partycje liczbowe mogą się różnić w zależności od etapu. Czy chodziło Ci o to, że tak skonfigurowałeś swoją pracę na wszystkich etapach?
Make42
@ Make42 Oczywiście liczba partycji może się różnić w zależności od etapu - masz rację. Chciałem powiedzieć, żebym sum(..)wziął pod uwagę tę zmienność.
javadba
wow, twoja odpowiedź była całkowicie w porządku, ale niestety ostatnie zdanie jest zdecydowanie błędną koncepcją. Nie oznacza to, że liczba partycji na etapie jest równa liczbie procesorów, jednak można ustawić liczbę partycji dla RDD zgodnie z liczbą rdzeni wyświetlanych na komputerze.
epcpu,
@epcpu To był szczególny przypadek - ale zgadzam się, że byłoby to mylące, więc go usuwam.
javadba
26

To może pomóc ci lepiej zrozumieć różne elementy:

  • Etap: to zbiór zadań. Ten sam proces działa na różnych podzbiorach danych (partycjach).
  • Zadanie: reprezentuje jednostkę pracy na partycji rozproszonego zestawu danych. Zatem na każdym etapie liczba zadań = liczba partycji, czyli jak powiedziałeś „jedno zadanie na etap na partycję”.
  • Każdy plik wykonawczy działa na jednym kontenerze przędzy, a każdy kontener znajduje się w jednym węźle.
  • Każdy etap wykorzystuje wiele modułów wykonawczych, każdemu z nich jest przydzielonych wiele wirtualnych rdzeni.
  • Każdy plik vcore może wykonywać jednocześnie dokładnie jedno zadanie
  • Tak więc na dowolnym etapie można było wykonywać wiele zadań równolegle. liczba uruchomionych zadań = liczba używanych rdzeni wirtualnych.
pedram bashiri
źródło
2
To jest naprawdę przydatna lektura o architekturze iskry: 0x0fff.com/spark-architecture
pedram bashiri.
Nie dostałem twojego punktu numer 3. O ile wiem, każdy węzeł może mieć wiele wykonawców, więc zgodnie z punktem 3: W każdym węźle powinien być tylko jeden moduł wykonawczy. Czy możesz wyjaśnić tę kwestię?
Rituparno Behera
@RituparnoBehera każdy węzeł może mieć wiele kontenerów, a tym samym wiele modułów wykonawczych Spark. Sprawdź ten link. docs.cloudera.com/runtime/7.0.2/running-spark-applications/ ...
pedram bashiri
15

Jeśli dobrze rozumiem, są 2 (powiązane) rzeczy, które Cię mylą:

1) Od czego zależy treść zadania?

2) Od czego zależy liczba zadań do wykonania?

Silnik Sparka „skleja” proste operacje na kolejnych rdds, na przykład:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

więc kiedy rdd3 jest (leniwie) obliczane, Spark wygeneruje zadanie na partycję rdd1 i każde zadanie wykona zarówno filtr, jak i mapę na wiersz, aby uzyskać rdd3.

Liczba zadań zależy od liczby partycji. Każdy RDD ma określoną liczbę partycji. W przypadku źródłowego RDD odczytywanego z HDFS (na przykład za pomocą sc.textFile (...)) liczba partycji to liczba podziałów wygenerowanych przez format wejściowy. Niektóre operacje na RDD mogą skutkować RDD z inną liczbą partycji:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Innym przykładem są złączenia:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

(Większość) operacji zmieniających liczbę partycji wiąże się z tasowaniem, kiedy robimy na przykład:

rdd2 = rdd1.repartition( 1000 ) 

co właściwie się dzieje, to zadanie na każdej partycji rdd1 musi wygenerować końcowe wyjście, które może zostać odczytane przez następny etap, tak aby rdd2 miał dokładnie 1000 partycji (jak oni to robią? Hash lub Sort ). Zadania po tej stronie są czasami nazywane „Zadaniami mapy (pobocznymi)”. Zadanie, które zostanie później uruchomione na rdd2, będzie działało na jednej partycji (rdd2!) I będzie musiało wymyślić, jak odczytać / połączyć wyjścia po stronie mapy dotyczące tej partycji. Zadania po tej stronie są czasami nazywane „Zmniejsz liczbę zadań (pobocznych)”.

Dwa pytania są ze sobą powiązane: liczba zadań na etapie to liczba przegród (wspólna dla kolejnych „sklejonych” ze sobą), a liczba przegród w drugim etapie może zmieniać się między etapami (przez określenie liczby przegród na kilka na przykład tasowanie powodujące działanie).

Po rozpoczęciu wykonywania etapu jego zadania mogą zajmować miejsca na zadania. Liczba równoczesnych przedziałów zadań wynosi numExecutors * ExecutorCores. Na ogół mogą to być zadania z różnych, niezależnych od siebie etapów.

Harel Gliksman
źródło