Badam zachowanie Sparka, gdy dołączam do stołu do siebie. Używam Databricks.
Mój fikcyjny scenariusz to:
Odczytaj zewnętrzną tabelę jako ramkę danych A (pliki bazowe są w formacie delta)
Zdefiniuj ramkę danych B jako ramkę danych A z wybranymi tylko niektórymi kolumnami
Połącz ramki danych A i B w kolumnie 1 i kolumnie 2
(Tak, to nie ma większego sensu, po prostu eksperymentuję, aby zrozumieć mechanikę Sparka)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Moja pierwsza próba polegała na uruchomieniu kodu w obecnej postaci (próba 1). Następnie próbowałem partycjonować i buforować (próba 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
W końcu dokonałem podziału na partycje, posortowałem i zapisałem w pamięci podręcznej
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Odpowiednie wygenerowane dags są załączone.
Moje pytania to:
Dlaczego w próbie 1 tabela wydaje się buforowana, mimo że buforowanie nie zostało wyraźnie określone.
Dlaczego po InMemoreTableScan zawsze występuje inny węzeł tego typu.
Dlaczego w próbie 3 buforowanie wydaje się odbywać na dwóch etapach?
Dlaczego w próbie 3 WholeStageCodegen podąża za jednym (i tylko jednym) InMemoreTableScan.
Odpowiedzi:
To, co obserwujesz w tych 3 planach, to połączenie środowiska uruchomieniowego DataBricks i Spark.
Przede wszystkim podczas uruchamiania środowiska wykonawczego DataBricks 3.3+ buforowanie jest automatycznie włączane dla wszystkich plików parkietu. Odpowiednia konfiguracja do tego:
spark.databricks.io.cache.enabled true
W drugim zapytaniu InMemoryTableScan dzieje się dwa razy, ponieważ tuż po wywołaniu połączenia Spark próbował obliczyć zestaw danych A i zestaw danych B równolegle. Zakładając, że różnym wykonawcom przypisano powyższe zadania, oba będą musiały zeskanować tabelę z pamięci podręcznej (DataBricks).
Po trzecie, InMemoryTableScan nie odnosi się do buforowania jako takiego . Oznacza to po prostu, że niezależnie od tego, jaki utworzył się plan katalizatora, konieczne było wielokrotne skanowanie tabeli w pamięci podręcznej.
PS: Nie mogę sobie wyobrazić punktu 4 :)
źródło