Apache Spark: liczba rdzeni w porównaniu z liczbą wykonawców

194

Próbuję zrozumieć związek między liczbą rdzeni i liczbą wykonawców podczas uruchamiania zadania Spark w YARN.

Środowisko testowe wygląda następująco:

  • Liczba węzłów danych: 3
  • Specyfikacja maszyny węzła danych:
    • Procesor: Core i7-4790 (liczba rdzeni: 4, liczba wątków: 8)
    • RAM: 32 GB (8 GB x 4)
    • HDD: 8 TB (2 TB x 4)
  • Sieć: 1 Gb

  • Wersja Spark: 1.0.0

  • Wersja Hadoop: 2.4.0 (Hortonworks HDP 2.1)

  • Przepływ pracy Spark: sc.textFile -> filtr -> mapa -> filtr -> mapToPair -> zmniejszByKey -> mapa -> saveAsTextFile

  • Dane wejściowe

    • Wpisz: pojedynczy plik tekstowy
    • Rozmiar: 165 GB
    • Liczba linii: 454,568,833
  • Wynik

    • Liczba linii po drugim filtrze: 310,640,717
    • Liczba linii pliku wynikowego: 99,848,268
    • Rozmiar pliku wynikowego: 41 GB

Zadanie zostało uruchomione z następującymi konfiguracjami:

  1. --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (executory na węzeł danych, użyj tyle co rdzenie)

  2. --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (liczba rdzeni zredukowana)

  3. --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (mniej rdzenia, więcej executora)

Czasy, które upłynęły:

  1. 50 min 15 sek

  2. 55 min 48 sek

  3. 31 min 23 sek

Ku mojemu zaskoczeniu (3) było znacznie szybsze.
Myślałem, że (1) będzie szybszy, ponieważ podczas tasowania komunikacja między wykonawcami będzie mniejsza.
Chociaż liczba rdzeni z (1) jest mniejsza niż (3), liczba rdzeni nie jest kluczowym czynnikiem, ponieważ 2) działała dobrze.

(Dodano odpowiedzi po odpowiedzi pwilmot.)

Aby uzyskać informacje, zrzut ekranu monitora wydajności wygląda następująco:

  • Podsumowanie węzła danych Ganglia dla (1) - zadanie rozpoczęto o 04:37.

Podsumowanie węzła danych Ganglia dla (1)

  • Podsumowanie węzła danych Ganglia dla (3) - zadanie rozpoczęto o 19:47. Zignoruj ​​wykres przed tym czasem.

Podsumowanie węzła danych Ganglia dla (3)

Wykres z grubsza dzieli się na 2 sekcje:

  • Po pierwsze: od początku do zmniejszenia Klucz: intensywny procesor, brak aktywności sieciowej
  • Po drugie: po zmniejszeniu Klucz: obniżenie procesora, sieciowe operacje we / wy są gotowe.

Jak pokazuje wykres, (1) może zużywać tyle mocy procesora, ile zostało podane. Może to nie być problem z liczbą wątków.

Jak wytłumaczyć ten wynik?

zeodtr
źródło
2
Teraz podejrzewam, że GC ... W rzeczywistości w Spark UI całkowity czas spędzony na GC jest dłuższy na 1) niż 2).
zeodtr
Dlaczego nie próbowałeś 3) z 19G? Czy to możliwe, że ograniczenie robotników do 4G zmniejsza efekt NUMA, który zauważają niektórzy ppl? tzn. twoje 4G są umieszczone na jednym z 2 rdzeni przypisanych do twojego przepływu pracy, a zatem występuje mniejsze spowolnienie we / wy, co prowadzi do lepszej ogólnej wydajności. W przeciwnym razie myślę, że głównym pytaniem jest: ile rdzeni / wątków może używać jednego modułu wykonującego na pracowniku? (Można podać tylko całkowitą liczbę rdzeni dla pracownika, a nie szczegółowość modułu wykonującego)
Bacon
4
Btw właśnie sprawdziłem kod w core / src / main / scala / org / apache / spark / deploy / worker / ExecutorRunner.scala i wydaje się, że 1 executor = 1 wątek pracownika.
Bacon
trochę późno, ale tutaj jest post na cloudera na ten temat: blog.cloudera.com/blog/2015/03/…
Orelus,
1
Nawiasem mówiąc, znalazłem tę informację na slajdzie cloudera slideshare.net/cloudera/... , która wyjaśnia trochę o podejmowaniu decyzji w wykonawcach, rdzeniach i pamięci
Manish Sahni

Odpowiedzi:

58

Aby, mam nadzieję, uczynić to wszystko nieco bardziej konkretnym, oto sprawdzony przykład konfigurowania aplikacji Spark do korzystania z jak największej ilości klastra: Wyobraź sobie klaster z sześcioma węzłami z uruchomionymi NodeManagerami, każdy wyposażony w 16 rdzeni i 64 GB pamięci . Wydajności NodeManager, yarn.nodemanager.resource.memory-mb i yarn.nodemanager.resource.cpu-vcores, prawdopodobnie powinny być ustawione odpowiednio na 63 * 1024 = 64512 (megabajty) i 15. Unikamy przydzielania 100% zasobów do kontenerów YARN, ponieważ węzeł potrzebuje zasobów do uruchomienia systemu operacyjnego i demonów Hadoop. W takim przypadku pozostawiamy gigabajt i rdzeń dla tych procesów systemowych. Cloudera Manager pomaga, rozliczając je i automatycznie konfigurując te właściwości YARN.

Najprawdopodobniej pierwszym impulsem byłoby użycie --num-executors 6 --executor-core 15 --executor-memory 63G . Jest to jednak niewłaściwe podejście, ponieważ:

63 GB + obciążenie pamięci modułu wykonującego nie mieści się w pojemności 63 GB NodeManagers. Mistrz aplikacji zajmie rdzeń na jednym z węzłów, co oznacza, że ​​nie będzie miejsca dla 15-rdzeniowego modułu wykonującego w tym węźle. 15 rdzeni na moduł wykonujący może prowadzić do złej przepustowości we / wy HDFS.

Lepszą opcją byłoby użycie --num-executors 17 - core-executor 5 - executor-memory 19G . Czemu?

Ta konfiguracja skutkuje trzema programami wykonawczymi na wszystkich węzłach z wyjątkiem tego z AM, który będzie miał dwa programy wykonawcze. - executor-memory wyprowadzono jako (63/3 executorów na węzeł) = 21,21 * 0,07 = 1,47. 21 - 1,47 ~ 19.

Wyjaśnienie zostało podane w artykule na blogu Cloudera, How-to: Tune Your Apache Spark Jobs (Part 2) .

DzOrdre
źródło
1
„Ta konfiguracja skutkuje trzema programami wykonawczymi na wszystkich węzłach z wyjątkiem tego z AM, który będzie miał dwa programy wykonawcze.”. Co to oznacza w odniesieniu do „--executor-cores 5”?
derek
Oznacza to, że każdy moduł wykonujący używa 5 rdzeni. Każdy węzeł ma 3 moduły wykonawcze, dlatego używa 15 rdzeni, z wyjątkiem tego, że jeden z węzłów będzie również uruchamiał wzorzec aplikacji dla zadania, więc może obsługiwać tylko 2 moduły wykonawcze, tj. 10 rdzeni używanych jako moduły wykonawcze.
Davos
Dobrze wyjaśnione - należy pamiętać, że dotyczy to opcji yarn.scheduler.capacity.resource-calculatorwyłączonej, która jest domyślna. Dzieje się tak, ponieważ domyślnie programuje według pamięci, a nie procesora.
YoYo
1
Więcej programów wykonawczych może prowadzić do złej przepustowości we / wy HDFS. Więc jeśli w ogóle nie używam HDFS, czy w takim przypadku mogę użyć więcej niż 5 rdzeni na moduł wykonujący?
Darshan
Myślałem, że wzorzec aplikacji działa na każdym węźle. Powyżej, co oznacza, że ​​będzie tylko 1 wzorzec aplikacji do uruchomienia zadania. Czy to jest poprawne?
Roshan Fernando
15

Według Sandy Ryza , gdy uruchamiasz swoją aplikację Spark na HDFS

Zauważyłem, że klient HDFS ma problem z mnóstwem jednoczesnych wątków. Z grubsza przypuszcza się, że maksymalnie pięć zadań na jednego executora może osiągnąć pełną przepustowość zapisu, więc dobrze jest utrzymać liczbę rdzeni na executorze poniżej tej liczby.

Uważam więc, że twoja pierwsza konfiguracja jest wolniejsza niż trzecia z powodu złej przepustowości we / wy HDFS

tgbaggio
źródło
11

Nie grałem sam z tymi ustawieniami, więc to tylko spekulacje, ale jeśli myślimy o tym problemie jako o normalnych rdzeniach i wątkach w systemie rozproszonym, to w klastrze możesz użyć do 12 rdzeni (4 * 3 maszyn) i 24 wątków (8 * 3 maszyny). W pierwszych dwóch przykładach podajesz zadaniu sporo rdzeni (potencjalna przestrzeń obliczeniowa), ale liczba wątków (zadań) do uruchomienia na tych rdzeniach jest tak ograniczona, że ​​nie jesteś w stanie zużyć dużej ilości przydzielonej mocy obliczeniowej dlatego zadanie jest wolniejsze, mimo że przydzielono więcej zasobów obliczeniowych.

wspominasz, że twoja troska była na etapie tasowania - chociaż miło jest ograniczyć obciążenie ogólne na etapie tasowania, o wiele ważniejsze jest wykorzystanie równoległości klastra. Pomyśl o ekstremalnym przypadku - pojedynczym wątku z zerowym tasowaniem.

pwilmot
źródło
Dziękuję za odpowiedź. Ale podejrzewam, że liczba wątków nie jest głównym problemem. Dodałem zrzut ekranu monitorowania. Jak pokazuje wykres, 1) może zużywać tyle mocy procesora, ile zostało podane.
zeodtr
1
@zeodtr pwilmot jest poprawny - potrzebujesz 2-4 zadań MINIMUM, aby wykorzystać pełny potencjał swoich rdzeni. Ujmując to tak - zwykle używam co najmniej 1000 partycji dla mojego 80-rdzeniowego klastra.
samthebest
@samthebest To, co chcę wiedzieć, jest przyczyną różnicy wydajności między 1) a 3). Kiedy oglądam interfejs Spark, oba uruchamiają równolegle 21 zadań w sekcji 2. (dlaczego 21 zamiast 24 w przypadku 3) jest na razie nieznane). Jednak zadania dla 3) działają po prostu szybciej.
zeodtr
10

Krótka odpowiedź : myślę, że tgbaggio ma rację. Osiągnąłeś limity przepustowości HDFS na swoich programach wykonawczych.

Myślę, że odpowiedź tutaj może być nieco prostsza niż niektóre zalecenia tutaj.

Wskazówką jest dla mnie wykres sieci klastrów. Dla przebiegu 1 wykorzystanie jest stałe przy ~ 50 M bajtów / s. Dla przebiegu 3 stałe wykorzystanie jest podwojone, około 100 M bajtów / s.

Z postu na blogu cloudera udostępnionego przez DzOrd można zobaczyć ten ważny cytat:

Zauważyłem, że klient HDFS ma problem z mnóstwem jednoczesnych wątków. Z grubsza przypuszcza się, że maksymalnie pięć zadań na jednego executora może osiągnąć pełną przepustowość zapisu, więc dobrze jest utrzymać liczbę rdzeni na executorze poniżej tej liczby.

Zróbmy więc kilka obliczeń, aby zobaczyć, jakiej wydajności oczekujemy, jeśli to prawda.


Uruchom 1:19 GB, 7 rdzeni, 3 moduły wykonawcze

  • 3 executory x 7 wątków = 21 wątków
  • z 7 rdzeniami na moduł wykonujący, spodziewamy się ograniczonego wejścia / wyjścia do HDFS (maksimum przy ~ 5 rdzeniach)
  • efektywna przepustowość ~ = 3 executory x 5 wątków = 15 wątków

Uruchom 3: 4 GB, 2 rdzenie, 12 modułów wykonawczych

  • 2 executory x 12 wątków = 24 wątki
  • 2 rdzenie na executora, więc przepustowość hdfs jest w porządku
  • efektywna przepustowość ~ = 12 modułów wykonawczych x 2 wątki = 24 wątki

Jeśli zadanie jest w 100% ograniczone współbieżnością (liczbą wątków). Spodziewalibyśmy się, że środowisko wykonawcze będzie doskonale odwrotnie skorelowane z liczbą wątków.

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

Tak ratio_num_threads ~= inv_ratio_runtime, i wygląda na to, jesteśmy Network Limited.

Ten sam efekt wyjaśnia różnicę między Run 1 a Run 2.


Uruchom 2:19 GB, 4 rdzenie, 3 executory

  • 3 executory x 4 wątki = 12 wątków
  • z 4 rdzeniami na moduł wykonujący, ok IO do HDFS
  • efektywna przepustowość ~ = 3 executory x 4 wątki = 12 wątków

Porównanie liczby efektywnych wątków i środowiska wykonawczego:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91

To nie jest tak doskonałe jak poprzednie porównanie, ale nadal widzimy podobny spadek wydajności, gdy tracimy wątki.

Teraz ostatni raz: dlaczego tak jest, że uzyskujemy lepszą wydajność przy większej liczbie wątków, szczególnie. więcej wątków niż liczba procesorów?

Dobre wyjaśnienie różnicy między paralelizmem (tym, co otrzymujemy dzieląc dane na wiele procesorów) a współbieżnością (tym, co otrzymujemy, gdy używamy wielu wątków do pracy na jednym procesorze) jest zawarte w tym świetnym poście Rob Pike: Concurrency nie jest paralelizmem .

Krótkie wyjaśnienie jest takie, że jeśli zadanie Spark wchodzi w interakcję z systemem plików lub siecią, procesor spędza dużo czasu czekając na komunikację z tymi interfejsami i nie spędzając dużo czasu na „pracy”. Przydzielając tym procesorom więcej niż 1 zadanie do pracy na raz, spędzają mniej czasu na czekaniu, a więcej na pracy, a Ty widzisz lepszą wydajność.

turtlemonvh
źródło
1
Ciekawe i przekonujące wyjaśnienie. Zastanawiam się, jak zgadłeś, że executor ma 5 zadań do osiągnięcia maksymalnej przepustowości.
Dat Nguyen,
Więc numer 5 nie jest czymś, co wymyśliłem: właśnie zauważyłem oznaki wąskiego gardła we IO i poszedłem szukać skąd te wąskie gardła.
turtlemonvh
8

Z doskonałych zasobów dostępnych na stronie pakietu Sparklyr RStudio :

DEFINICJE ŚWIECI :

Przydatne może być podanie kilku prostych definicji dla nomenklatury Spark:

Węzeł : serwer

Węzeł roboczy : serwer, który jest częścią klastra i jest dostępny do uruchamiania zadań Spark

Węzeł główny : serwer, który koordynuje węzły robocze.

Executor : Rodzaj wirtualnej maszyny w węźle. Jeden węzeł może mieć wielu wykonawców.

Węzeł sterownika : węzeł, który inicjuje sesję Spark. Zazwyczaj będzie to serwer, na którym znajduje się Sparklyr.

Driver (Executor) : Węzeł Driver pojawi się również na liście Executor.

d8aninja
źródło
1

Spark Dynamiczny przydział zapewnia elastyczność i dynamicznie przydziela zasoby. W tej liczbie minimalnych i maksymalnych wykonawców można podać. Można również podać liczbę programów wykonawczych, które należy uruchomić na początku aplikacji.

Przeczytaj poniżej o tym samym:

http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

Harikrishnan Ck
źródło
1

Myślę, że w pierwszych dwóch konfiguracjach jest mały problem. Pojęcia dotyczące wątków i rdzeni są następujące. Koncepcja wątków polega na tym, że jeśli rdzenie są idealne, użyj tego rdzenia do przetworzenia danych. Tak więc pamięć nie jest w pełni wykorzystana w pierwszych dwóch przypadkach. Jeśli chcesz porównać ten przykład, wybierz maszyny, które mają więcej niż 10 rdzeni na każdej maszynie. Następnie zrób ślad.

Ale nie dawaj więcej niż 5 rdzeni na executora, to będzie szyjka butelki na wydajności we / wy.

Tak więc najlepszymi maszynami do wykonania tego oznaczenia mogą być węzły danych, które mają 10 rdzeni.

Specyfikacja maszyny węzła danych: CPU: Core i7-4790 (liczba rdzeni: 10, liczba wątków: 20) RAM: 32 GB (8 GB x 4) HDD: 8 TB (2 TB x 4)

samotna gwiazda
źródło
0

Myślę, że jednym z głównych powodów jest lokalizacja. Rozmiar pliku wejściowego to 165G, bloki pokrewne pliku z pewnością są rozproszone w wielu węzłach DataNodes, więcej wykonawców może uniknąć kopiowania sieciowego.

Spróbuj ustawić liczbę wykonawców równą liczbę bloków, myślę, że może być szybszy.

zwb
źródło