Mam aplikację do przesyłania strumieniowego iskier, która tworzy zestaw danych dla każdej minuty. Potrzebuję zapisać / nadpisać wyniki przetwarzanych danych.
Kiedy próbowałem nadpisać zbiór danych org.apache.hadoop.mapred.FileAlreadyExistsException zatrzymuje wykonanie.
Ustawiłem właściwość Spark set("spark.files.overwrite","true")
, ale nie mam szczęścia.
Jak nadpisać lub wstępnie usunąć pliki ze Spark?
apache-spark
Vijay Innamuri
źródło
źródło
set("spark.files.overwrite","true")
działa tylko dla plików dodanych przeztspark.addFile()
Odpowiedzi:
UPDATE: Zaproponuj użycie
Dataframes
plus coś takiego jak... .write.mode(SaveMode.Overwrite) ...
.Poręczny alfons:
W przypadku starszych wersji spróbuj
W wersji 1.1.0 możesz ustawić ustawienia conf za pomocą skryptu spark-submit z flagą --conf.
OSTRZEŻENIE (starsze wersje): Według @piggybox w Sparku jest błąd polegający na tym, że nadpisuje tylko pliki, które musi zapisać
part-
, wszystkie inne pliki pozostaną nieusunięte.źródło
Spark 1.4
:df.write.mode(SaveMode.Overwrite).parquet(path)
df.write.mode(mode: String).parquet(path)
Where mode: String może być: "overwrite", "append", "ignore", "error".ponieważ
df.save(path, source, mode)
jest przestarzałe, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )posługiwać się
df.write.format(source).mode("overwrite").save(path)
gdzie df.write to DataFrameWriter
„source” może być („com.databricks.spark.avro” | „parquet” | „json”)
źródło
source
może być równieżcsv
Dokumentacja parametru
spark.files.overwrite
mówi tak: „Czy nadpisywać pliki dodane za pośrednictwem,SparkContext.addFile()
gdy plik docelowy istnieje, a jego zawartość nie jest zgodna z zawartością źródła”. Nie ma więc żadnego wpływu na metodę saveAsTextFiles.Możesz to zrobić przed zapisaniem pliku:
Jak wyjaśniono tutaj: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html
źródło
Z dokumentacji pyspark.sql.DataFrame.save (obecnie 1.3.1) można określić
mode='overwrite'
podczas zapisywania DataFrame:Sprawdziłem, że spowoduje to nawet usunięcie pozostałych plików partycji. Więc jeśli pierwotnie powiedziałeś 10 partycji / plików, ale potem nadpisałeś folder ramką DataFrame, która miała tylko 6 partycji, wynikowy folder będzie miał 6 partycji / plików.
Zobacz dokumentację Spark SQL, aby uzyskać więcej informacji na temat opcji trybu.
źródło
spark.hadoop.validateOutputSpecs
będzie działać we wszystkich interfejsach API Spark.spark.hadoop.validateOutputSpecs
nie działało dla mnie na 1.3, ale tak.save(... , mode=
trasie możesz nadpisać jeden zestaw plików, dołączyć inny itp. w tym samym kontekście Spark. Czy niespark.hadoop.validateOutputSpecs
ograniczyłbyś się tylko do jednego trybu na kontekst?df.write.mode('overwrite').parquet("/output/folder/path")
działa, jeśli chcesz nadpisać plik parkietu za pomocą Pythona. To jest iskra 1.6.2. API może się różnić w późniejszych wersjachźródło
źródło
df.write.mode(SaveMode.Overwrite)
Ta przeciążona wersja funkcji zapisywania działa dla mnie:
yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))
Powyższy przykład nadpisze istniejący folder. Savemode może również przyjąć te parametry ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):
Dołącz : tryb dołączania oznacza, że podczas zapisywania ramki DataFrame do źródła danych, jeśli dane / tabela już istnieją, zawartość ramki DataFrame powinna zostać dołączona do istniejących danych.
ErrorIfExists : tryb ErrorIfExists oznacza, że podczas zapisywania DataFrame w źródle danych, jeśli dane już istnieją, oczekuje się zgłoszenia wyjątku.
Ignoruj : tryb Ignoruj oznacza, że podczas zapisywania DataFrame w źródle danych, jeśli dane już istnieją, operacja składowania nie zapisuje zawartości DataFrame i nie zmienia istniejących danych.
źródło
Jeśli chcesz użyć własnego niestandardowego formatu wyjściowego, będziesz w stanie uzyskać pożądane zachowanie również z RDD.
Przyjrzyj się następującym klasom: FileOutputFormat , FileOutputCommitter
W formacie pliku wyjściowego masz metodę o nazwie checkOutputSpecs, która sprawdza, czy katalog wyjściowy istnieje. W FileOutputCommitter masz commitJob, który zwykle przesyła dane z katalogu tymczasowego do ostatecznego miejsca.
Nie byłem jeszcze w stanie tego zweryfikować (zrobiłbym to, gdy tylko mam kilka wolnych minut), ale teoretycznie: Jeśli rozszerzę FileOutputFormat i zastąpię checkOutputSpecs na metodę, która nie zgłasza wyjątku w katalogu, który już istnieje i dostosuję commitJob mojego niestandardowego committera wyjściowego, aby wykonać dowolną logikę, którą chcę (np. zastąpić niektóre pliki, dołączyć inne), niż mogę osiągnąć pożądane zachowanie również z RDD.
Format wyjściowy jest przekazywany do: saveAsNewAPIHadoopFile (jest to metoda wywołana również przez saveAsTextFile, aby faktycznie zapisać pliki). Committer wyjścia jest konfigurowany na poziomie aplikacji.
źródło