Dziwne zachowanie podczas wywoływania funkcji poza zamknięciem:
- kiedy funkcja jest w obiekcie, wszystko działa
- gdy funkcja jest w klasie, uzyskaj:
Nie można serializować zadania: java.io.NotSerializableException: testowanie
Problem polega na tym, że potrzebuję mojego kodu w klasie, a nie w obiekcie. Jakiś pomysł dlaczego tak się dzieje? Czy obiekt Scala jest serializowany (domyślnie?)?
To jest przykład działającego kodu:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
To jest niedziałający przykład:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
scala
serialization
apache-spark
typesafe
Nimrod007
źródło
źródło
Odpowiedzi:
RDD rozszerzają interfejs Serializowalny , więc nie jest to przyczyną niepowodzenia Twojego zadania. Teraz nie oznacza to, że możesz serializować
RDD
z Spark i unikaćNotSerializableException
Spark to silnik przetwarzania rozproszonego, a jego główną abstrakcją jest odporny rozproszony zestaw danych ( RDD) ), który można postrzegać jako zbiór rozproszony. Zasadniczo elementy RDD są podzielone na węzły klastra, ale Spark wyodrębnia to od użytkownika, pozwalając użytkownikowi na interakcję z RDD (kolekcją) tak, jakby była lokalna.
Nie dostać się do zbyt wielu szczegółów, ale po uruchomieniu różne transformacje na RDD (
map
,flatMap
,filter
i inne), kod transformacja (zamknięcie) wynosi:Możesz oczywiście uruchomić to lokalnie (jak w twoim przykładzie), ale wszystkie te fazy (oprócz wysyłki przez sieć) nadal występują. [Pozwala to na wykrycie błędów nawet przed wdrożeniem do produkcji]
W drugim przypadku zdarza się, że wywołujesz metodę zdefiniowaną w klasie
testing
z wnętrza funkcji mapy. Spark widzi to i ponieważ metody nie mogą być serializowane same, Spark próbuje serializować całątesting
klasę, aby kod nadal działał, gdy zostanie wykonany w innej maszynie JVM. Masz dwie możliwości:Albo sprawisz, że testy klasowe będą serializowane, aby cała klasa mogła być serializowana przez Spark:
lub utworzysz
someFunc
funkcję zamiast metody (funkcje są obiektami w Scali), dzięki czemu Spark będzie mógł ją serializować:Podobny, ale nie ten sam problem z serializacją klas może Cię zainteresować i możesz go przeczytać w tej prezentacji Spark Summit 2013 .
Na marginesie, możesz przepisać
rddList.map(someFunc(_))
narddList.map(someFunc)
, są one dokładnie takie same. Zazwyczaj drugi jest preferowany, ponieważ jest mniej gadatliwy i czytelniejszy.EDYCJA (2015-03-15): SPARK-5307 wprowadził SerializationDebugger, a Spark 1.3.0 jest pierwszą wersją, która go używa. Dodaje ścieżkę serializacji do NotSerializableException . Po napotkaniu NotSerializableException debugger odwiedza wykres obiektu, aby znaleźć ścieżkę do obiektu, którego nie można zserializować, i konstruuje informacje, aby pomóc użytkownikowi znaleźć obiekt.
W przypadku OP to jest drukowane na standardowe wyjście:
źródło
val test = new Test with Serializable
Odpowiedź Gregi doskonale wyjaśnia, dlaczego oryginalny kod nie działa, i dwa sposoby rozwiązania problemu. Jednak to rozwiązanie nie jest zbyt elastyczne; rozważmy przypadek, w którym zamknięcie obejmuje wywołanie metody dla
Serializable
klasy innej niż klasa, nad którą nie masz kontroli. Nie można dodaćSerializable
znacznika do tej klasy ani zmienić implementacji bazowej, aby zmienić metodę na funkcję.Nilesh przedstawia świetne obejście tego problemu, ale rozwiązanie można uczynić bardziej zwięzłym i ogólnym:
Tego serializatora funkcji można następnie użyć do automatycznego zawijania zamknięć i wywołań metod:
Ta technika ma również tę zaletę, że nie wymaga dodatkowych zależności Shark w celu uzyskania dostępu
KryoSerializationWrapper
, ponieważ chłód Twittera jest już przyciągany przez rdzeń Sparkźródło
Pełna rozmowa w pełni wyjaśniająca problem, który proponuje świetny sposób zmiany paradygmatu, aby uniknąć problemów z serializacją: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md
Najczęściej głosowana odpowiedź w zasadzie sugeruje odrzucenie całej funkcji językowej - która nie korzysta już z metod, a jedynie z funkcji. Rzeczywiście, należy unikać metod programowania funkcjonalnego w klasach, ale zamiana ich w funkcje nie rozwiązuje tutaj problemu projektowego (patrz powyższy link).
Jako szybką poprawkę w tej konkretnej sytuacji możesz po prostu użyć
@transient
adnotacji, aby powiedzieć, aby nie próbowała serializować wartości obrażającej (tutajSpark.ctx
jest klasa niestandardowa, a nie klasa Spark, która nazywa się po OP):Możesz także zrestrukturyzować kod, aby rddList mieszkał gdzie indziej, ale jest to również paskudne.
Przyszłość to prawdopodobnie zarodniki
W przyszłości Scala będzie zawierać te rzeczy zwane „zarodnikami”, które powinny pozwolić nam precyzyjnie kontrolować ziarno, co robi i nie do końca zostaje wciągnięte przez zamknięcie. Ponadto powinno to przekształcić wszystkie błędy przypadkowego wciągnięcia typów nie podlegających serializacji (lub dowolne niepożądane wartości) w błędy kompilacji, a nie teraz, co jest strasznymi wyjątkami / przeciekami pamięci.
http://docs.scala-lang.org/sips/pending/spores.html
Wskazówka na temat serializacji Kryo
Korzystając z Kyro, zrób to tak, aby rejestracja była konieczna, co oznacza, że zamiast błędów wycieku pojawi się błąd:
„W końcu wiem, że kryo ma kryo.setRegistrationOptional (prawda), ale mam bardzo trudny czas, aby dowiedzieć się, jak go używać. Gdy ta opcja jest włączona, wydaje się, że kryo generuje wyjątki, jeśli nie zarejestrowałem się klasy ”.
Strategia rejestracji zajęć w kryo
Oczywiście daje to tylko kontrolę na poziomie typu, a nie kontrolę na poziomie wartości.
... więcej pomysłów.
źródło
Rozwiązałem ten problem, stosując inne podejście. Musisz po prostu serializować obiekty przed przejściem przez zamknięcie, a następnie usuń serializację z pamięci. To podejście po prostu działa, nawet jeśli twoich klas nie można szeregować, ponieważ używa Kryo za kulisami. Wszystko czego potrzebujesz to trochę curry. ;)
Oto przykład tego, jak to zrobiłem:
Spraw, aby Blah był tak skomplikowany, jak chcesz, klasa, obiekt towarzyszący, klasy zagnieżdżone, odniesienia do wielu bibliotek stron trzecich.
KryoSerializationWrapper odnosi się do: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
źródło
KryoSerializationWrapper
, przekonasz się, że Spark myśli, że tak jestjava.io.Serializable
- po prostu serializuje obiekt wewnętrznie za pomocą Kryo - szybciej, prościej. I nie sądzę, że dotyczy instancji statycznej - po prostu usuwa serializację wartości po wywołaniu value.apply ().I w obliczu podobnego problemu, a co rozumiem z odpowiedzią Grega za to
twoja metoda doIT próbuje serializować metodę SomeFunc (_) , ale ponieważ metody nie można serializować, próbuje serializować testy klas których ponownie nie można serializować.
Więc zrobić swoją pracę kodu, należy zdefiniować someFunc wewnątrz Doit metody. Na przykład:
A jeśli pojawi się wiele funkcji, wszystkie te funkcje powinny być dostępne w kontekście nadrzędnym.
źródło
Nie jestem do końca pewien, czy dotyczy to Scali, ale w Javie rozwiązałem ten problem
NotSerializableException
, refaktoryzując mój kod, aby zamknięcie nie uzyskało dostępu dofinal
pola, które nie jest serializowane .źródło
FileWriter
tofinal
pole klasy zewnętrznej, nie możesz tego zrobić. AleFileWriter
mogą być zbudowane z aString
lub aFile
, z których oba sąSerializable
. Zmodyfikuj kod, aby zbudować plik lokalnyFileWriter
na podstawie nazwy pliku z klasy zewnętrznej.Do Twojej wiadomości w Spark 2.4 prawdopodobnie napotkasz ten problem. Serializacja Kryo stała się lepsza, ale w wielu przypadkach nie można używać spark.kryo.unsafe = true lub naiwnego serializatora kryo.
W celu szybkiej naprawy spróbuj zmienić następujące ustawienia w konfiguracji Spark
LUB
Modyfikuję niestandardowe transformacje RDD, które napotykam lub osobiście piszę, używając jawnych zmiennych rozgłoszeniowych i wykorzystując nowy wbudowany interfejs twitter-chill, przekształcając je
rdd.map(row =>
wrdd.mapPartitions(partition => {
funkcje.Przykład
Stary (nie-świetny) sposób
Alternatywny (lepszy) sposób
Ten nowy sposób wywoła zmienną rozgłoszeniową tylko raz na partycję, co jest lepsze. Nadal będziesz musiał korzystać z serializacji Java, jeśli nie zarejestrujesz klas.
źródło