Jak działa HashPartitioner?

82

Przeczytałem w dokumentacji HashPartitioner. Niestety nic nie zostało wyjaśnione poza wywołaniami API. Zakładam, że HashPartitionerdzieli rozproszony zestaw na podstawie skrótu kluczy. Na przykład, jeśli moje dane są podobne

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

Więc partycjoner umieściłby to na różnych partycjach z tymi samymi kluczami należącymi do tej samej partycji. Jednak nie rozumiem znaczenia argumentu konstruktora

new HashPartitoner(numPartitions) //What does numPartitions do?

W przypadku powyższego zestawu danych, jak różniłyby się wyniki, gdybym to zrobił

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

Jak to HashPartitionerwłaściwie działa?

Sohaib
źródło

Odpowiedzi:

162

Cóż, uczyńmy Twój zbiór danych nieznacznie bardziej interesującym:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

Mamy sześć elementów:

rdd.count
Long = 6

bez partycjonera:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

i osiem partycji:

rdd.partitions.length
Int = 8

Teraz zdefiniujmy małego pomocnika, który zlicza liczbę elementów na partycję:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Ponieważ nie mamy partycjonera, nasz zestaw danych jest dystrybuowany równomiernie między partycjami ( domyślny schemat partycjonowania w Spark ):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

dystrybucja początkowa

Teraz podzielmy nasz zbiór danych na partycje:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Ponieważ parametr przekazany do HashPartitionerokreśla liczbę partycji, spodziewamy się jednej partycji:

rddOneP.partitions.length
Int = 1

Ponieważ mamy tylko jedną partycję, zawiera ona wszystkie elementy:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

hash-partitioner-1

Zauważ, że kolejność wartości po tasowaniu jest niedeterministyczna.

Tak samo, jeśli używamy HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

otrzymamy 2 partycje:

rddTwoP.partitions.length
Int = 2

Ponieważ rddjest podzielony na partycje według klucza, dane nie będą już dystrybuowane równomiernie:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

Ponieważ mając trzy klucze i tylko dwie różne wartości hashCodemoda, numPartitionsnie ma tu nic nieoczekiwanego:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Aby potwierdzić powyższe:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

hash-partitioner-2

W końcu HashPartitioner(7)otrzymujemy siedem partycji, trzy niepuste z 2 elementami każda:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

hash-partitioner-7

Podsumowanie i notatki

  • HashPartitioner przyjmuje pojedynczy argument, który określa liczbę partycji
  • wartości są przypisywane do partycji za pomocą hashkluczy. hashfunkcja może się różnić w zależności od języka (może używać Scala RDD hashCode, DataSetsużyj MurmurHash 3, PySpark, portable_hash).

    W prostym przypadku, takim jak ten, gdzie klucz jest małą liczbą całkowitą, możesz założyć, że hashjest to tożsamość ( i = hash(i)).

    Scala API używa nonNegativeModdo określenia partycji na podstawie obliczonego skrótu,

  • jeśli dystrybucja kluczy nie jest jednolita, możesz skończyć w sytuacjach, gdy część klastra jest bezczynna

  • klucze muszą być hashowane. Możesz sprawdzić moją odpowiedź na listę A jako klucz do reduktora PySpark w reduktorze, aby przeczytać o specyficznych problemach z PySpark. Na inny możliwy problem zwraca uwagę dokumentacja HashPartitioner :

    Tablice Java mają hashCodes, które są oparte na tożsamości tablic, a nie na ich zawartości, więc próba podzielenia RDD [Array [ ]] lub RDD [(Array [ ], _)] przy użyciu HashPartitioner da nieoczekiwany lub niepoprawny wynik.

  • W Pythonie 3 musisz upewnić się, że haszowanie jest spójne. Zobacz Co oznacza wyjątek: Losowość skrótu ciągu powinna być wyłączona za pomocą PYTHONHASHSEED oznacza w pyspark?

  • Hash partycjoner nie jest ani iniekcyjny, ani surjektywny. Do jednej partycji można przypisać wiele kluczy, a niektóre partycje mogą pozostać puste.

  • Należy pamiętać, że obecnie metody oparte na skrótach nie działają w Scali w połączeniu z klasami przypadków zdefiniowanymi w REPL ( równość klas Case w Apache Spark ).

  • HashPartitioner(lub jakikolwiek inny Partitioner) tasuje dane. O ile partycjonowanie nie jest ponownie używane między wieloma operacjami, nie zmniejsza to ilości danych do przetasowania.

zero323
źródło
Świetnie napisać, dziękuję. Jednak zauważyłem, w obrazach, które masz (1, None)z hash(2) % Pktórym P jest partycja. Nie powinno być hash(1) % P?
javamonkey79
Używam Spark 2.2 i nie ma partitionByinterfejsu API w rdd. w dataframe.write znajduje się partitionBy, ale nie przyjmuje Partitioner jako argumentu.
hakunami
świetna odpowiedź ... shuffle opiera się na partionerze, dobry partioner może zmniejszyć ilość przetasowanych danych.
Leon
1
Świetna odpowiedź. Mam pytanie, na które nie otrzymuję solidnej odpowiedzi w internecie. Kiedy używamy df.repartition (n), tj. Gdy nie określamy żadnej kolumny jako klucza, to w jaki sposób funkcja skrótu działa wewnętrznie, ponieważ nie ma nic do zahaszowania?
dsk
@dsk, jeśli nie określisz klucza, uważam, że repartycjonowanie używa RoundRobinPartitioning. Tutaj jest dyskusja .
Mike Souder
6

RDDjest dystrybuowany, co oznacza, że ​​jest podzielony na kilka części. Każda z tych partycji znajduje się potencjalnie na innym komputerze. Hash partycjoner z argumentem numPartitionswybiera na której partycji umieścić parę (key, value)w następujący sposób:

  1. Tworzy dokładnie numPartitionspartycje.
  2. Miejsca (key, value)w partycji z numeremHash(key) % numPartitions
abalcerek
źródło
3

HashPartitioner.getPartitionMetoda bierze klucz jako argument i zwraca indeks partycji której klucz należy. Partycjoner musi wiedzieć, jakie są prawidłowe indeksy, więc zwraca liczby z właściwego zakresu. Liczba partycji jest określana za pomocą numPartitionsargumentu konstruktora.

Implementacja powraca z grubsza key.hashCode() % numPartitions. Aby uzyskać więcej informacji, zobacz Partitioner.scala .

Daniel Darabos
źródło