Używam https://github.com/databricks/spark-csv , próbuję napisać pojedynczy plik CSV, ale nie mogę, tworzy folder.
Potrzebujesz funkcji Scala, która pobierze parametry takie jak ścieżka i nazwa pliku i zapisze ten plik CSV.
źródło
Używam https://github.com/databricks/spark-csv , próbuję napisać pojedynczy plik CSV, ale nie mogę, tworzy folder.
Potrzebujesz funkcji Scala, która pobierze parametry takie jak ścieżka i nazwa pliku i zapisze ten plik CSV.
Tworzy folder z wieloma plikami, ponieważ każda partycja jest zapisywana indywidualnie. Jeśli potrzebujesz pojedynczego pliku wyjściowego (nadal w folderze), możesz repartition
(preferowane, jeśli dane wyjściowe są duże, ale wymagają przetasowania):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
lub coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
ramka danych przed zapisaniem:
Wszystkie dane zostaną zapisane mydata.csv/part-00000
. Zanim skorzystasz z tej opcji, upewnij się, że rozumiesz, co się dzieje i jaki jest koszt przesłania wszystkich danych do jednego pracownika . Jeśli używasz rozproszonego systemu plików z replikacją, dane będą przesyłane wiele razy - najpierw pobierane do jednego pracownika, a następnie rozprowadzane w węzłach magazynowania.
Alternatywnie można zostawić swój kod, jak to jest i używać narzędzi ogólnego przeznaczenia jak cat
i HDFSgetmerge
po prostu połączyć wszystkie części później.
.coalesce(1)
, że jakiś wyjątek FileNotFoundException w katalogu _temporary. Wciąż jest to błąd w Spark: Issues.apache.org/jira/browse/SPARK-2984coalesce(1)
ponieważ jest bardzo drogi i zwykle niepraktyczny.Jeśli używasz Sparka z HDFS, rozwiązałem problem, pisząc pliki csv normalnie i wykorzystując HDFS do scalania. Robię to bezpośrednio w Spark (1.6):
Nie pamiętam, gdzie nauczyłem się tej sztuczki, ale może ci się to udać.
źródło
Mogę się trochę spóźnić do gry tutaj, ale używając
coalesce(1)
lubrepartition(1)
może działać dla małych zestawów danych, ale duże zestawy danych byłyby wrzucane do jednej partycji w jednym węźle. Może to spowodować błędy OOM lub w najlepszym przypadku powolne przetwarzanie.Zdecydowanie sugerowałbym użycie
FileUtil.copyMerge()
funkcji z interfejsu API Hadoop. Spowoduje to scalenie wyników w jeden plik.EDYCJA - efektywnie przenosi dane do sterownika, a nie do węzła wykonawczego.
Coalesce()
byłoby dobrze, gdyby pojedynczy wykonawca miał więcej pamięci RAM do wykorzystania niż sterownik.EDYCJA 2 :
copyMerge()
jest usuwana w Hadoop 3.0. Zobacz następujący artykuł o przepełnieniu stosu, aby uzyskać więcej informacji na temat pracy z najnowszą wersją: Jak wykonać CopyMerge w Hadoop 3.0?źródło
Jeśli korzystasz z Databricks i możesz zmieścić wszystkie dane w pamięci RAM jednego pracownika (a tym samym używać
.coalesce(1)
), możesz użyć dbfs, aby znaleźć i przenieść wynikowy plik CSV:Jeśli twój plik nie mieści się w pamięci RAM w pliku roboczym , możesz rozważyć sugestię chaotic3quilibrium, aby użyć FileUtils.copyMerge () . Nie zrobiłem tego i jeszcze nie wiem, czy jest to możliwe, czy nie, np. Na S3.
Ta odpowiedź jest oparta na poprzednich odpowiedziach na to pytanie, a także na moich własnych testach dostarczonego fragmentu kodu. Pierwotnie wysłałem go do Databricks i ponownie publikuję tutaj.
Najlepsza dokumentacja dotycząca opcji rekurencyjnej rm dbfs, jaką znalazłem, znajduje się na forum Databricks .
źródło
Rozwiązanie, które działa dla S3 zmodyfikowanego przez Minkymorgan.
Po prostu podaj tymczasową ścieżkę katalogu z partycjami (z inną nazwą niż ścieżka końcowa) jako
srcPath
ostateczny plik csv / txt jakodestPath
Określ również,deleteSource
jeśli chcesz usunąć oryginalny katalog.źródło
df.write()
Interfejs API iskry utworzy wiele plików części w podanej ścieżce ... aby wymusić zapisanie przez iskrę tylko jednego pliku częścidf.coalesce(1).write.csv(...)
zamiastdf.repartition(1).write.csv(...)
łączenia jest wąską transformacją, podczas gdy repartition to szeroka transformacja, patrz Spark - repartition () vs coalesce ()utworzy folder w podanej ścieżce do jednego
part-0001-...-c000.csv
plikumieć przyjazną dla użytkownika nazwę pliku
źródło
df.toPandas().to_csv(path)
tego, aby zapisać pojedynczy plik csv z preferowaną nazwą plikurepartycjonowanie / łączenie na 1 partycję przed zapisaniem (nadal można uzyskać folder, ale miałby w nim jeden plik części)
źródło
możesz użyć
rdd.coalesce(1, true).saveAsTextFile(path)
będzie przechowywać dane jako pojedynczy plik w path / part-00000
źródło
Rozwiązałem używając poniższego podejścia (zmiana nazwy pliku hdfs): -
Krok 1: - (Crate Data Frame i zapis na HDFS)
Krok 2: - (Utwórz konfigurację Hadoop)
Krok 3: - (Uzyskaj ścieżkę w ścieżce folderu hdfs)
Step4: - (Pobierz nazwy plików iskier z folderu hdfs)
setp5: - (utwórz listę mutowalną scala, aby zapisać wszystkie nazwy plików i dodać je do listy)
Krok 6: - (filtruj kolejność plików _SUCESS z listy nazw plików)
krok 7: - (przekonwertuj listę scala na ciąg i dodaj żądaną nazwę pliku do ciągu folderu hdfs, a następnie zastosuj zmianę nazwy)
źródło
Używam tego w Pythonie, aby uzyskać pojedynczy plik:
źródło
Ta odpowiedź rozszerza zaakceptowaną odpowiedź, daje więcej kontekstu i zawiera fragmenty kodu, które można uruchomić w powłoce Spark na komputerze.
Więcej kontekstu na temat zaakceptowanej odpowiedzi
Zaakceptowana odpowiedź może sprawiać wrażenie, że przykładowy kod wysyła pojedynczy
mydata.csv
plik, a tak nie jest. Pokażmy:Oto, co zostanie wyświetlone:
NB
mydata.csv
to folder w zaakceptowanej odpowiedzi - to nie jest plik!Jak wyprowadzić pojedynczy plik o określonej nazwie
Możemy użyć spark-daria do wypisania pojedynczego
mydata.csv
pliku.Spowoduje to wyświetlenie pliku w następujący sposób:
Ścieżki S3
DariaWriters.writeSingleFile
Aby użyć tej metody w S3, musisz przekazać ścieżki s3a :Więcej informacji znajdziesz tutaj .
Unikanie copyMerge
copyMerge zostało usunięte z Hadoop 3.
DariaWriters.writeSingleFile
Implementacja używafs.rename
, jak opisano tutaj . Spark 3 nadal korzystał z Hadoop 2 , więc implementacje copyMerge będą działać w 2020 roku. Nie jestem pewien, kiedy Spark dokona aktualizacji do Hadoop 3, ale lepiej unikać podejścia copyMerge, które spowoduje uszkodzenie kodu, gdy Spark uaktualni Hadoop.Kod źródłowy
Poszukaj
DariaWriters
obiektu w kodzie źródłowym spark-daria, jeśli chcesz sprawdzić implementację.Wdrożenie PySpark
Zapisanie pojedynczego pliku za pomocą PySpark jest łatwiejsze, ponieważ można przekonwertować DataFrame na Pandas DataFrame, która jest domyślnie zapisywana jako pojedynczy plik.
Ograniczenia
DariaWriters.writeSingleFile
Podejście Scala idf.toPandas()
Python zbliżyć tylko pracę dla małych zbiorów danych. Ogromnych zbiorów danych nie można zapisać jako pojedynczych plików. Zapisywanie danych jako pojedynczego pliku nie jest optymalne z punktu widzenia wydajności, ponieważ danych nie można zapisywać równolegle.źródło
wykorzystując Listbuffer możemy zapisać dane do jednego pliku:
źródło
Jest jeszcze jeden sposób korzystania z Javy
źródło