Prowadzę pracę na Spark w trybie spekulacji. Mam około 500 zadań i około 500 skompresowanych plików o wielkości 1 GB gz. Ciągle dostaję się do każdej pracy, dla 1-2 zadań, dołączony błąd, w którym powtarza się potem dziesiątki razy (uniemożliwiając wykonanie zadania).
org.apache.spark.shuffle.MetadataFetchFailedException: Brak lokalizacji wyjściowej dla shuffle 0
Masz pomysł, jakie jest znaczenie problemu i jak go rozwiązać?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
apache-spark
dotan
źródło
źródło
LostExecutor
wiadomości INFO? Czy możesz sprawdzić stronę wykonawców interfejsu użytkownika sieci Web i zobaczyć, jak zachowują się wykonawcy, zwł. Pod względem GC?Odpowiedzi:
Zdarzyło mi się to, gdy dałem węzłowi roboczemu więcej pamięci niż ma. Ponieważ nie nastąpiła zamiana, iskra uległa awarii podczas próby przechowywania obiektów do tasowania bez pozostałej pamięci.
Rozwiązaniem było albo dodanie wymiany, albo skonfigurowanie pracownika / modułu wykonawczego tak, aby używał mniej pamięci, a ponadto wykorzystywał poziom pamięci MEMORY_AND_DISK przez kilka przypadków.
źródło
Wystąpił podobny błąd ze Sparkiem, ale nie jestem pewien, czy jest to związane z Twoim problemem.
Użyliśmy
JavaPairRDD.repartitionAndSortWithinPartitions
100 GB danych i nadal zawodziło podobnie jak Twoja aplikacja. Następnie przyjrzeliśmy się dziennikom Yarn w określonych węzłach i odkryliśmy, że mamy jakiś problem z brakiem pamięci, więc Yarn przerwał wykonanie. Nasze rozwiązanie było do zmiany / addspark.shuffle.memoryFraction 0
in.../spark/conf/spark-defaults.conf
. Pozwoliło nam to obsłużyć w ten sposób znacznie większą (ale niestety nie nieskończoną) ilość danych.źródło
Ten sam problem wystąpił w moim klastrze z 3 maszynami YARN. Ciągle zmieniałem pamięć RAM, ale problem nie ustąpił. W końcu zobaczyłem w dziennikach następujące komunikaty:
a potem była taka wiadomość:
Zmodyfikowałem właściwości w spark-defaults.conf w następujący sposób:
Otóż to! Moja praca zakończyła się sukcesem po tym.
źródło
spark.executor.heartbeatInterval should be significantly less than spark.network.timeout
. Dlatego ustawienie obu tych samych wartości może nie być najlepszym pomysłem.Dla mnie robiłem okna na dużych danych (około 50B wierszy) i otrzymałem ładunek łodzi
W moich dziennikach. Oczywiście 4096 może być małe przy takim rozmiarze danych ... to doprowadziło mnie do następującego JIRA:
https://issues.apache.org/jira/browse/SPARK-21595
I ostatecznie do następujących dwóch opcji konfiguracyjnych:
spark.sql.windowExec.buffer.spill.threshold
spark.sql.windowExec.buffer.in.memory.threshold
Obie domyślne to 4096; Podniosłem je znacznie wyżej (2097152) i teraz wydaje się, że wszystko idzie dobrze. Nie jestem w 100% pewien, że to to samo, co poruszony tutaj problem, ale jest to kolejna rzecz do wypróbowania.
źródło
Rozwiązałem ten błąd, zwiększając przydzieloną pamięć w executorMemory i driverMemory. Możesz to zrobić w HUE wybierając program Spark, który powoduje problem, a we właściwościach -> Lista opcji możesz dodać coś takiego:
Oczywiście wartości parametrów będą się różnić w zależności od wielkości klastra i Twoich potrzeb.
źródło
w interfejsie Spark Web UI, jeśli są jakieś informacje, takie jak
Executors lost
, musisz sprawdzić dziennik przędzy, upewnić się, czy twój pojemnik został zabity.Jeśli kontener został zabity, to prawdopodobnie z powodu braku pamięci.
Jak znaleźć kluczowe informacje w dziennikach przędzy? Na przykład mogą pojawić się takie ostrzeżenia:
W takim przypadku sugeruje, że powinieneś zwiększyć
spark.yarn.executor.memoryOverhead
.źródło
W moim przypadku (samodzielny klaster) wyjątek został zgłoszony, ponieważ system plików niektórych slaveów Spark został wypełniony w 100%. Usunięcie wszystkiego z
spark/work
folderów niewolników rozwiązało problem.źródło
Mam ten sam problem, ale przeszukałem wiele odpowiedzi, które nie mogą rozwiązać mojego problemu. ostatecznie debuguję swój kod krok po kroku. Uważam, że problem spowodowany rozmiarem danych nie jest zrównoważony dla każdej partycji, co doprowadziło do
MetadataFetchFailedException
tego namap
etapie, a nie nareduce
etapie. po prostu zrób todf_rdd.repartition(nums)
wcześniejreduceByKey()
źródło