Mam duży zestaw danych, który muszę podzielić na grupy zgodnie z określonymi parametrami. Chcę, aby zadanie przebiegło tak wydajnie, jak to możliwe. Mogę sobie wyobrazić dwa sposoby
Opcja 1 - Utwórz mapę z oryginalnego RDD i filtra
def customMapper(record):
if passesSomeTest(record):
return (1,record)
else:
return (0,record)
mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()
Opcja 2 - Filtruj bezpośrednio oryginalny RDD
def customFilter(record):
return passesSomeTest(record)
rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()
Metoda pięści musi powtarzać wszystkie rekordy oryginalnego zestawu danych 3 razy, przy czym drugi musi to zrobić tylko dwa razy, jednak w normalnych okolicznościach iskra robi pewne zakulisowe tworzenie wykresu, więc mogłem sobie wyobrazić, że są skutecznie zrobione w ten sam sposób. Moje pytania są następujące: a.) Czy jedna metoda jest bardziej wydajna od drugiej, czy też budowanie wykresu iskierkowego czyni je równoważnymi b.) Czy można to zrobić w jednym przebiegu
apache-spark
pyspark
jagartner
źródło
źródło
Odpowiedzi:
Przede wszystkim powiem ci, że nie jestem ekspertem od Spark; Używam go dość często w ciągu ostatnich kilku miesięcy i myślę, że teraz to rozumiem, ale mogę się mylić.
Odpowiadając na pytania:
a.) są równoważne, ale nie w taki sposób, w jaki je widzisz; Jeśli zastanawiasz się, Spark nie zoptymalizuje wykresu, ale
customMapper
w obu przypadkach nadal będzie wykonywany dwukrotnie; wynika to z faktu, że w przypadku iskry,rdd1
irdd2
są to dwa całkowicie różne RDD, i zbuduje wykres transformacji oddolny, zaczynając od liści; więc opcja 1 przełoży się na:Jak powiedziałeś,
customMapper
jest wykonywany dwa razy (ponadtorddIn
będzie również czytany dwukrotnie, co oznacza, że jeśli pochodzi z bazy danych, może być jeszcze wolniejszy).b.) istnieje sposób, musisz po prostu przenieść się
cache()
w odpowiednie miejsce:Robiąc to, mówimy iskrze, że może przechowywać częściowe wyniki
mappedRdd
; użyje tych częściowych wyników zarówno dla, jakrdd1
i dlardd2
. Z iskrowego punktu widzenia jest to równoważne z:źródło