Dlaczego zadania Spark kończą się niepowodzeniem z org.apache.spark.shuffle.MetadataFetchFailedException: brak lokalizacji wyjściowej dla shuffle 0 w trybie spekulacji?

85

Prowadzę pracę na Spark w trybie spekulacji. Mam około 500 zadań i około 500 skompresowanych plików o wielkości 1 GB gz. Ciągle dostaję się do każdej pracy, dla 1-2 zadań, dołączony błąd, w którym powtarza się potem dziesiątki razy (uniemożliwiając wykonanie zadania).

org.apache.spark.shuffle.MetadataFetchFailedException: Brak lokalizacji wyjściowej dla shuffle 0

Masz pomysł, jakie jest znaczenie problemu i jak go rozwiązać?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
dotan
źródło
1
Widziałeś jakieś LostExecutorwiadomości INFO? Czy możesz sprawdzić stronę wykonawców interfejsu użytkownika sieci Web i zobaczyć, jak zachowują się wykonawcy, zwł. Pod względem GC?
Jacek Laskowski

Odpowiedzi:

50

Zdarzyło mi się to, gdy dałem węzłowi roboczemu więcej pamięci niż ma. Ponieważ nie nastąpiła zamiana, iskra uległa awarii podczas próby przechowywania obiektów do tasowania bez pozostałej pamięci.

Rozwiązaniem było albo dodanie wymiany, albo skonfigurowanie pracownika / modułu wykonawczego tak, aby używał mniej pamięci, a ponadto wykorzystywał poziom pamięci MEMORY_AND_DISK przez kilka przypadków.

Joren Van Severen
źródło
3
Jeśli masz zasób w węźle (pamięć), możesz spróbować zwiększyć pamięć modułu wykonawczego Spark. Spróbuję tego najpierw, jeśli obawiasz się również o wydajność.
nir
14
Cześć @Joren, to nie są zawody. Problem z OP polega na tym, że moduł wykonawczy nie ma wystarczającej ilości pamięci do przechowywania danych wyjściowych w trybie odtwarzania losowego. To, co zadziałało, to nie zmniejszenie pamięci executora, ale użycie poziomu pamięci MEMORY_AND_DISK, który eliminuje ograniczenie pamięci executora. Również OP nie mówi o tym, ile ma zasobów na executora.
nir
Mam ten sam problem i próbowałem metod takich jak zwiększenie pamięci modułu wykonawczego, zwiększenie ilości repartycji, zwalnianie większej ilości pamięci fizycznej. Czasami to działało, a czasami nie. Zauważyłem, że dzieje się to tylko w fazie odczytu losowego i chciałbym zapytać, gdzie mogę ustawić StorageLevel?
Lhfcws
Zoptymalizowałem strukturę danych i naprawiłem to. Właśnie zmieniłem HashMap na bajt [], który został zserializowany przez protostuff
Lhfcws
1
Spróbuj zmienić wartości spark.driver.overhead.memory i spark.executor.overhead.memory na wartość większą niż 384 (domyślnie) i powinno działać. Możesz użyć 1024 MB lub 2048 MB.
rahul gulati
14

Wystąpił podobny błąd ze Sparkiem, ale nie jestem pewien, czy jest to związane z Twoim problemem.

Użyliśmy JavaPairRDD.repartitionAndSortWithinPartitions100 GB danych i nadal zawodziło podobnie jak Twoja aplikacja. Następnie przyjrzeliśmy się dziennikom Yarn w określonych węzłach i odkryliśmy, że mamy jakiś problem z brakiem pamięci, więc Yarn przerwał wykonanie. Nasze rozwiązanie było do zmiany / add spark.shuffle.memoryFraction 0in .../spark/conf/spark-defaults.conf. Pozwoliło nam to obsłużyć w ten sposób znacznie większą (ale niestety nie nieskończoną) ilość danych.

Spoza listy
źródło
Czy to naprawdę „0”, czy to był błąd podczas pisania? Jaka logika się za tym kryje, zmuszając go do trwałego rozlania się na dysk?
Virgil
@Virgil Yes. Zrobiliśmy kilka testów. Im bliżej zera byliśmy, tym większa była możliwa do przetworzenia ilość. Cena wynosiła 20% czasu.
Notinlist
Co ciekawe, zredukowałem również spark.shuffle.memoryFraction do zera, ale otrzymałem więcej błędów z rzędu. (Mianowicie: MetadataFetchFailedException i FetchFailedException sporadycznie) Powinien stać się błędem / problemem, jeśli „all-spill” ma mniej błędów niż „Part-spill”.
tribbloid
11

Ten sam problem wystąpił w moim klastrze z 3 maszynami YARN. Ciągle zmieniałem pamięć RAM, ale problem nie ustąpił. W końcu zobaczyłem w dziennikach następujące komunikaty:

17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms

a potem była taka wiadomość:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67

Zmodyfikowałem właściwości w spark-defaults.conf w następujący sposób:

spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000

Otóż ​​to! Moja praca zakończyła się sukcesem po tym.

xplorerdev
źródło
W docs zapłonowych, to powiedział: spark.executor.heartbeatInterval should be significantly less than spark.network.timeout. Dlatego ustawienie obu tych samych wartości może nie być najlepszym pomysłem.
Bitswazsky
2

Dla mnie robiłem okna na dużych danych (około 50B wierszy) i otrzymałem ładunek łodzi

ExternalAppendOnlyUnsafeRowArray:54 - Osiągnięto próg wycieku 4096 wierszy, przechodząc na org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

W moich dziennikach. Oczywiście 4096 może być małe przy takim rozmiarze danych ... to doprowadziło mnie do następującego JIRA:

https://issues.apache.org/jira/browse/SPARK-21595

I ostatecznie do następujących dwóch opcji konfiguracyjnych:

  • spark.sql.windowExec.buffer.spill.threshold
  • spark.sql.windowExec.buffer.in.memory.threshold

Obie domyślne to 4096; Podniosłem je znacznie wyżej (2097152) i teraz wydaje się, że wszystko idzie dobrze. Nie jestem w 100% pewien, że to to samo, co poruszony tutaj problem, ale jest to kolejna rzecz do wypróbowania.

MichaelChirico
źródło
1

Rozwiązałem ten błąd, zwiększając przydzieloną pamięć w executorMemory i driverMemory. Możesz to zrobić w HUE wybierając program Spark, który powoduje problem, a we właściwościach -> Lista opcji możesz dodać coś takiego:

--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2

Oczywiście wartości parametrów będą się różnić w zależności od wielkości klastra i Twoich potrzeb.

Ignacio Alorre
źródło
1

w interfejsie Spark Web UI, jeśli są jakieś informacje, takie jak Executors lost, musisz sprawdzić dziennik przędzy, upewnić się, czy twój pojemnik został zabity.

Jeśli kontener został zabity, to prawdopodobnie z powodu braku pamięci.

Jak znaleźć kluczowe informacje w dziennikach przędzy? Na przykład mogą pojawić się takie ostrzeżenia:

Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.

W takim przypadku sugeruje, że powinieneś zwiększyć spark.yarn.executor.memoryOverhead.

DennisLi
źródło
0

W moim przypadku (samodzielny klaster) wyjątek został zgłoszony, ponieważ system plików niektórych slaveów Spark został wypełniony w 100%. Usunięcie wszystkiego z spark/workfolderów niewolników rozwiązało problem.

i000174
źródło
0

Mam ten sam problem, ale przeszukałem wiele odpowiedzi, które nie mogą rozwiązać mojego problemu. ostatecznie debuguję swój kod krok po kroku. Uważam, że problem spowodowany rozmiarem danych nie jest zrównoważony dla każdej partycji, co doprowadziło do MetadataFetchFailedExceptiontego na mapetapie, a nie na reduceetapie. po prostu zrób to df_rdd.repartition(nums)wcześniejreduceByKey()

Pies
źródło