Zakleszczenie, gdy wiele zadań iskrowych jest jednocześnie zaplanowanych

17

Używanie Spark 2.4.4 działającej w trybie klastra YARN z iskrowym harmonogramem FIFO.

Przesyłam wiele operacji ramki danych Spark (tj. Zapisuję dane do S3) przy użyciu modułu wykonującego pulę wątków o zmiennej liczbie wątków. Działa to dobrze, jeśli mam ~ 10 wątków, ale jeśli użyję setek wątków, wydaje się, że jest impas, a żadne zadania nie są planowane zgodnie z interfejsem Spark.

Jakie czynniki kontrolują, ile zadań można zaplanować jednocześnie? Zasoby sterownika (np. Pamięć / rdzenie)? Jakieś inne ustawienia konfiguracji iskier?

EDYTOWAĆ:

Oto krótkie streszczenie mojego kodu

ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);

Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);

List<Future<Void>> futures = listOfSeveralHundredThings
  .stream()
  .map(aThing -> ecs.submit(() -> {
    df
      .filter(col("some_column").equalTo(aThing))
      .write()
      .format("org.apache.hudi")
      .options(writeOptions)
      .save(outputPathFor(aThing));
    return null;
  }))
  .collect(Collectors.toList());

IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();

W pewnym momencie, wraz ze nThreadswzrostem, iskra nie wydaje się już planować żadnych zadań, o czym świadczą:

  • ecs.poll(...) upłynął limit czasu
  • Karta Zadania interfejsu użytkownika Spark pokazuje brak aktywnych zadań
  • Karta executorów interfejsu użytkownika Spark pokazująca brak aktywnych zadań dla dowolnego modułu wykonującego
  • Karta SQL interfejsu użytkownika Spark wyświetlająca nThreadsuruchomione zapytania bez uruchomionych identyfikatorów zadań

Moje środowisko wykonawcze to

  • AWS EMR 5.28.1
  • Spark 2.4.4
  • Węzeł główny = m5.4xlarge
  • Węzły rdzenia = 3x rd5.24xlarge
  • spark.driver.cores=24
  • spark.driver.memory=32g
  • spark.executor.memory=21g
  • spark.scheduler.mode=FIFO
Scott
źródło
Czy jest jakaś konkretna sekcja, która to omawia? Czytałem te dokumenty kilka razy w ciągu ostatnich kilku dni i nie znalazłem odpowiedzi, której szukam.
Scott
2
Czy możesz pokazać kod, którego używasz do przesyłania zadań Spark za pośrednictwem executora puli wątków? Wygląda na to, że impas ma miejsce przed przesłaniem zadania Spark.
Salim
1
Czy możesz opublikować swój kod? Podaj szczegółowe informacje na temat swojej env: procesor, pamięć RAM; także w jaki sposób tworzysz wątki: jednocześnie lub w małych grupach po 10 osób?
Saheed
Przepraszam, co masz na myśli, że prace nie są zaplanowane? Nie pojawiają się w interfejsie Spark lub pojawiają się na liście zadań, ale zadania się nie wykonują? Tak czy inaczej, jeśli podejrzewasz impas, biegnij, jstack -laby uzyskać zrzut wątku z informacjami o blokowaniu.
Daniel Darabos

Odpowiedzi:

0

Jeśli to możliwe, zapisz dane wyjściowe zadań do AWS Elastic MapReduce hdfs (aby wykorzystać niemal natychmiastowe zmiany nazw i lepsze IO plików lokalnych hdfs) i dodaj krok dstcp, aby przenieść pliki do S3, aby zaoszczędzić sobie wszelkich kłopotów z obsługą wnętrze magazynu obiektów próbującego być systemem plików. Również pisanie na lokalnych plikach hdfs umożliwi spekulacje, aby kontrolować niekontrolowane zadania bez wpadania w pułapki impasu związane z DirectOutputCommiter.

Jeśli musisz użyć S3 jako katalogu wyjściowego, upewnij się, że ustawione są następujące konfiguracje Spark

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false

Uwaga: DirectParquetOutputCommitter został usunięty ze Spark 2.0 ze względu na możliwość utraty danych. Niestety, dopóki nie poprawimy spójności z S3a, musimy pracować z obejściami. Dzięki Hadoop 2.8 wszystko się poprawia

Unikaj nazw kluczy w porządku leksykograficznym. Do poruszania się można użyć haszujących / losowych prefiksów lub odwrócić datę i godzinę. Sztuczka polega na hierarchicznym nazywaniu kluczy, umieszczaniu najczęstszych rzeczy po filtrach po lewej stronie klucza. I nigdy nie używaj podkreśleń w nazwach segmentów z powodu problemów z DNS.

Włączanie fs.s3a.fast.upload uploadczęści jednego pliku do Amazon S3 równolegle

Zapoznaj się z tymi artykułami, aby uzyskać więcej szczegółów

Ustawienie spark.speculation w Spark 2.1.0 podczas pisania do s3

https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98

Devesh mehta
źródło
AWS ma własny dokument docitteraws.ams.amazon.com/emr/latest/ReleaseGuide/…
mazaneicha
0

IMO prawdopodobnie źle podchodzisz do tego problemu. O ile nie możesz zagwarantować, że liczba zadań na zadanie jest bardzo niska, prawdopodobnie nie osiągniesz znacznej poprawy wydajności poprzez równoległe wykonanie setek zadań jednocześnie. Twój klaster może obsłużyć tylko 300 zadań jednocześnie, zakładając, że używasz domyślnej równoległości 200, czyli tylko 1,5 zadania. Sugeruję przepisanie kodu, aby ograniczyć maksymalną liczbę równoczesnych zapytań o wartości 10. Podejrzewam, że masz 300 zapytań z uruchomionym tylko pojedynczym zadaniem kilkuset. Z tego powodu większość systemów przetwarzania danych OLTP celowo ma dość niski poziom współbieżnych zapytań w porównaniu do bardziej tradycyjnych systemów RDS.

również

  1. Apache Hudi ma domyślną równoległość kilkuset FYI.
  2. Dlaczego po prostu nie partycjonujesz na podstawie kolumny filtra?
Andrew Long
źródło