Jak nadpisać katalog wyjściowy w Spark

108

Mam aplikację do przesyłania strumieniowego iskier, która tworzy zestaw danych dla każdej minuty. Potrzebuję zapisać / nadpisać wyniki przetwarzanych danych.

Kiedy próbowałem nadpisać zbiór danych org.apache.hadoop.mapred.FileAlreadyExistsException zatrzymuje wykonanie.

Ustawiłem właściwość Spark set("spark.files.overwrite","true"), ale nie mam szczęścia.

Jak nadpisać lub wstępnie usunąć pliki ze Spark?

Vijay Innamuri
źródło
1
Tak, to jest do bani, prawda, uważam to za regresję do 0.9.0. Proszę przyjąć moją odpowiedź :)
samthebest
set("spark.files.overwrite","true")działa tylko dla plików dodanych przeztspark.addFile()
aiman

Odpowiedzi:

107

UPDATE: Zaproponuj użycie Dataframesplus coś takiego jak ... .write.mode(SaveMode.Overwrite) ....

Poręczny alfons:

implicit class PimpedStringRDD(rdd: RDD[String]) {
    def write(p: String)(implicit ss: SparkSession): Unit = {
      import ss.implicits._
      rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
    }
  }

W przypadku starszych wersji spróbuj

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

W wersji 1.1.0 możesz ustawić ustawienia conf za pomocą skryptu spark-submit z flagą --conf.

OSTRZEŻENIE (starsze wersje): Według @piggybox w Sparku jest błąd polegający na tym, że nadpisuje tylko pliki, które musi zapisać part-, wszystkie inne pliki pozostaną nieusunięte.

samthebest
źródło
30
Za Spark 1.4:df.write.mode(SaveMode.Overwrite).parquet(path)
Ha Pham
W przypadku Spark SQL masz opcje definiowania SaveMode dla Core Spark, których nie masz. Naprawdę chciałbym mieć taką funkcję do saveAsTextFile i innych przekształceń
Murtaza Kanchwala
3
Ukryty problem: w porównaniu do rozwiązania @ pzecevic polegającego na wymazaniu całego folderu przez HDFS, w tym podejściu Spark nadpisze tylko pliki części o tej samej nazwie w folderze wyjściowym. Działa to przez większość czasu, ale jeśli w folderze znajduje się coś innego, na przykład dodatkowe pliki części z innego zadania Spark / Hadoop, nie spowoduje to zastąpienia tych plików.
piggybox
6
Możesz także użyć df.write.mode(mode: String).parquet(path)Where mode: String może być: "overwrite", "append", "ignore", "error".
żyto
1
@avocado Tak myślę, interfejsy Spark API stają się coraz gorsze z każdym wydaniem: P
samthebest
27

Dokumentacja parametru spark.files.overwritemówi tak: „Czy nadpisywać pliki dodane za pośrednictwem, SparkContext.addFile()gdy plik docelowy istnieje, a jego zawartość nie jest zgodna z zawartością źródła”. Nie ma więc żadnego wpływu na metodę saveAsTextFiles.

Możesz to zrobić przed zapisaniem pliku:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Jak wyjaśniono tutaj: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html

pzecevic
źródło
29
a co z pysparkiem?
javadba
Następną odpowiedzią na użycie opcji „write.mode (SaveMode.Overwrite)” jest droga do zrobienia
YaOg
hdfs może usuwać nowe pliki w miarę ich pojawiania się, ponieważ nadal usuwa stare.
Jake
25

Z dokumentacji pyspark.sql.DataFrame.save (obecnie 1.3.1) można określić mode='overwrite'podczas zapisywania DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

Sprawdziłem, że spowoduje to nawet usunięcie pozostałych plików partycji. Więc jeśli pierwotnie powiedziałeś 10 partycji / plików, ale potem nadpisałeś folder ramką DataFrame, która miała tylko 6 partycji, wynikowy folder będzie miał 6 partycji / plików.

Zobacz dokumentację Spark SQL, aby uzyskać więcej informacji na temat opcji trybu.

dnlbrky
źródło
2
Prawdziwe i pomocne, dzięki, ale rozwiązanie specyficzne dla DataFrame - spark.hadoop.validateOutputSpecsbędzie działać we wszystkich interfejsach API Spark.
samthebest
Z jakiegoś powodu spark.hadoop.validateOutputSpecsnie działało dla mnie na 1.3, ale tak.
Eric Walker
1
@samthebest Dzięki save(... , mode=trasie możesz nadpisać jeden zestaw plików, dołączyć inny itp. w tym samym kontekście Spark. Czy nie spark.hadoop.validateOutputSpecsograniczyłbyś się tylko do jednego trybu na kontekst?
dnlbrky
1
@dnlbrky OP nie zażądał dołączenia. Jak powiedziałem, prawda, przydatna, ale niepotrzebna. Gdyby PO zapytał „jak mam dołączyć”, można by udzielić szeregu odpowiedzi. Ale nie wchodźmy w to. Radzę również rozważyć użycie wersji Scala DataFrames, ponieważ ma ona bezpieczeństwo typów i więcej sprawdzania - na przykład jeśli miałeś literówkę w „nadpisaniu”, nie dowiedziałbyś się, dopóki DAG nie zostanie oceniony - co w zadaniu Big Data mogłoby być 2 godziny później !! Jeśli używasz wersji Scala, kompilator sprawdzi wszystko z góry! Całkiem fajne i bardzo ważne dla Big Data.
samthebest
15

df.write.mode('overwrite').parquet("/output/folder/path")działa, jeśli chcesz nadpisać plik parkietu za pomocą Pythona. To jest iskra 1.6.2. API może się różnić w późniejszych wersjach

akn
źródło
Tak, to działa świetnie dla moich wymagań (Databricks)
Nick.McDermaid
4
  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)
vaquar khan
źródło
Tylko dla Spark 1, w najnowszej wersjidf.write.mode(SaveMode.Overwrite)
ChikuMiku
3

Ta przeciążona wersja funkcji zapisywania działa dla mnie:

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))

Powyższy przykład nadpisze istniejący folder. Savemode może również przyjąć te parametry ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Dołącz : tryb dołączania oznacza, że ​​podczas zapisywania ramki DataFrame do źródła danych, jeśli dane / tabela już istnieją, zawartość ramki DataFrame powinna zostać dołączona do istniejących danych.

ErrorIfExists : tryb ErrorIfExists oznacza, że ​​podczas zapisywania DataFrame w źródle danych, jeśli dane już istnieją, oczekuje się zgłoszenia wyjątku.

Ignoruj : tryb Ignoruj ​​oznacza, że ​​podczas zapisywania DataFrame w źródle danych, jeśli dane już istnieją, operacja składowania nie zapisuje zawartości DataFrame i nie zmienia istniejących danych.

Shay
źródło
1

Jeśli chcesz użyć własnego niestandardowego formatu wyjściowego, będziesz w stanie uzyskać pożądane zachowanie również z RDD.

Przyjrzyj się następującym klasom: FileOutputFormat , FileOutputCommitter

W formacie pliku wyjściowego masz metodę o nazwie checkOutputSpecs, która sprawdza, czy katalog wyjściowy istnieje. W FileOutputCommitter masz commitJob, który zwykle przesyła dane z katalogu tymczasowego do ostatecznego miejsca.

Nie byłem jeszcze w stanie tego zweryfikować (zrobiłbym to, gdy tylko mam kilka wolnych minut), ale teoretycznie: Jeśli rozszerzę FileOutputFormat i zastąpię checkOutputSpecs na metodę, która nie zgłasza wyjątku w katalogu, który już istnieje i dostosuję commitJob mojego niestandardowego committera wyjściowego, aby wykonać dowolną logikę, którą chcę (np. zastąpić niektóre pliki, dołączyć inne), niż mogę osiągnąć pożądane zachowanie również z RDD.

Format wyjściowy jest przekazywany do: saveAsNewAPIHadoopFile (jest to metoda wywołana również przez saveAsTextFile, aby faktycznie zapisać pliki). Committer wyjścia jest konfigurowany na poziomie aplikacji.

Michael Kopaniov
źródło
Unikałbym zbliżania się do podklasy FileOutputCommitter, jeśli możesz temu pomóc: to przerażający fragment kodu. Hadoop 3.0 dodaje punkt wtyczki, w którym FileOutputFormat może przyjmować różne implementacje refaktoryzowanej superklasy (PathOutputCommitter). S3 z Netflix będzie zapisywać w miejscu w drzewie podzielonym na partycje,
rozwiązując