Jak zdefiniować partycjonowanie DataFrame?

128

Zacząłem używać Spark SQL i DataFrames w Spark 1.4.0. Chcę zdefiniować niestandardowy partycjoner w DataFrames w Scali, ale nie widzę, jak to zrobić.

Jedna z tabel danych, z którymi pracuję, zawiera listę transakcji według konta, jak w poniższym przykładzie.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

Przynajmniej na początku większość obliczeń będzie miała miejsce między transakcjami na koncie. Dlatego chciałbym, aby dane zostały podzielone na partycje, aby wszystkie transakcje dla konta znajdowały się na tej samej partycji Spark.

Ale nie widzę sposobu, aby to zdefiniować. Klasa DataFrame ma metodę o nazwie „repartition (Int)”, w której można określić liczbę partycji do utworzenia. Ale nie widzę żadnej dostępnej metody definiowania niestandardowego partycjonera dla DataFrame, takiej jak można określić dla RDD.

Dane źródłowe są przechowywane w Parquet. Widziałem, że pisząc DataFrame do Parquet, możesz określić kolumnę, według której chcesz podzielić, więc prawdopodobnie mógłbym powiedzieć Parquetowi, aby podzielił dane według kolumny „Konto”. Ale mogą istnieć miliony kont, a jeśli dobrze rozumiem Parquet, utworzy oddzielny katalog dla każdego konta, więc nie brzmiało to jak rozsądne rozwiązanie.

Czy istnieje sposób, aby Spark podzielił ten DataFrame, tak aby wszystkie dane konta znajdowały się na tej samej partycji?

grabie
źródło
sprawdź ten link stackoverflow.com/questions/23127329/…
Abhishek Choudhary
Jeśli możesz nakazać Parquetowi partycjonowanie według konta, prawdopodobnie możesz podzielić według int(account/someInteger)i w ten sposób uzyskać rozsądną liczbę kont na katalog.
Paul
1
@ABC: Widziałem ten link. Szukałem odpowiednika tej partitionBy(Partitioner)metody, ale DataFrames zamiast RDD. Teraz widzę, że partitionByjest to dostępne tylko dla par RDD, nie wiem, dlaczego tak jest.
rake
@Paul: Rozważałem zrobienie tego, co opisujesz. Kilka rzeczy mnie powstrzymało:
rake
ciąg dalszy .... (1) To jest dla "podziału na parkiety". Nie udało mi się znaleźć żadnych dokumentów, które stwierdzają, że partycjonowanie Spark faktycznie będzie używać partycjonowania Parquet. (2) Jeśli rozumiem dokumentację Parquet, muszę zdefiniować nowe pole „foo”, wtedy każdy katalog Parquet miałby nazwę taką jak „foo = 123”. Ale jeśli skonstruuję zapytanie z udziałem AccountID , w jaki sposób Spark / hive / parquet będzie wiedział, że istnieje jakieś powiązanie między foo i AccountID ?
prowizja

Odpowiedzi:

177

Iskra> = 2.3.0

SPARK-22614 ujawnia podział zakresu.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 ujawnia zewnętrzne partycjonowanie formatu w Data Source API v2 .

Iskra> = 1.6.0

W Spark> = 1.6 możliwe jest użycie partycjonowania według kolumn do zapytań i buforowania. Widzieć: SPARK-11410 i zapłonem 4849 stosując repartitionmetodę:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

W przeciwieństwie do RDDsSpark Dataset(w tym Dataset[Row]aka DataFrame) nie można teraz używać niestandardowego partycjonera. Zazwyczaj można temu zaradzić, tworząc sztuczną kolumnę partycjonowania, ale nie zapewni to takiej samej elastyczności.

Iskra <1.6.0:

Jedną rzeczą, którą możesz zrobić, jest wstępne podzielenie danych wejściowych na partycje przed utworzeniem pliku DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Ponieważ DataFrametworzenie z poziomu RDDwymaga tylko prostej fazy mapy, należy zachować istniejący układ partycji *:

assert(df.rdd.partitions == partitioned.partitions)

W ten sam sposób możesz ponownie podzielić istniejące DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Więc wygląda na to, że nie jest to niemożliwe. Pozostaje pytanie, czy w ogóle ma to sens. Będę argumentował, że w większości przypadków tak nie jest:

  1. Zmiana partycji to kosztowny proces. W typowym scenariuszu większość danych musi być serializowana, tasowana i deserializowana. Z drugiej strony liczba operacji, które mogą skorzystać na danych wstępnie podzielonych na partycje, jest stosunkowo niewielka i jest dodatkowo ograniczona, jeśli wewnętrzny interfejs API nie jest zaprojektowany do wykorzystania tej właściwości.

    • łączy się w niektórych scenariuszach, ale wymagałoby wewnętrznego wsparcia,
    • wywołania funkcji okna z odpowiednim partycjonerem. Tak samo jak powyżej, ograniczone do definicji pojedynczego okna. Jest już jednak podzielony wewnętrznie, więc wstępne partycjonowanie może być zbędne,
    • proste agregacje z GROUP BY- możliwe jest zmniejszenie rozmiaru pamięci tymczasowych buforów **, ale ogólny koszt jest znacznie wyższy. Mniej więcej odpowiednik groupByKey.mapValues(_.reduce)(obecne zachowanie) vs reduceByKey(wstępne partycjonowanie). W praktyce mało prawdopodobne.
    • kompresja danych z SqlContext.cacheTable. Ponieważ wygląda na to, że używa kodowania długości serii, zastosowanie OrderedRDDFunctions.repartitionAndSortWithinPartitionsmoże poprawić współczynnik kompresji.
  2. Wydajność w dużym stopniu zależy od dystrybucji kluczy. Jeśli jest przekrzywiony, spowoduje to nieoptymalne wykorzystanie zasobów. W najgorszym przypadku ukończenie pracy w ogóle nie będzie możliwe.

  3. Celem używania deklaratywnego interfejsu API wysokiego poziomu jest odizolowanie się od szczegółów implementacji niskiego poziomu. Jak już wspomnieli @dwysakowicz i @RomiKuntsman , optymalizacja jest zadaniem Catalyst Optimizer . Jest to dość wyrafinowana bestia i naprawdę wątpię, by można było ją łatwo poprawić bez głębszego zanurzenia się w jej wnętrzności.

Pojęcia pokrewne

Partycjonowanie ze źródłami JDBC :

Źródła danych JDBC obsługują predicatesargument . Można go używać w następujący sposób:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Tworzy jedną partycję JDBC na predykat. Pamiętaj, że jeśli zestawy utworzone przy użyciu indywidualnych predykatów nie są rozłączne, w wynikowej tabeli zobaczysz duplikaty.

partitionBy metoda w DataFrameWriter :

Spark DataFrameWriterudostępnia partitionBymetodę, której można użyć do „partycjonowania” danych podczas zapisu. Oddziela dane przy zapisie za pomocą dostarczonego zestawu kolumn

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Umożliwia to przesunięcie predykatu w dół podczas odczytu dla zapytań opartych na kluczu:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

ale to nie jest równoważne DataFrame.repartition. W szczególności agregacje takie jak:

val cnts = df1.groupBy($"k").sum()

nadal będzie wymagać TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBymetoda wDataFrameWriter (Spark> = 2.0):

bucketByma podobne aplikacje, partitionByale jest dostępny tylko dla tabel ( saveAsTable). Informacje o zasobnikach można wykorzystać do optymalizacji sprzężeń:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Przez układ partycji mam na myśli tylko dystrybucję danych. partitionedRDD nie ma już programu do partycjonowania. ** Zakładając brak wczesnej prognozy. Jeśli agregacja obejmuje tylko niewielki podzbiór kolumn, prawdopodobnie nie ma żadnego zysku.

zero323
źródło
@bychance Yes and no. Układ danych zostanie zachowany, ale ODPOWIEDŹ nie da ci korzyści, takich jak przycinanie partycji.
zero323
@ zero323 Dzięki, czy istnieje sposób na sprawdzenie alokacji partycji w pliku parkietu, aby sprawdzić, czy df.save.write rzeczywiście zapisze układ? A jeśli zrobię df.repartition („A”), a następnie df.write.repartitionBy („B”), struktura folderów fizycznych zostanie podzielona na partycje według B i czy w każdym folderze wartości B nadal zachowa partycję ZA?
bychance
2
@bychance DataFrameWriter.partitionByto nie to samo co DataFrame.repartition. Dawniej nie tasuje, po prostu oddziela dane wyjściowe. Odnośnie pierwszego pytania. - dane są zapisywane na partycję i nie ma tasowania. Możesz to łatwo sprawdzić, czytając poszczególne pliki. Ale sam Spark nie może się o tym dowiedzieć, jeśli tego naprawdę chcesz.
zero323
11

W Spark <1.6 Jeśli utworzysz a HiveContext, a nie zwykły stary SqlContext, możesz użyć HiveQL DISTRIBUTE BY colX... (zapewnia, że ​​każdy z N reduktorów otrzyma nienakładające się zakresy x) & CLUSTER BY colX...(skrót do Rozłóż według i Sortuj według) na przykład;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

Nie wiem, jak to pasuje do interfejsu API Spark DF. Te słowa kluczowe nie są obsługiwane w normalnym SqlContext (pamiętaj, że nie musisz mieć meta magazynu hive, aby używać HiveContext)

EDYCJA: Spark 1.6+ ma teraz to w natywnym interfejsie API DataFrame

Nocny Wilk
źródło
1
Czy partycje są zachowywane podczas zapisywania ramki danych?
Sim
Jak kontrolujesz liczbę partycji, które możesz mieć w przykładzie ql gałęzi? na przykład w podejściu para RDD, można to zrobić, aby utworzyć partycje 5: val partycjonujący = new HashPartitioner (5)
Minnie
ok, znalazłem odpowiedź, da się to zrobić w ten sposób: sqlContext.setConf ("spark.sql.shuffle.partitions", "5") Nie mogłem edytować poprzedniego komentarza, ponieważ przegapiłem 5 minut
Minnie
7

Zacznijmy więc od jakiejś odpowiedzi:) - Nie możesz

Nie jestem ekspertem, ale o ile rozumiem DataFrames, nie są one równe rdd, a DataFrame nie ma czegoś takiego jak Partitioner.

Generalnie idea DataFrame polega na zapewnieniu innego poziomu abstrakcji, który sam rozwiązuje takie problemy. Zapytania w DataFrame są tłumaczone na plan logiczny, który jest następnie przekładany na operacje na RDD. Proponowane partycjonowanie prawdopodobnie zostanie zastosowane automatycznie lub przynajmniej powinno.

Jeśli nie ufasz SparkSQL, że zapewni on jakąś optymalną pracę, zawsze możesz przekształcić DataFrame do RDD [Row], zgodnie z sugestią w komentarzach.

Dawid Wysakowicz
źródło
7

Użyj ramki DataFrame zwróconej przez:

yourDF.orderBy(account)

Nie ma wyraźnego sposobu użycia partitionBy w DataFrame, tylko na PairRDD, ale kiedy posortujesz DataFrame, użyje go w swoim LogicalPlan, co pomoże, gdy będziesz musiał wykonać obliczenia na każdym koncie.

Właśnie natknąłem się na ten sam dokładny problem, z ramką danych, którą chcę podzielić według konta. Zakładam, że kiedy mówisz „chcesz podzielić dane na partycje, aby wszystkie transakcje dla konta znajdowały się na tej samej partycji Spark”, chcesz, aby była to skalowalność i wydajność, ale Twój kod nie zależy od tego (np. mapPartitions()itp.), prawda?

Romi Kuntsman
źródło
3
A co jeśli twój kod jest od niego zależny, ponieważ używasz mapPartitions?
NightWolf
2
Można przekonwertować DataFrame do RDD, a następnie podzielono je (na przykład za pomocą aggregatByKey () i przekazać niestandardowe partycjonowania)
Romi Kuntsman
5

Udało mi się to zrobić za pomocą RDD. Ale nie wiem, czy to jest dla ciebie akceptowalne rozwiązanie. Gdy masz już DF dostępny jako RDD, możesz złożyć wniosek repartitionAndSortWithinPartitionso dokonanie niestandardowego ponownego podziału danych.

Oto próbka, której użyłem:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
Deweloper
źródło