Spark: UDF wykonywany wiele razy

9

Mam ramkę danych z następującym kodem:

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

Teraz sprawdzając dzienniki, dowiedziałem się, że dla każdego wiersza UDF jest wykonywany 3 razy. Jeśli dodam „test3” z kolumny „test.three”, UDF zostanie ponownie wykonany.

Czy ktoś może mi wyjaśnić, dlaczego?

Czy można tego właściwie uniknąć (bez buforowania ramki danych po dodaniu „testu”, nawet jeśli to działa)?

Rolintocour
źródło
Co masz na myśli? Wywołujesz funkcję testową trzy razy. Dlatego jest wykonywany trzy razy. Nie jestem pewien, dlaczego tworzysz UDF. Dlaczego po prostu nie sprawić, by mapa była wartością?
user4601931,
To tylko przykład pokazujący zachowanie iskry. Dla mnie „test” to nowa kolumna zawierająca strukturę, a następnie dostęp do dowolnej części struktury nie powinien ponownie uruchamiać UDF. Jak się mylę
Rolintocour,
Próbowałem wydrukować schemat, DataType „test” jest, Mapa nie Struct. Teraz zamiast zwracać Mapę, jeśli UDF zwraca klasę sprawy, taką jak Test (jeden Łańcuch, dwa: Łańcuch), wówczas testrzeczywiście jest Struktem, ale zawsze jest tyle wykonań UDF.
Rolintocour,
powiązane: stackoverflow.com/questions/40320563/…
Raphael Roth
buforowanie powinno działać zgodnie z następującą odpowiedzią: stackoverflow.com/a/40962714/1138523
Raphael Roth

Odpowiedzi:

5

Jeśli chcesz uniknąć wielu połączeń z udf (co jest przydatne, zwłaszcza jeśli udf jest wąskim gardłem w twojej pracy), możesz to zrobić w następujący sposób:

val testUDF = udf(test _).asNondeterministic()

Zasadniczo mówisz Sparkowi, że twoja funkcja nie jest deterministyczna, a teraz Spark upewnia się, że jest wywoływana tylko raz, ponieważ wielokrotne wywoływanie jej nie jest bezpieczne (każde wywołanie może zwrócić inny wynik).

Należy również pamiętać, że ta sztuczka nie jest za darmo, robiąc to, nakładasz pewne ograniczenia na optymalizator, jednym efektem ubocznym tego jest na przykład to, że optymalizator Spark nie przepycha filtrów przez wyrażenia, które nie są deterministyczne, więc stajesz się odpowiedzialny za optymalne pozycja filtrów w zapytaniu.

David Vrba
źródło
miły! ta odpowiedź również należy tutaj: stackoverflow.com/questions/40320563/...
Raphael Roth
W moim przypadku asNondeterministiczmusza UDF do wykonania tylko raz. Dzięki explode(array(myUdf($"id")))rozwiązaniu wciąż jest wykonywane dwukrotnie.
Rolintocour,