Aktualizacja
Ta odpowiedź jest nadal ważne i pouczające, chociaż rzeczy są teraz lepiej od 2.2 / 2.3, który dodaje wbudowane wsparcie dla enkodera Set
, Seq
, Map
, Date
, Timestamp
, i BigDecimal
. Jeśli trzymasz się tworzenia typów tylko z klasami przypadków i zwykłymi typami Scala, powinieneś być w porządku z tylko niejawnym w SQLImplicits
.
Niestety, praktycznie nic nie zostało dodane, aby w tym pomóc. Wyszukiwanie @since 2.0.0
w Encoders.scala
lub SQLImplicits.scala
znajdowanie rzeczy głównie związanych z typami prymitywnymi (i pewne modyfikacje klas przypadków). A więc pierwszą rzeczą do powiedzenia: obecnie nie ma naprawdę dobrego wsparcia dla niestandardowych koderów klas . Po usunięciu tego z drogi, oto kilka sztuczek, które wykonują tak dobrą robotę, jak możemy kiedykolwiek mieć nadzieję, biorąc pod uwagę to, co obecnie mamy do dyspozycji. Z góry zastrzeżenie: to nie zadziała idealnie i dołożę wszelkich starań, aby wszystkie ograniczenia były jasne i z góry.
Na czym dokładnie polega problem
Jeśli chcesz utworzyć zestaw danych, Spark „wymaga kodera (do konwersji obiektu JVM typu T na wewnętrzną reprezentację Spark SQL i z niej), który jest generalnie tworzony automatycznie za pomocą implicits z a SparkSession
lub może być tworzony jawnie przez wywołanie metod statycznych on Encoders
”(zaczerpnięte z dokumentacjicreateDataset
). Koder przyjmie postać, w Encoder[T]
której T
jest typem, który kodujesz. Pierwszą sugestią jest dodanie import spark.implicits._
(co daje te niejawne kodery), a drugą sugestią jest jawne przekazanie niejawnego kodera przy użyciu tego zestawu funkcji związanych z koderem.
Nie ma kodera dostępnego dla zwykłych klas, więc
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
poda następujący niejawny powiązany błąd czasu kompilacji:
Nie można znaleźć kodera dla typu przechowywanego w zestawie danych. Typy pierwotne (Int, String itp.) I Typy produktów (klasy przypadków) są obsługiwane przez importowanie sqlContext.implicits._ Obsługa serializacji innych typów zostanie dodana w przyszłych wersjach
Jeśli jednak zawiniesz dowolny typ, którego właśnie użyłeś, aby uzyskać powyższy błąd w jakiejś rozszerzonej klasie Product
, błąd myląco zostanie opóźniony do czasu wykonania, więc
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Kompiluje się dobrze, ale kończy się niepowodzeniem w czasie wykonywania z
java.lang.UnsupportedOperationException: nie znaleziono kodera dla MyObj
Powodem tego jest to, że kodery, które Spark tworzy za pomocą implicitów, są w rzeczywistości tworzone tylko w czasie wykonywania (przez ponowne sprawdzenie scali). W tym przypadku wszystkie kontrole Spark w czasie kompilacji polegają na tym, że najbardziej zewnętrzna klasa rozszerza Product
(co robią wszystkie klasy przypadków) i dopiero w czasie wykonywania zdaje sobie sprawę, że nadal nie wie, co zrobić MyObj
(ten sam problem występuje, gdy próbuję a Dataset[(Int,MyObj)]
- Spark czeka, aż runtime włączy się MyObj
). Oto główne problemy, które pilnie wymagają rozwiązania:
- niektóre klasy, które rozszerzają
Product
kompilację, mimo że zawsze ulegają awarii w czasie wykonywania i
- nie ma sposobu na przekazanie niestandardowych koderów dla typów zagnieżdżonych (nie mam możliwości przekazania Spark koderowi tylko po to
MyObj
, aby wiedział, jak zakodować Wrap[MyObj]
lub (Int,MyObj)
).
Po prostu użyj kryo
Rozwiązaniem, które wszyscy sugerują, jest użycie kryo
kodera.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Jednak szybko staje się to nudne. Zwłaszcza, jeśli Twój kod manipuluje różnymi zbiorami danych, łączy się, grupuje itp. W rezultacie uzyskujesz kilka dodatkowych implikacji. Dlaczego więc po prostu nie założyć, że robi to wszystko automatycznie?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
A teraz wygląda na to, że mogę zrobić prawie wszystko, co chcę (poniższy przykład nie zadziała w miejscu, w spark-shell
którym spark.implicits._
jest automatycznie importowany)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Albo prawie. Problem polega na tym, że użycie kryo
prowadzi do Spark po prostu przechowuje każdy wiersz w zestawie danych jako płaski obiekt binarny. Na map
, filter
, foreach
że to wystarczy, ale dla takich operacji join
, Spark naprawdę potrzebuje tych mają być rozdzielone na kolumnach. Po sprawdzeniu schematu pod kątem d2
lub d3
widać, że jest tylko jedna kolumna binarna:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Częściowe rozwiązanie dla krotek
Tak więc, używając magii implikacji w Scali (więcej w 6.26.3 Overloading Resolution ), mogę stworzyć serię implikacji, które wykonają jak najlepszą robotę, przynajmniej w przypadku krotek, i będą dobrze działać z istniejącymi implikacjami:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Następnie, uzbrojony w te implikacje, mogę sprawić, że powyższy przykład zadziała, aczkolwiek z pewnymi zmianami nazw kolumn
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
I jeszcze nie zorientowali się, jak uzyskać oczekiwane nazwiska krotka ( _1
, _2
...) domyślnie bez zmiany nazwy ich - jeśli ktoś chce się bawić z tym, to jest, gdy nazwa "value"
zostanie wprowadzona, a to jest, gdy krotka nazwy są zwykle dodawane. Jednak kluczową kwestią jest to, że mam teraz ładny strukturalny schemat:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Podsumowując, to obejście:
- pozwala nam uzyskać oddzielne kolumny dla krotek (abyśmy mogli ponownie dołączyć do krotek, yay!)
- możemy znowu polegać na implikacjach (więc nie ma potrzeby przechodzenia w
kryo
każdym miejscu)
- jest prawie całkowicie wstecznie kompatybilny z
import spark.implicits._
( wymaga zmiany nazwy)
- nie nie pozwól nam przyłączyć się na
kyro
serializacji kolumnach binarnych, nie mówiąc już o tych pól może mieć
- ma nieprzyjemny efekt uboczny zmiany nazwy niektórych kolumn krotki na „wartość” (w razie potrzeby można to cofnąć, konwertując
.toDF
, określając nowe nazwy kolumn i konwertując z powrotem do zestawu danych - a nazwy schematów wydają się być zachowane dzięki złączeniom , gdzie są najbardziej potrzebne).
Częściowe rozwiązanie dla zajęć w ogóle
Ten jest mniej przyjemny i nie ma dobrego rozwiązania. Jednak teraz, gdy mamy powyższe rozwiązanie krotki, mam przeczucie, że rozwiązanie niejawnej konwersji z innej odpowiedzi będzie również nieco mniej bolesne, ponieważ możesz przekonwertować bardziej złożone klasy na krotki. Następnie, po utworzeniu zestawu danych, prawdopodobnie zmieniłbyś nazwy kolumn, używając podejścia Dataframe. Jeśli wszystko pójdzie dobrze, jest to naprawdę poprawa, ponieważ mogę teraz wykonywać połączenia na polach moich zajęć. Gdybym użył tylko jednego płaskiego kryo
serializatora binarnego , nie byłoby to możliwe.
Oto przykład, który ma wszystkiego po trochu: Mam klasy MyObj
, która ma pola typów Int
, java.util.UUID
oraz Set[String]
. Pierwsza dba o siebie. Drugi, chociaż mógłbym serializować za pomocą, kryo
byłby bardziej przydatny, gdyby był przechowywany jako a String
(ponieważ UUID
s są zwykle czymś, przeciwko czemu chcę się przyłączyć). Trzeci tak naprawdę należy do kolumny binarnej.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Teraz mogę utworzyć zbiór danych z ładnym schematem przy użyciu tej maszyny:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
Schemat pokazuje mi kolumny I z właściwymi nazwami i dwoma pierwszymi rzeczami, które mogę połączyć.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
ExpressionEncoder
przy użyciu serializacji JSON? W moim przypadku krotki nie ujdą na sucho, a kryo daje mi kolumnę binarną ..Korzystanie z koderów ogólnych.
Na razie są dostępne dwa ogólne kodery,
kryo
ajavaSerialization
ten drugi jest wyraźnie opisany jako:Zakładając następującą klasę
możesz użyć tych koderów, dodając niejawny koder:
które mogą być używane razem w następujący sposób:
Przechowuje obiekty jako
binary
kolumny, więc po przekonwertowaniu doDataFrame
ciebie otrzymasz następujący schemat:Możliwe jest również kodowanie krotek za pomocą
kryo
enkodera dla określonego pola:Należy pamiętać, że nie polegamy tutaj na niejawnych koderach, ale jawnie przekazujemy koder, więc najprawdopodobniej nie zadziała z
toDS
metodą.Korzystanie z niejawnych konwersji:
Zapewnij niejawne konwersje między reprezentacją, którą można zakodować, a klasą niestandardową, na przykład:
Powiązane pytania:
źródło
Set
) kolekcji wpisanych na maszynieException in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar]
.kryo[Set[Bar]]
. Tak samo, jeśli klasa zawiera poleBar
, potrzebujesz kodera dla całego obiektu. Są to bardzo prymitywne metody.Bar
, potrzebujesz kodera dla całego obiektu”. moje pytanie brzmiało jak zakodować ten „cały projekt”?Możesz użyć rejestracji UDTR, a następnie klas przypadku, krotek itp. ... wszystkie działają poprawnie z typem zdefiniowanym przez użytkownika!
Załóżmy, że chcesz użyć niestandardowego wyliczenia:
Zarejestruj to w ten sposób:
Następnie UŻYJ!
Załóżmy, że chcesz użyć rekordu polimorficznego:
... i używaj go w ten sposób:
Możesz napisać niestandardowy UDT, który koduje wszystko do bajtów (używam tutaj serializacji java, ale prawdopodobnie lepiej jest instrumentować kontekst Kryo Sparka).
Najpierw zdefiniuj klasę UDT:
Następnie zarejestruj to:
Wtedy możesz go użyć!
źródło
Enkodery działają mniej więcej tak samo w
Spark2.0
. IKryo
nadal jest zalecanymserialization
wyborem.Możesz spojrzeć na następujący przykład z powłoką iskrową
Do tej pory nie było ich
appropriate encoders
w obecnym zakresie, więc nasze osoby nie były kodowane jakobinary
wartości. Ale to się zmieni, gdy udostępnimy niektóreimplicit
kodery korzystające zKryo
serializacji.źródło
W przypadku klasy Java Bean może to być przydatne
Teraz możesz po prostu odczytać ramkę dataFrame jako niestandardową ramkę danych
Spowoduje to utworzenie niestandardowego kodera klasy, a nie binarnego.
źródło
Moje przykłady będą w Javie, ale nie wyobrażam sobie, że adaptacja do Scali będzie trudna.
Byłem całkiem udany konwersji
RDD<Fruit>
doDataset<Fruit>
korzystania spark.createDataset i Encoders.bean tak długo, jakFruit
jest to prosty Java Bean .Krok 1: Utwórz prosty Java Bean.
Trzymałbym się klas z typami prymitywnymi i polami typu String jako, zanim ludzie z DataBricks wzmocnią swoje kodery. Jeśli masz klasę z zagnieżdżonym obiektem, utwórz kolejny prosty Java Bean ze spłaszczonymi wszystkimi jego polami, dzięki czemu możesz użyć transformacji RDD do odwzorowania typu złożonego na prostszy. Jasne, to trochę więcej pracy, ale wyobrażam sobie, że bardzo pomoże to w wydajności pracy z płaskim schematem.
Krok 2: Pobierz swój zestaw danych z RDD
I voila! Pianka, spłucz, powtórz.
źródło
Dla tych, którzy mogą w mojej sytuacji, również tutaj umieszczam swoją odpowiedź.
Mówiąc konkretnie,
Czytałem „Ustaw wpisane dane” z SQLContext. Tak więc oryginalny format danych to DataFrame.
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
Następnie przekonwertuj go do RDD za pomocą rdd.map () z typem mutable.WrappedArray.
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
Wynik:
(1,Set(1))
źródło
Oprócz podanych już sugestii, inną opcją, którą niedawno odkryłem, jest to, że możesz zadeklarować własną klasę, w tym cechę
org.apache.spark.sql.catalyst.DefinedByConstructorParams
.Działa to, jeśli klasa ma konstruktor, który używa typów, które ExpressionEncoder może zrozumieć, tj. Wartości pierwotne i standardowe kolekcje. Może się przydać, gdy nie możesz zadeklarować klasy jako klasy przypadku, ale nie chcesz używać Kryo do kodowania jej za każdym razem, gdy jest ona zawarta w zestawie danych.
Na przykład chciałem zadeklarować klasę przypadku zawierającą wektor Breeze. Jedynym koderem, który byłby w stanie sobie z tym poradzić, byłby Kryo. Ale gdybym zadeklarował podklasę, która rozszerzyła Breeze DenseVector i DefinedByConstructorParams, ExpressionEncoder zrozumiał, że można ją serializować jako tablicę Doubles.
Oto jak to zadeklarowałem:
Teraz mogę używać
SerializableDenseVector
w zestawie danych (bezpośrednio lub jako część produktu) przy użyciu prostego ExpressionEncoder i bez Kryo. Działa podobnie jak Breeze DenseVector, ale serializuje się jako Array [Double].źródło