Zacząłem używać Spark SQL i DataFrames w Spark 1.4.0. Chcę zdefiniować niestandardowy partycjoner w DataFrames w Scali, ale nie widzę, jak to zrobić.
Jedna z tabel danych, z którymi pracuję, zawiera listę transakcji według konta, jak w poniższym przykładzie.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Przynajmniej na początku większość obliczeń będzie miała miejsce między transakcjami na koncie. Dlatego chciałbym, aby dane zostały podzielone na partycje, aby wszystkie transakcje dla konta znajdowały się na tej samej partycji Spark.
Ale nie widzę sposobu, aby to zdefiniować. Klasa DataFrame ma metodę o nazwie „repartition (Int)”, w której można określić liczbę partycji do utworzenia. Ale nie widzę żadnej dostępnej metody definiowania niestandardowego partycjonera dla DataFrame, takiej jak można określić dla RDD.
Dane źródłowe są przechowywane w Parquet. Widziałem, że pisząc DataFrame do Parquet, możesz określić kolumnę, według której chcesz podzielić, więc prawdopodobnie mógłbym powiedzieć Parquetowi, aby podzielił dane według kolumny „Konto”. Ale mogą istnieć miliony kont, a jeśli dobrze rozumiem Parquet, utworzy oddzielny katalog dla każdego konta, więc nie brzmiało to jak rozsądne rozwiązanie.
Czy istnieje sposób, aby Spark podzielił ten DataFrame, tak aby wszystkie dane konta znajdowały się na tej samej partycji?
int(account/someInteger)
i w ten sposób uzyskać rozsądną liczbę kont na katalog.partitionBy(Partitioner)
metody, ale DataFrames zamiast RDD. Teraz widzę, żepartitionBy
jest to dostępne tylko dla par RDD, nie wiem, dlaczego tak jest.Odpowiedzi:
Iskra> = 2.3.0
SPARK-22614 ujawnia podział zakresu.
SPARK-22389 ujawnia zewnętrzne partycjonowanie formatu w Data Source API v2 .
Iskra> = 1.6.0
W Spark> = 1.6 możliwe jest użycie partycjonowania według kolumn do zapytań i buforowania. Widzieć: SPARK-11410 i zapłonem 4849 stosując
repartition
metodę:W przeciwieństwie do
RDDs
SparkDataset
(w tymDataset[Row]
akaDataFrame
) nie można teraz używać niestandardowego partycjonera. Zazwyczaj można temu zaradzić, tworząc sztuczną kolumnę partycjonowania, ale nie zapewni to takiej samej elastyczności.Iskra <1.6.0:
Jedną rzeczą, którą możesz zrobić, jest wstępne podzielenie danych wejściowych na partycje przed utworzeniem pliku
DataFrame
Ponieważ
DataFrame
tworzenie z poziomuRDD
wymaga tylko prostej fazy mapy, należy zachować istniejący układ partycji *:W ten sam sposób możesz ponownie podzielić istniejące
DataFrame
:Więc wygląda na to, że nie jest to niemożliwe. Pozostaje pytanie, czy w ogóle ma to sens. Będę argumentował, że w większości przypadków tak nie jest:
Zmiana partycji to kosztowny proces. W typowym scenariuszu większość danych musi być serializowana, tasowana i deserializowana. Z drugiej strony liczba operacji, które mogą skorzystać na danych wstępnie podzielonych na partycje, jest stosunkowo niewielka i jest dodatkowo ograniczona, jeśli wewnętrzny interfejs API nie jest zaprojektowany do wykorzystania tej właściwości.
GROUP BY
- możliwe jest zmniejszenie rozmiaru pamięci tymczasowych buforów **, ale ogólny koszt jest znacznie wyższy. Mniej więcej odpowiednikgroupByKey.mapValues(_.reduce)
(obecne zachowanie) vsreduceByKey
(wstępne partycjonowanie). W praktyce mało prawdopodobne.SqlContext.cacheTable
. Ponieważ wygląda na to, że używa kodowania długości serii, zastosowanieOrderedRDDFunctions.repartitionAndSortWithinPartitions
może poprawić współczynnik kompresji.Wydajność w dużym stopniu zależy od dystrybucji kluczy. Jeśli jest przekrzywiony, spowoduje to nieoptymalne wykorzystanie zasobów. W najgorszym przypadku ukończenie pracy w ogóle nie będzie możliwe.
Pojęcia pokrewne
Partycjonowanie ze źródłami JDBC :
Źródła danych JDBC obsługują
predicates
argument . Można go używać w następujący sposób:Tworzy jedną partycję JDBC na predykat. Pamiętaj, że jeśli zestawy utworzone przy użyciu indywidualnych predykatów nie są rozłączne, w wynikowej tabeli zobaczysz duplikaty.
partitionBy
metoda wDataFrameWriter
:Spark
DataFrameWriter
udostępniapartitionBy
metodę, której można użyć do „partycjonowania” danych podczas zapisu. Oddziela dane przy zapisie za pomocą dostarczonego zestawu kolumnUmożliwia to przesunięcie predykatu w dół podczas odczytu dla zapytań opartych na kluczu:
ale to nie jest równoważne
DataFrame.repartition
. W szczególności agregacje takie jak:nadal będzie wymagać
TungstenExchange
:bucketBy
metoda wDataFrameWriter
(Spark> = 2.0):bucketBy
ma podobne aplikacje,partitionBy
ale jest dostępny tylko dla tabel (saveAsTable
). Informacje o zasobnikach można wykorzystać do optymalizacji sprzężeń:* Przez układ partycji mam na myśli tylko dystrybucję danych.
partitioned
RDD nie ma już programu do partycjonowania. ** Zakładając brak wczesnej prognozy. Jeśli agregacja obejmuje tylko niewielki podzbiór kolumn, prawdopodobnie nie ma żadnego zysku.źródło
DataFrameWriter.partitionBy
to nie to samo coDataFrame.repartition
. Dawniej nie tasuje, po prostu oddziela dane wyjściowe. Odnośnie pierwszego pytania. - dane są zapisywane na partycję i nie ma tasowania. Możesz to łatwo sprawdzić, czytając poszczególne pliki. Ale sam Spark nie może się o tym dowiedzieć, jeśli tego naprawdę chcesz.W Spark <1.6 Jeśli utworzysz a
HiveContext
, a nie zwykły starySqlContext
, możesz użyć HiveQLDISTRIBUTE BY colX...
(zapewnia, że każdy z N reduktorów otrzyma nienakładające się zakresy x) &CLUSTER BY colX...
(skrót do Rozłóż według i Sortuj według) na przykład;Nie wiem, jak to pasuje do interfejsu API Spark DF. Te słowa kluczowe nie są obsługiwane w normalnym SqlContext (pamiętaj, że nie musisz mieć meta magazynu hive, aby używać HiveContext)
EDYCJA: Spark 1.6+ ma teraz to w natywnym interfejsie API DataFrame
źródło
Zacznijmy więc od jakiejś odpowiedzi:) - Nie możesz
Nie jestem ekspertem, ale o ile rozumiem DataFrames, nie są one równe rdd, a DataFrame nie ma czegoś takiego jak Partitioner.
Generalnie idea DataFrame polega na zapewnieniu innego poziomu abstrakcji, który sam rozwiązuje takie problemy. Zapytania w DataFrame są tłumaczone na plan logiczny, który jest następnie przekładany na operacje na RDD. Proponowane partycjonowanie prawdopodobnie zostanie zastosowane automatycznie lub przynajmniej powinno.
Jeśli nie ufasz SparkSQL, że zapewni on jakąś optymalną pracę, zawsze możesz przekształcić DataFrame do RDD [Row], zgodnie z sugestią w komentarzach.
źródło
Użyj ramki DataFrame zwróconej przez:
Nie ma wyraźnego sposobu użycia
partitionBy
w DataFrame, tylko na PairRDD, ale kiedy posortujesz DataFrame, użyje go w swoim LogicalPlan, co pomoże, gdy będziesz musiał wykonać obliczenia na każdym koncie.Właśnie natknąłem się na ten sam dokładny problem, z ramką danych, którą chcę podzielić według konta. Zakładam, że kiedy mówisz „chcesz podzielić dane na partycje, aby wszystkie transakcje dla konta znajdowały się na tej samej partycji Spark”, chcesz, aby była to skalowalność i wydajność, ale Twój kod nie zależy od tego (np.
mapPartitions()
itp.), prawda?źródło
Udało mi się to zrobić za pomocą RDD. Ale nie wiem, czy to jest dla ciebie akceptowalne rozwiązanie. Gdy masz już DF dostępny jako RDD, możesz złożyć wniosek
repartitionAndSortWithinPartitions
o dokonanie niestandardowego ponownego podziału danych.Oto próbka, której użyłem:
źródło