Cóż, uczyńmy Twój zbiór danych nieznacznie bardziej interesującym:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
Mamy sześć elementów:
rdd.count
Long = 6
bez partycjonera:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
i osiem partycji:
rdd.partitions.length
Int = 8
Teraz zdefiniujmy małego pomocnika, który zlicza liczbę elementów na partycję:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Ponieważ nie mamy partycjonera, nasz zestaw danych jest dystrybuowany równomiernie między partycjami ( domyślny schemat partycjonowania w Spark ):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Teraz podzielmy nasz zbiór danych na partycje:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Ponieważ parametr przekazany do HashPartitioner
określa liczbę partycji, spodziewamy się jednej partycji:
rddOneP.partitions.length
Int = 1
Ponieważ mamy tylko jedną partycję, zawiera ona wszystkie elementy:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Zauważ, że kolejność wartości po tasowaniu jest niedeterministyczna.
Tak samo, jeśli używamy HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
otrzymamy 2 partycje:
rddTwoP.partitions.length
Int = 2
Ponieważ rdd
jest podzielony na partycje według klucza, dane nie będą już dystrybuowane równomiernie:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Ponieważ mając trzy klucze i tylko dwie różne wartości hashCode
moda, numPartitions
nie ma tu nic nieoczekiwanego:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Aby potwierdzić powyższe:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
W końcu HashPartitioner(7)
otrzymujemy siedem partycji, trzy niepuste z 2 elementami każda:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
Podsumowanie i notatki
HashPartitioner
przyjmuje pojedynczy argument, który określa liczbę partycji
wartości są przypisywane do partycji za pomocą hash
kluczy. hash
funkcja może się różnić w zależności od języka (może używać Scala RDD hashCode
, DataSets
użyj MurmurHash 3, PySpark, portable_hash
).
W prostym przypadku, takim jak ten, gdzie klucz jest małą liczbą całkowitą, możesz założyć, że hash
jest to tożsamość ( i = hash(i)
).
Scala API używa nonNegativeMod
do określenia partycji na podstawie obliczonego skrótu,
jeśli dystrybucja kluczy nie jest jednolita, możesz skończyć w sytuacjach, gdy część klastra jest bezczynna
klucze muszą być hashowane. Możesz sprawdzić moją odpowiedź na listę A jako klucz do reduktora PySpark w reduktorze, aby przeczytać o specyficznych problemach z PySpark. Na inny możliwy problem zwraca uwagę dokumentacja HashPartitioner :
Tablice Java mają hashCodes, które są oparte na tożsamości tablic, a nie na ich zawartości, więc próba podzielenia RDD [Array [ ]] lub RDD [(Array [ ], _)] przy użyciu HashPartitioner da nieoczekiwany lub niepoprawny wynik.
W Pythonie 3 musisz upewnić się, że haszowanie jest spójne. Zobacz Co oznacza wyjątek: Losowość skrótu ciągu powinna być wyłączona za pomocą PYTHONHASHSEED oznacza w pyspark?
Hash partycjoner nie jest ani iniekcyjny, ani surjektywny. Do jednej partycji można przypisać wiele kluczy, a niektóre partycje mogą pozostać puste.
Należy pamiętać, że obecnie metody oparte na skrótach nie działają w Scali w połączeniu z klasami przypadków zdefiniowanymi w REPL ( równość klas Case w Apache Spark ).
HashPartitioner
(lub jakikolwiek inny Partitioner
) tasuje dane. O ile partycjonowanie nie jest ponownie używane między wieloma operacjami, nie zmniejsza to ilości danych do przetasowania.
(1, None)
zhash(2) % P
którym P jest partycja. Nie powinno byćhash(1) % P
?partitionBy
interfejsu API w rdd. w dataframe.write znajduje się partitionBy, ale nie przyjmuje Partitioner jako argumentu.RDD
jest dystrybuowany, co oznacza, że jest podzielony na kilka części. Każda z tych partycji znajduje się potencjalnie na innym komputerze. Hash partycjoner z argumentemnumPartitions
wybiera na której partycji umieścić parę(key, value)
w następujący sposób:numPartitions
partycje.(key, value)
w partycji z numeremHash(key) % numPartitions
źródło
HashPartitioner.getPartition
Metoda bierze klucz jako argument i zwraca indeks partycji której klucz należy. Partycjoner musi wiedzieć, jakie są prawidłowe indeksy, więc zwraca liczby z właściwego zakresu. Liczba partycji jest określana za pomocąnumPartitions
argumentu konstruktora.Implementacja powraca z grubsza
key.hashCode() % numPartitions
. Aby uzyskać więcej informacji, zobacz Partitioner.scala .źródło