Jak przechowywać obiekty niestandardowe w zestawie danych?

149

Zgodnie z wprowadzeniem zestawów danych Spark :

Z niecierpliwością czekamy na Spark 2.0, planujemy kilka ekscytujących ulepszeń zestawów danych, w szczególności: ... Niestandardowe kodery - podczas gdy obecnie automatycznie generujemy kodery dla wielu różnych typów, chcielibyśmy otworzyć interfejs API dla niestandardowych obiektów.

i próbuje zapisać niestandardowy typ w Datasetprowadzić do następującego błędu, takiego jak:

Nie można znaleźć kodera dla typu przechowywanego w zestawie danych. Typy pierwotne (Int, String itp.) I Typy produktów (klasy przypadków) są obsługiwane przez importowanie sqlContext.implicits._ Obsługa serializacji innych typów zostanie dodana w przyszłych wersjach

lub:

Java.lang.UnsupportedOperationException: nie znaleziono kodera dla ....

Czy istnieją jakieś obejścia?


Zauważ, że to pytanie istnieje tylko jako punkt wejścia do odpowiedzi na Wiki społeczności. Możesz zaktualizować / ulepszyć zarówno pytanie, jak i odpowiedź.

zero 323
źródło

Odpowiedzi:

240

Aktualizacja

Ta odpowiedź jest nadal ważne i pouczające, chociaż rzeczy są teraz lepiej od 2.2 / 2.3, który dodaje wbudowane wsparcie dla enkodera Set, Seq, Map, Date, Timestamp, i BigDecimal. Jeśli trzymasz się tworzenia typów tylko z klasami przypadków i zwykłymi typami Scala, powinieneś być w porządku z tylko niejawnym w SQLImplicits.


Niestety, praktycznie nic nie zostało dodane, aby w tym pomóc. Wyszukiwanie @since 2.0.0w Encoders.scalalub SQLImplicits.scalaznajdowanie rzeczy głównie związanych z typami prymitywnymi (i pewne modyfikacje klas przypadków). A więc pierwszą rzeczą do powiedzenia: obecnie nie ma naprawdę dobrego wsparcia dla niestandardowych koderów klas . Po usunięciu tego z drogi, oto kilka sztuczek, które wykonują tak dobrą robotę, jak możemy kiedykolwiek mieć nadzieję, biorąc pod uwagę to, co obecnie mamy do dyspozycji. Z góry zastrzeżenie: to nie zadziała idealnie i dołożę wszelkich starań, aby wszystkie ograniczenia były jasne i z góry.

Na czym dokładnie polega problem

Jeśli chcesz utworzyć zestaw danych, Spark „wymaga kodera (do konwersji obiektu JVM typu T na wewnętrzną reprezentację Spark SQL i z niej), który jest generalnie tworzony automatycznie za pomocą implicits z a SparkSessionlub może być tworzony jawnie przez wywołanie metod statycznych on Encoders”(zaczerpnięte z dokumentacjicreateDataset ). Koder przyjmie postać, w Encoder[T]której Tjest typem, który kodujesz. Pierwszą sugestią jest dodanie import spark.implicits._(co daje te niejawne kodery), a drugą sugestią jest jawne przekazanie niejawnego kodera przy użyciu tego zestawu funkcji związanych z koderem.

Nie ma kodera dostępnego dla zwykłych klas, więc

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

poda następujący niejawny powiązany błąd czasu kompilacji:

Nie można znaleźć kodera dla typu przechowywanego w zestawie danych. Typy pierwotne (Int, String itp.) I Typy produktów (klasy przypadków) są obsługiwane przez importowanie sqlContext.implicits._ Obsługa serializacji innych typów zostanie dodana w przyszłych wersjach

Jeśli jednak zawiniesz dowolny typ, którego właśnie użyłeś, aby uzyskać powyższy błąd w jakiejś rozszerzonej klasie Product, błąd myląco zostanie opóźniony do czasu wykonania, więc

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Kompiluje się dobrze, ale kończy się niepowodzeniem w czasie wykonywania z

java.lang.UnsupportedOperationException: nie znaleziono kodera dla MyObj

Powodem tego jest to, że kodery, które Spark tworzy za pomocą implicitów, są w rzeczywistości tworzone tylko w czasie wykonywania (przez ponowne sprawdzenie scali). W tym przypadku wszystkie kontrole Spark w czasie kompilacji polegają na tym, że najbardziej zewnętrzna klasa rozszerza Product(co robią wszystkie klasy przypadków) i dopiero w czasie wykonywania zdaje sobie sprawę, że nadal nie wie, co zrobić MyObj(ten sam problem występuje, gdy próbuję a Dataset[(Int,MyObj)]- Spark czeka, aż runtime włączy się MyObj). Oto główne problemy, które pilnie wymagają rozwiązania:

  • niektóre klasy, które rozszerzają Productkompilację, mimo że zawsze ulegają awarii w czasie wykonywania i
  • nie ma sposobu na przekazanie niestandardowych koderów dla typów zagnieżdżonych (nie mam możliwości przekazania Spark koderowi tylko po to MyObj, aby wiedział, jak zakodować Wrap[MyObj]lub (Int,MyObj)).

Po prostu użyj kryo

Rozwiązaniem, które wszyscy sugerują, jest użycie kryokodera.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Jednak szybko staje się to nudne. Zwłaszcza, jeśli Twój kod manipuluje różnymi zbiorami danych, łączy się, grupuje itp. W rezultacie uzyskujesz kilka dodatkowych implikacji. Dlaczego więc po prostu nie założyć, że robi to wszystko automatycznie?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

A teraz wygląda na to, że mogę zrobić prawie wszystko, co chcę (poniższy przykład nie zadziała w miejscu, w spark-shellktórym spark.implicits._jest automatycznie importowany)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Albo prawie. Problem polega na tym, że użycie kryoprowadzi do Spark po prostu przechowuje każdy wiersz w zestawie danych jako płaski obiekt binarny. Na map, filter, foreachże to wystarczy, ale dla takich operacji join, Spark naprawdę potrzebuje tych mają być rozdzielone na kolumnach. Po sprawdzeniu schematu pod kątem d2lub d3widać, że jest tylko jedna kolumna binarna:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Częściowe rozwiązanie dla krotek

Tak więc, używając magii implikacji w Scali (więcej w 6.26.3 Overloading Resolution ), mogę stworzyć serię implikacji, które wykonają jak najlepszą robotę, przynajmniej w przypadku krotek, i będą dobrze działać z istniejącymi implikacjami:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Następnie, uzbrojony w te implikacje, mogę sprawić, że powyższy przykład zadziała, aczkolwiek z pewnymi zmianami nazw kolumn

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

I jeszcze nie zorientowali się, jak uzyskać oczekiwane nazwiska krotka ( _1, _2...) domyślnie bez zmiany nazwy ich - jeśli ktoś chce się bawić z tym, to jest, gdy nazwa "value"zostanie wprowadzona, a to jest, gdy krotka nazwy są zwykle dodawane. Jednak kluczową kwestią jest to, że mam teraz ładny strukturalny schemat:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

Podsumowując, to obejście:

  • pozwala nam uzyskać oddzielne kolumny dla krotek (abyśmy mogli ponownie dołączyć do krotek, yay!)
  • możemy znowu polegać na implikacjach (więc nie ma potrzeby przechodzenia w kryokażdym miejscu)
  • jest prawie całkowicie wstecznie kompatybilny z import spark.implicits._( wymaga zmiany nazwy)
  • nie nie pozwól nam przyłączyć się na kyroserializacji kolumnach binarnych, nie mówiąc już o tych pól może mieć
  • ma nieprzyjemny efekt uboczny zmiany nazwy niektórych kolumn krotki na „wartość” (w razie potrzeby można to cofnąć, konwertując .toDF, określając nowe nazwy kolumn i konwertując z powrotem do zestawu danych - a nazwy schematów wydają się być zachowane dzięki złączeniom , gdzie są najbardziej potrzebne).

Częściowe rozwiązanie dla zajęć w ogóle

Ten jest mniej przyjemny i nie ma dobrego rozwiązania. Jednak teraz, gdy mamy powyższe rozwiązanie krotki, mam przeczucie, że rozwiązanie niejawnej konwersji z innej odpowiedzi będzie również nieco mniej bolesne, ponieważ możesz przekonwertować bardziej złożone klasy na krotki. Następnie, po utworzeniu zestawu danych, prawdopodobnie zmieniłbyś nazwy kolumn, używając podejścia Dataframe. Jeśli wszystko pójdzie dobrze, jest to naprawdę poprawa, ponieważ mogę teraz wykonywać połączenia na polach moich zajęć. Gdybym użył tylko jednego płaskiego kryoserializatora binarnego , nie byłoby to możliwe.

Oto przykład, który ma wszystkiego po trochu: Mam klasy MyObj, która ma pola typów Int, java.util.UUIDoraz Set[String]. Pierwsza dba o siebie. Drugi, chociaż mógłbym serializować za pomocą, kryobyłby bardziej przydatny, gdyby był przechowywany jako a String(ponieważ UUIDs są zwykle czymś, przeciwko czemu chcę się przyłączyć). Trzeci tak naprawdę należy do kolumny binarnej.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Teraz mogę utworzyć zbiór danych z ładnym schematem przy użyciu tej maszyny:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

Schemat pokazuje mi kolumny I z właściwymi nazwami i dwoma pierwszymi rzeczami, które mogę połączyć.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
Alec
źródło
Czy można utworzyć klasę niestandardową ExpressionEncoderprzy użyciu serializacji JSON? W moim przypadku krotki nie ujdą na sucho, a kryo daje mi kolumnę binarną ..
Alexey Svyatkovskiy
1
@AlexeyS Nie sądzę. Ale dlaczego chcesz tego? Dlaczego nie ujdzie ci na sucho ostatnie rozwiązanie, które proponuję? Jeśli możesz umieścić swoje dane w JSON, powinieneś być w stanie wyodrębnić pola i umieścić je w klasie przypadku ...
Alec
1
Niestety sednem tej odpowiedzi jest to, że nie ma rozwiązania, które działa.
baol
@baol Rodzaj. Ale pamiętaj, jak trudne jest to, co robi Spark. System typów Scali nie jest po prostu wystarczająco wydajny, aby „wyprowadzić” kodery, które rekurencyjnie przechodzą przez pola. Szczerze mówiąc, jestem po prostu zaskoczony, że nikt nie stworzył do tego makra adnotacji. Wydaje się naturalne (ale trudne) rozwiązanie.
Alec
1
@combinatorist Rozumiem, że zestawy danych i ramki danych (ale nie RDD, ponieważ nie potrzebują koderów!) są równoważne z punktu widzenia wydajności. Nie lekceważ bezpieczeństwa typów zbiorów danych! Tylko dlatego, że Spark wewnętrznie wykorzystuje mnóstwo odbić, rzutów itp., Nie oznacza to, że nie powinieneś przejmować się bezpieczeństwem typu interfejsu, który jest narażony. Ale to sprawia, że ​​czuję się lepiej, tworząc własne bezpieczne funkcje oparte na zestawie danych, które używają Dataframes pod maską.
Alec
32
  1. Korzystanie z koderów ogólnych.

    Na razie są dostępne dwa ogólne kodery, kryoa javaSerializationten drugi jest wyraźnie opisany jako:

    wyjątkowo nieefektywne i powinno być używane tylko w ostateczności.

    Zakładając następującą klasę

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }

    możesz użyć tych koderów, dodając niejawny koder:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }

    które mogą być używane razem w następujący sposób:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }

    Przechowuje obiekty jako binarykolumny, więc po przekonwertowaniu do DataFrameciebie otrzymasz następujący schemat:

    root
     |-- value: binary (nullable = true)

    Możliwe jest również kodowanie krotek za pomocą kryoenkodera dla określonego pola:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]

    Należy pamiętać, że nie polegamy tutaj na niejawnych koderach, ale jawnie przekazujemy koder, więc najprawdopodobniej nie zadziała z toDSmetodą.

  2. Korzystanie z niejawnych konwersji:

    Zapewnij niejawne konwersje między reprezentacją, którą można zakodować, a klasą niestandardową, na przykład:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }

Powiązane pytania:

zero 332
źródło
Wydaje się, że rozwiązanie 1 nie działa w przypadku (przynajmniej Set) kolekcji wpisanych na maszynie Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar].
Victor P.,
@VictorP. Oczekuje się, że boję się W takim przypadku będziesz potrzebować kodera dla określonego typu ( kryo[Set[Bar]]. Tak samo, jeśli klasa zawiera pole Bar, potrzebujesz kodera dla całego obiektu. Są to bardzo prymitywne metody.
zero323
@ zero323 Mam ten sam problem. Czy możesz umieścić przykład kodu, jak zakodować cały projekt? Wielkie dzięki!
Rock
@Rock Nie jestem pewien, co masz na myśli przez „cały projekt”
zero323
@ zero323 za Twój komentarz, „jeśli klasa zawiera pole Bar, potrzebujesz kodera dla całego obiektu”. moje pytanie brzmiało jak zakodować ten „cały projekt”?
Rocka
9

Możesz użyć rejestracji UDTR, a następnie klas przypadku, krotek itp. ... wszystkie działają poprawnie z typem zdefiniowanym przez użytkownika!

Załóżmy, że chcesz użyć niestandardowego wyliczenia:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

Zarejestruj to w ten sposób:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

Następnie UŻYJ!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Załóżmy, że chcesz użyć rekordu polimorficznego:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... i używaj go w ten sposób:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Możesz napisać niestandardowy UDT, który koduje wszystko do bajtów (używam tutaj serializacji java, ale prawdopodobnie lepiej jest instrumentować kontekst Kryo Sparka).

Najpierw zdefiniuj klasę UDT:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Następnie zarejestruj to:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

Wtedy możesz go użyć!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
ChoppyTheLumberjack
źródło
1
Nie widzę, gdzie używane jest twoje kryo (w CustomPolyUDT)
mathieu
Próbuję zdefiniować UDT w moim projekcie i otrzymuję komunikat o błędzie „Symbol UserDefinedType jest niedostępny z tego miejsca”. Jakaś pomoc ?
Rijo Joseph,
Cześć @RijoJoseph. Musisz stworzyć pakiet org.apache.spark w swoim projekcie i umieścić w nim swój kod UDT.
ChoppyTheLumberjack
6

Enkodery działają mniej więcej tak samo w Spark2.0. I Kryonadal jest zalecanym serializationwyborem.

Możesz spojrzeć na następujący przykład z powłoką iskrową

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Do tej pory nie było ich appropriate encodersw obecnym zakresie, więc nasze osoby nie były kodowane jako binarywartości. Ale to się zmieni, gdy udostępnimy niektóre implicitkodery korzystające z Kryoserializacji.

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
sarveshseri
źródło
3

W przypadku klasy Java Bean może to być przydatne

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

Teraz możesz po prostu odczytać ramkę dataFrame jako niestandardową ramkę danych

dataFrame.as[MyClass]

Spowoduje to utworzenie niestandardowego kodera klasy, a nie binarnego.

Akash Mahajan
źródło
1

Moje przykłady będą w Javie, ale nie wyobrażam sobie, że adaptacja do Scali będzie trudna.

Byłem całkiem udany konwersji RDD<Fruit>do Dataset<Fruit>korzystania spark.createDataset i Encoders.bean tak długo, jak Fruitjest to prosty Java Bean .

Krok 1: Utwórz prosty Java Bean.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Trzymałbym się klas z typami prymitywnymi i polami typu String jako, zanim ludzie z DataBricks wzmocnią swoje kodery. Jeśli masz klasę z zagnieżdżonym obiektem, utwórz kolejny prosty Java Bean ze spłaszczonymi wszystkimi jego polami, dzięki czemu możesz użyć transformacji RDD do odwzorowania typu złożonego na prostszy. Jasne, to trochę więcej pracy, ale wyobrażam sobie, że bardzo pomoże to w wydajności pracy z płaskim schematem.

Krok 2: Pobierz swój zestaw danych z RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

I voila! Pianka, spłucz, powtórz.

Jimmy Da
źródło
Proponuję wskazać, że w przypadku prostych struktur lepiej byłoby przechowywać je w natywnych typach Spark, zamiast serializować je do obiektu blob. Działają lepiej na bramie Pythona, są bardziej przezroczyste w Parquet, a nawet mogą być rzutowane na struktury o tym samym kształcie.
metasim
1

Dla tych, którzy mogą w mojej sytuacji, również tutaj umieszczam swoją odpowiedź.

Mówiąc konkretnie,

  1. Czytałem „Ustaw wpisane dane” z SQLContext. Tak więc oryginalny format danych to DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. Następnie przekonwertuj go do RDD za pomocą rdd.map () z typem mutable.WrappedArray.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    Wynik:

    (1,Set(1))

Taeheon Kwon
źródło
0

Oprócz podanych już sugestii, inną opcją, którą niedawno odkryłem, jest to, że możesz zadeklarować własną klasę, w tym cechę org.apache.spark.sql.catalyst.DefinedByConstructorParams.

Działa to, jeśli klasa ma konstruktor, który używa typów, które ExpressionEncoder może zrozumieć, tj. Wartości pierwotne i standardowe kolekcje. Może się przydać, gdy nie możesz zadeklarować klasy jako klasy przypadku, ale nie chcesz używać Kryo do kodowania jej za każdym razem, gdy jest ona zawarta w zestawie danych.

Na przykład chciałem zadeklarować klasę przypadku zawierającą wektor Breeze. Jedynym koderem, który byłby w stanie sobie z tym poradzić, byłby Kryo. Ale gdybym zadeklarował podklasę, która rozszerzyła Breeze DenseVector i DefinedByConstructorParams, ExpressionEncoder zrozumiał, że można ją serializować jako tablicę Doubles.

Oto jak to zadeklarowałem:

class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]

Teraz mogę używać SerializableDenseVectorw zestawie danych (bezpośrednio lub jako część produktu) przy użyciu prostego ExpressionEncoder i bez Kryo. Działa podobnie jak Breeze DenseVector, ale serializuje się jako Array [Double].

Matt
źródło