Nie można serializować zadania: java.io.NotSerializableException podczas wywoływania funkcji poza zamknięciem tylko dla klas, a nie obiektów

224

Dziwne zachowanie podczas wywoływania funkcji poza zamknięciem:

  • kiedy funkcja jest w obiekcie, wszystko działa
  • gdy funkcja jest w klasie, uzyskaj:

Nie można serializować zadania: java.io.NotSerializableException: testowanie

Problem polega na tym, że potrzebuję mojego kodu w klasie, a nie w obiekcie. Jakiś pomysł dlaczego tak się dzieje? Czy obiekt Scala jest serializowany (domyślnie?)?

To jest przykład działającego kodu:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

To jest niedziałający przykład:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
Nimrod007
źródło
Co to jest Spark.ctx? Nie ma obiektu Spark z metodą ctx AFAICT
javadba

Odpowiedzi:

334

RDD rozszerzają interfejs Serializowalny , więc nie jest to przyczyną niepowodzenia Twojego zadania. Teraz nie oznacza to, że możesz serializować RDDz Spark i unikaćNotSerializableException

Spark to silnik przetwarzania rozproszonego, a jego główną abstrakcją jest odporny rozproszony zestaw danych ( RDD) ), który można postrzegać jako zbiór rozproszony. Zasadniczo elementy RDD są podzielone na węzły klastra, ale Spark wyodrębnia to od użytkownika, pozwalając użytkownikowi na interakcję z RDD (kolekcją) tak, jakby była lokalna.

Nie dostać się do zbyt wielu szczegółów, ale po uruchomieniu różne transformacje na RDD ( map, flatMap, filteri inne), kod transformacja (zamknięcie) wynosi:

  1. zserializowany w węźle sterownika,
  2. wysłane do odpowiednich węzłów w klastrze,
  3. deserializowane,
  4. i ostatecznie wykonane w węzłach

Możesz oczywiście uruchomić to lokalnie (jak w twoim przykładzie), ale wszystkie te fazy (oprócz wysyłki przez sieć) nadal występują. [Pozwala to na wykrycie błędów nawet przed wdrożeniem do produkcji]

W drugim przypadku zdarza się, że wywołujesz metodę zdefiniowaną w klasie testingz wnętrza funkcji mapy. Spark widzi to i ponieważ metody nie mogą być serializowane same, Spark próbuje serializować całą testing klasę, aby kod nadal działał, gdy zostanie wykonany w innej maszynie JVM. Masz dwie możliwości:

Albo sprawisz, że testy klasowe będą serializowane, aby cała klasa mogła być serializowana przez Spark:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

lub utworzysz someFuncfunkcję zamiast metody (funkcje są obiektami w Scali), dzięki czemu Spark będzie mógł ją serializować:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

Podobny, ale nie ten sam problem z serializacją klas może Cię zainteresować i możesz go przeczytać w tej prezentacji Spark Summit 2013 .

Na marginesie, możesz przepisać rddList.map(someFunc(_))na rddList.map(someFunc), są one dokładnie takie same. Zazwyczaj drugi jest preferowany, ponieważ jest mniej gadatliwy i czytelniejszy.

EDYCJA (2015-03-15): SPARK-5307 wprowadził SerializationDebugger, a Spark 1.3.0 jest pierwszą wersją, która go używa. Dodaje ścieżkę serializacji do NotSerializableException . Po napotkaniu NotSerializableException debugger odwiedza wykres obiektu, aby znaleźć ścieżkę do obiektu, którego nie można zserializować, i konstruuje informacje, aby pomóc użytkownikowi znaleźć obiekt.

W przypadku OP to jest drukowane na standardowe wyjście:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
Grega Kešpret
źródło
1
Hmm, to, co wyjaśniłeś, z pewnością ma sens i wyjaśnia, dlaczego cała klasa jest serializowana (coś, czego nie do końca zrozumiałem). Niemniej jednak nadal utrzymuję, że RDD nie są serializowane (dobrze rozszerzają Serializable, ale to nie znaczy, że nie powodują wyjątku NotSerializableException, spróbuj). Dlatego jeśli umieścisz je poza klasami, naprawi to błąd. Będę trochę edytować moją odpowiedź, aby być bardziej precyzyjnym na temat tego, co mam na myśli - tj. Powodują wyjątek, a nie to, że rozszerzają interfejs.
samthebest
35
Jeśli nie masz kontroli nad klasą, musisz mieć możliwość serializacji ... jeśli używasz Scali, możesz po prostu utworzyć ją za pomocą Serializable:val test = new Test with Serializable
Mark S
4
„rddList.map (someFunc (_)) do rddList.map (someFunc), są dokładnie takie same” Nie, nie są dokładnie takie same, a w rzeczywistości użycie tego drugiego może powodować wyjątki serializacji, których wcześniej nie zrobiłby.
samthebest
1
@samthebest czy mógłbyś wyjaśnić, dlaczego map (someFunc (_)) nie spowodowałoby wyjątków serializacji, podczas gdy map (someFunc) miałby?
Alon,
31

Odpowiedź Gregi doskonale wyjaśnia, dlaczego oryginalny kod nie działa, i dwa sposoby rozwiązania problemu. Jednak to rozwiązanie nie jest zbyt elastyczne; rozważmy przypadek, w którym zamknięcie obejmuje wywołanie metody dla Serializableklasy innej niż klasa, nad którą nie masz kontroli. Nie można dodać Serializableznacznika do tej klasy ani zmienić implementacji bazowej, aby zmienić metodę na funkcję.

Nilesh przedstawia świetne obejście tego problemu, ale rozwiązanie można uczynić bardziej zwięzłym i ogólnym:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

Tego serializatora funkcji można następnie użyć do automatycznego zawijania zamknięć i wywołań metod:

rdd map genMapper(someFunc)

Ta technika ma również tę zaletę, że nie wymaga dodatkowych zależności Shark w celu uzyskania dostępu KryoSerializationWrapper, ponieważ chłód Twittera jest już przyciągany przez rdzeń Spark

Ben Sidhom
źródło
Cześć, zastanawiam się, czy muszę coś zarejestrować, jeśli użyję waszego kodu? Próbowałem i otrzymałem wyjątek „Nie można znaleźć klasy” od kryo. THX
G_cy
25

Pełna rozmowa w pełni wyjaśniająca problem, który proponuje świetny sposób zmiany paradygmatu, aby uniknąć problemów z serializacją: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md

Najczęściej głosowana odpowiedź w zasadzie sugeruje odrzucenie całej funkcji językowej - która nie korzysta już z metod, a jedynie z funkcji. Rzeczywiście, należy unikać metod programowania funkcjonalnego w klasach, ale zamiana ich w funkcje nie rozwiązuje tutaj problemu projektowego (patrz powyższy link).

Jako szybką poprawkę w tej konkretnej sytuacji możesz po prostu użyć @transientadnotacji, aby powiedzieć, aby nie próbowała serializować wartości obrażającej (tutaj Spark.ctxjest klasa niestandardowa, a nie klasa Spark, która nazywa się po OP):

@transient
val rddList = Spark.ctx.parallelize(list)

Możesz także zrestrukturyzować kod, aby rddList mieszkał gdzie indziej, ale jest to również paskudne.

Przyszłość to prawdopodobnie zarodniki

W przyszłości Scala będzie zawierać te rzeczy zwane „zarodnikami”, które powinny pozwolić nam precyzyjnie kontrolować ziarno, co robi i nie do końca zostaje wciągnięte przez zamknięcie. Ponadto powinno to przekształcić wszystkie błędy przypadkowego wciągnięcia typów nie podlegających serializacji (lub dowolne niepożądane wartości) w błędy kompilacji, a nie teraz, co jest strasznymi wyjątkami / przeciekami pamięci.

http://docs.scala-lang.org/sips/pending/spores.html

Wskazówka na temat serializacji Kryo

Korzystając z Kyro, zrób to tak, aby rejestracja była konieczna, co oznacza, że ​​zamiast błędów wycieku pojawi się błąd:

„W końcu wiem, że kryo ma kryo.setRegistrationOptional (prawda), ale mam bardzo trudny czas, aby dowiedzieć się, jak go używać. Gdy ta opcja jest włączona, wydaje się, że kryo generuje wyjątki, jeśli nie zarejestrowałem się klasy ”.

Strategia rejestracji zajęć w kryo

Oczywiście daje to tylko kontrolę na poziomie typu, a nie kontrolę na poziomie wartości.

... więcej pomysłów.

samthebest
źródło
9

Rozwiązałem ten problem, stosując inne podejście. Musisz po prostu serializować obiekty przed przejściem przez zamknięcie, a następnie usuń serializację z pamięci. To podejście po prostu działa, nawet jeśli twoich klas nie można szeregować, ponieważ używa Kryo za kulisami. Wszystko czego potrzebujesz to trochę curry. ;)

Oto przykład tego, jak to zrobiłem:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

Spraw, aby Blah był tak skomplikowany, jak chcesz, klasa, obiekt towarzyszący, klasy zagnieżdżone, odniesienia do wielu bibliotek stron trzecich.

KryoSerializationWrapper odnosi się do: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Nilesh
źródło
Czy to faktycznie serializuje instancję, czy tworzy instancję statyczną i serializuje referencję (patrz moja odpowiedź).
samthebest
2
@samthebest czy mógłbyś opracować? Jeśli to sprawdzisz KryoSerializationWrapper, przekonasz się, że Spark myśli, że tak jest java.io.Serializable- po prostu serializuje obiekt wewnętrznie za pomocą Kryo - szybciej, prościej. I nie sądzę, że dotyczy instancji statycznej - po prostu usuwa serializację wartości po wywołaniu value.apply ().
Nilesh,
8

I w obliczu podobnego problemu, a co rozumiem z odpowiedzią Grega za to

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

twoja metoda doIT próbuje serializować metodę SomeFunc (_) , ale ponieważ metody nie można serializować, próbuje serializować testy klas których ponownie nie można serializować.

Więc zrobić swoją pracę kodu, należy zdefiniować someFunc wewnątrz Doit metody. Na przykład:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

A jeśli pojawi się wiele funkcji, wszystkie te funkcje powinny być dostępne w kontekście nadrzędnym.

Tarang Bhalodia
źródło
7

Nie jestem do końca pewien, czy dotyczy to Scali, ale w Javie rozwiązałem ten problem NotSerializableException, refaktoryzując mój kod, aby zamknięcie nie uzyskało dostępu do finalpola, które nie jest serializowane .

Trebor Rude
źródło
napotykam ten sam problem w Javie, próbuję użyć klasy FileWriter z pakietu Java IO wewnątrz metody foreach RDD. Czy możesz dać mi znać, jak możemy to rozwiązać.
Shankar,
1
Cóż, @Shankar, jeśli FileWriterto finalpole klasy zewnętrznej, nie możesz tego zrobić. Ale FileWritermogą być zbudowane z a Stringlub a File, z których oba są Serializable. Zmodyfikuj kod, aby zbudować plik lokalny FileWriterna podstawie nazwy pliku z klasy zewnętrznej.
Trebor Rude,
0

Do Twojej wiadomości w Spark 2.4 prawdopodobnie napotkasz ten problem. Serializacja Kryo stała się lepsza, ale w wielu przypadkach nie można używać spark.kryo.unsafe = true lub naiwnego serializatora kryo.

W celu szybkiej naprawy spróbuj zmienić następujące ustawienia w konfiguracji Spark

spark.kryo.unsafe="false"

LUB

spark.serializer="org.apache.spark.serializer.JavaSerializer"

Modyfikuję niestandardowe transformacje RDD, które napotykam lub osobiście piszę, używając jawnych zmiennych rozgłoszeniowych i wykorzystując nowy wbudowany interfejs twitter-chill, przekształcając je rdd.map(row =>w rdd.mapPartitions(partition => {funkcje.

Przykład

Stary (nie-świetny) sposób

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
    val value = sampleMap.get(row._1)
    value
})

Alternatywny (lepszy) sposób

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => {
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => {
        val value = sampleMap.get(row._1)
        value
    }).toIterator
})

Ten nowy sposób wywoła zmienną rozgłoszeniową tylko raz na partycję, co jest lepsze. Nadal będziesz musiał korzystać z serializacji Java, jeśli nie zarejestrujesz klas.

Kościół Gabe
źródło