Używanie Spark 2.4.4 działającej w trybie klastra YARN z iskrowym harmonogramem FIFO.
Przesyłam wiele operacji ramki danych Spark (tj. Zapisuję dane do S3) przy użyciu modułu wykonującego pulę wątków o zmiennej liczbie wątków. Działa to dobrze, jeśli mam ~ 10 wątków, ale jeśli użyję setek wątków, wydaje się, że jest impas, a żadne zadania nie są planowane zgodnie z interfejsem Spark.
Jakie czynniki kontrolują, ile zadań można zaplanować jednocześnie? Zasoby sterownika (np. Pamięć / rdzenie)? Jakieś inne ustawienia konfiguracji iskier?
EDYTOWAĆ:
Oto krótkie streszczenie mojego kodu
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);
List<Future<Void>> futures = listOfSeveralHundredThings
.stream()
.map(aThing -> ecs.submit(() -> {
df
.filter(col("some_column").equalTo(aThing))
.write()
.format("org.apache.hudi")
.options(writeOptions)
.save(outputPathFor(aThing));
return null;
}))
.collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();
W pewnym momencie, wraz ze nThreads
wzrostem, iskra nie wydaje się już planować żadnych zadań, o czym świadczą:
ecs.poll(...)
upłynął limit czasu- Karta Zadania interfejsu użytkownika Spark pokazuje brak aktywnych zadań
- Karta executorów interfejsu użytkownika Spark pokazująca brak aktywnych zadań dla dowolnego modułu wykonującego
- Karta SQL interfejsu użytkownika Spark wyświetlająca
nThreads
uruchomione zapytania bez uruchomionych identyfikatorów zadań
Moje środowisko wykonawcze to
- AWS EMR 5.28.1
- Spark 2.4.4
- Węzeł główny =
m5.4xlarge
- Węzły rdzenia = 3x
rd5.24xlarge
spark.driver.cores=24
spark.driver.memory=32g
spark.executor.memory=21g
spark.scheduler.mode=FIFO
apache-spark
Scott
źródło
źródło
jstack -l
aby uzyskać zrzut wątku z informacjami o blokowaniu.Odpowiedzi:
Jeśli to możliwe, zapisz dane wyjściowe zadań do AWS Elastic MapReduce hdfs (aby wykorzystać niemal natychmiastowe zmiany nazw i lepsze IO plików lokalnych hdfs) i dodaj krok dstcp, aby przenieść pliki do S3, aby zaoszczędzić sobie wszelkich kłopotów z obsługą wnętrze magazynu obiektów próbującego być systemem plików. Również pisanie na lokalnych plikach hdfs umożliwi spekulacje, aby kontrolować niekontrolowane zadania bez wpadania w pułapki impasu związane z DirectOutputCommiter.
Jeśli musisz użyć S3 jako katalogu wyjściowego, upewnij się, że ustawione są następujące konfiguracje Spark
Uwaga: DirectParquetOutputCommitter został usunięty ze Spark 2.0 ze względu na możliwość utraty danych. Niestety, dopóki nie poprawimy spójności z S3a, musimy pracować z obejściami. Dzięki Hadoop 2.8 wszystko się poprawia
Unikaj nazw kluczy w porządku leksykograficznym. Do poruszania się można użyć haszujących / losowych prefiksów lub odwrócić datę i godzinę. Sztuczka polega na hierarchicznym nazywaniu kluczy, umieszczaniu najczęstszych rzeczy po filtrach po lewej stronie klucza. I nigdy nie używaj podkreśleń w nazwach segmentów z powodu problemów z DNS.
Włączanie
fs.s3a.fast.upload upload
części jednego pliku do Amazon S3 równolegleZapoznaj się z tymi artykułami, aby uzyskać więcej szczegółów
Ustawienie spark.speculation w Spark 2.1.0 podczas pisania do s3
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
źródło
IMO prawdopodobnie źle podchodzisz do tego problemu. O ile nie możesz zagwarantować, że liczba zadań na zadanie jest bardzo niska, prawdopodobnie nie osiągniesz znacznej poprawy wydajności poprzez równoległe wykonanie setek zadań jednocześnie. Twój klaster może obsłużyć tylko 300 zadań jednocześnie, zakładając, że używasz domyślnej równoległości 200, czyli tylko 1,5 zadania. Sugeruję przepisanie kodu, aby ograniczyć maksymalną liczbę równoczesnych zapytań o wartości 10. Podejrzewam, że masz 300 zapytań z uruchomionym tylko pojedynczym zadaniem kilkuset. Z tego powodu większość systemów przetwarzania danych OLTP celowo ma dość niski poziom współbieżnych zapytań w porównaniu do bardziej tradycyjnych systemów RDS.
również
źródło