Apache Spark: wpływ repartycjonowania, sortowania i buforowania na złączenie

10

Badam zachowanie Sparka, gdy dołączam do stołu do siebie. Używam Databricks.

Mój fikcyjny scenariusz to:

  1. Odczytaj zewnętrzną tabelę jako ramkę danych A (pliki bazowe są w formacie delta)

  2. Zdefiniuj ramkę danych B jako ramkę danych A z wybranymi tylko niektórymi kolumnami

  3. Połącz ramki danych A i B w kolumnie 1 i kolumnie 2

(Tak, to nie ma większego sensu, po prostu eksperymentuję, aby zrozumieć mechanikę Sparka)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])

Moja pierwsza próba polegała na uruchomieniu kodu w obecnej postaci (próba 1). Następnie próbowałem partycjonować i buforować (próba 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()

W końcu dokonałem podziału na partycje, posortowałem i zapisałem w pamięci podręcznej

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

Odpowiednie wygenerowane dags są załączone.

Moje pytania to:

  1. Dlaczego w próbie 1 tabela wydaje się buforowana, mimo że buforowanie nie zostało wyraźnie określone.

  2. Dlaczego po InMemoreTableScan zawsze występuje inny węzeł tego typu.

  3. Dlaczego w próbie 3 buforowanie wydaje się odbywać na dwóch etapach?

  4. Dlaczego w próbie 3 WholeStageCodegen podąża za jednym (i tylko jednym) InMemoreTableScan.

próba 1

próba 2

wprowadź opis zdjęcia tutaj

Dawid
źródło
Podejrzewam, że czytnik DataFrame automatycznie buforuje dane, gdy źródłem jest tabela zewnętrzna. Mam podobną sytuację, gdy czytam dane z tabeli bazy danych, podczas gdy można pobrać kartę „SQL” w interfejsie użytkownika „Szczegóły aplikacji” pokazuje liczbę pobieranych wierszy, ale żaden plik nie został jeszcze zapisany w określonej lokalizacji . Myślę, że zna liczbę, ponieważ gdzieś buforuje dane i to właśnie pojawia się w DAG. Jeśli odczytujesz dane z pliku tekstowego lokalnie, nie zobaczysz stanu pamięci podręcznej.
Salim

Odpowiedzi:

4

To, co obserwujesz w tych 3 planach, to połączenie środowiska uruchomieniowego DataBricks i Spark.

Przede wszystkim podczas uruchamiania środowiska wykonawczego DataBricks 3.3+ buforowanie jest automatycznie włączane dla wszystkich plików parkietu. Odpowiednia konfiguracja do tego: spark.databricks.io.cache.enabled true

W drugim zapytaniu InMemoryTableScan dzieje się dwa razy, ponieważ tuż po wywołaniu połączenia Spark próbował obliczyć zestaw danych A i zestaw danych B równolegle. Zakładając, że różnym wykonawcom przypisano powyższe zadania, oba będą musiały zeskanować tabelę z pamięci podręcznej (DataBricks).

Po trzecie, InMemoryTableScan nie odnosi się do buforowania jako takiego . Oznacza to po prostu, że niezależnie od tego, jaki utworzył się plan katalizatora, konieczne było wielokrotne skanowanie tabeli w pamięci podręcznej.

PS: Nie mogę sobie wyobrazić punktu 4 :)

Ashvjit Singh
źródło