Mam ramkę danych z następującym kodem:
def test(lat: Double, lon: Double) = {
println(s"testing ${lat / lon}")
Map("one" -> "one", "two" -> "two")
}
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
Teraz sprawdzając dzienniki, dowiedziałem się, że dla każdego wiersza UDF jest wykonywany 3 razy. Jeśli dodam „test3” z kolumny „test.three”, UDF zostanie ponownie wykonany.
Czy ktoś może mi wyjaśnić, dlaczego?
Czy można tego właściwie uniknąć (bez buforowania ramki danych po dodaniu „testu”, nawet jeśli to działa)?
scala
apache-spark
apache-spark-sql
Rolintocour
źródło
źródło
Map
a nie Struct. Teraz zamiast zwracać Mapę, jeśli UDF zwraca klasę sprawy, taką jak Test (jeden Łańcuch, dwa: Łańcuch), wówczastest
rzeczywiście jest Struktem, ale zawsze jest tyle wykonań UDF.Odpowiedzi:
Jeśli chcesz uniknąć wielu połączeń z udf (co jest przydatne, zwłaszcza jeśli udf jest wąskim gardłem w twojej pracy), możesz to zrobić w następujący sposób:
Zasadniczo mówisz Sparkowi, że twoja funkcja nie jest deterministyczna, a teraz Spark upewnia się, że jest wywoływana tylko raz, ponieważ wielokrotne wywoływanie jej nie jest bezpieczne (każde wywołanie może zwrócić inny wynik).
Należy również pamiętać, że ta sztuczka nie jest za darmo, robiąc to, nakładasz pewne ograniczenia na optymalizator, jednym efektem ubocznym tego jest na przykład to, że optymalizator Spark nie przepycha filtrów przez wyrażenia, które nie są deterministyczne, więc stajesz się odpowiedzialny za optymalne pozycja filtrów w zapytaniu.
źródło
asNondeterministic
zmusza UDF do wykonania tylko raz. Dziękiexplode(array(myUdf($"id")))
rozwiązaniu wciąż jest wykonywane dwukrotnie.