Chciałbym przeczytać plik CSV w Spark i przekonwertować go na DataFrame i zapisać w HDFS z df.registerTempTable("table_name")
Próbowałem:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Błąd, który otrzymałem:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Jakie jest właściwe polecenie, aby załadować plik CSV jako DataFrame w Apache Spark?
Odpowiedzi:
Spark-csv jest częścią podstawowych funkcji platformy Spark i nie wymaga osobnej biblioteki. Możesz więc po prostu zrobić na przykład
W scali (działa to w przypadku każdej wzmianki o separatorze formatowania „,” dla csv, „\ t” dla tsv itp.)
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")
źródło
Przeanalizuj plik CSV i załaduj jako DataFrame / DataSet za pomocą Spark 2.x
Najpierw zainicjalizuj
SparkSession
obiekt , który będzie domyślnie dostępny w powłokach jakospark
1. Zrób to w sposób programowy
Aktualizacja: Dodanie wszystkich opcji z tego miejsca na wypadek, gdyby łącze zostało zerwane w przyszłości
2. Możesz również zrobić to w SQL
Zależności :
Wersja Spark <2.0
Zależności:
źródło
spark-core_2.11
ispark-sql_2.11
od2.0.1
wersji jest w porządku. Jeśli to możliwe, dodaj komunikat o błędzie.spark.read.format("csv").option("delimiter ", "|") ...
programmatic way
jest pozostawienie off.format("csv")
i wymienić.load(...
z.csv(...
.option
Metoda należy do klasy DataFrameReader jako zwrócony przezread
metodę, gdzieload
icsv
metody zwracają dataframe więc nie może mieć opcje oznaczone na po wywołaniu. Ta odpowiedź jest dość dokładna, ale powinieneś podać link do dokumentacji, aby ludzie mogli zobaczyć wszystkie inne dostępne opcje CSV spark.apache.org/docs/latest/api/scala/ ... *): org.apache.spark.sql.DataFrameJest dla którego Hadoop to 2,6, a Spark to 1,6 i bez pakietu „databricks”.
źródło
W przypadku Spark 2.0 poniżej opisano, jak czytać CSV
źródło
spark.read.csv(path)
ispark.read.format("csv").load(path)
?W Javie 1.8 Ten fragment kodu doskonale działa do odczytu plików CSV
POM.xml
Jawa
źródło
Przetwarzanie pliku CSV wiąże się z wieloma wyzwaniami, sumuje się, jeśli rozmiar pliku jest większy, jeśli w wartościach kolumn znajdują się inne znaki niż angielski / ucieczka / separator / inne znaki, które mogą powodować błędy analizy.
Magia tkwi zatem w zastosowanych opcjach. Te, które działały dla mnie i mam nadzieję, że powinny obejmować większość przypadków skrajnych, są w kodzie poniżej:
Mam nadzieję, że to pomoże. Więcej informacji: Używanie PySpark 2 do czytania CSV z kodem źródłowym HTML
Uwaga: powyższy kod pochodzi z interfejsu API Spark 2, w którym interfejs API do odczytu plików CSV jest dostarczany w pakiecie z wbudowanymi pakietami platformy Spark do zainstalowania.
Uwaga: PySpark jest opakowaniem Pythona dla platformy Spark i ma ten sam interfejs API co Scala / Java.
źródło
Przykład Penny's Spark 2 to sposób na zrobienie tego w Spark2. Jest jeszcze jedna sztuczka: wygeneruj ten nagłówek, wykonując wstępne skanowanie danych, ustawiając opcję
inferSchema
natrue
Tutaj więc, zakładając, że
spark
jest to sesja iskrowa, którą skonfigurowałeś, jest operacja załadowania do pliku indeksu CSV wszystkich obrazów Landsat, które amazon hostuje na S3.Zła wiadomość jest taka: uruchamia to skanowanie pliku; dla czegoś dużego, takiego jak ten spakowany plik CSV o wielkości 20 + MB, który może zająć 30 sekund w przypadku połączenia długodystansowego. Miej to na uwadze: lepiej ręcznie zakodować schemat, gdy już się pojawi.
(fragment kodu Apache Software License 2.0 licencjonowany w celu uniknięcia wszelkich niejasności; coś, co zrobiłem jako test demonstracyjny / test integracji integracji S3)
źródło
W przypadku, gdy budujesz słoik w wersji 2.11 i Apache 2.0 lub nowszej.
Nie ma potrzeby tworzenia obiektu
sqlContext
lubsparkContext
. TylkoSparkSession
przedmiot wystarcza na wszystkie potrzeby.Poniżej znajduje się mycode, który działa dobrze:
Jeśli pracujesz w klastrze, po prostu zmień
.master("local")
na.master("yarn")
podczas definiowaniasparkBuilder
obiektuOmówiono to w Spark Doc: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
źródło
Dodaj następujące zależności Spark do pliku POM:
// Konfiguracja Spark:
val spark = SparkSession.builder (). master ("local"). appName ("Przykładowa aplikacja"). getOrCreate ()
// Przeczytaj plik csv:
val df = spark.read.option ("nagłówek", "prawda"). csv ("ŚCIEŻKA_PLIKU")
// Wyświetl wyjście
df.show ()
źródło
Aby odczytać ze ścieżki względnej w systemie, użyj metody System.getProperty, aby uzyskać bieżący katalog, a następnie użyj do załadowania pliku przy użyciu ścieżki względnej.
iskra: 2.4.4 scala: 2.11.12
źródło
W przypadku platformy Spark 2.4+, jeśli chcesz załadować plik csv z katalogu lokalnego, możesz użyć 2 sesji i załadować go do gałęzi. Pierwsza sesja powinna zostać utworzona z master () config jako „local [*]”, a druga sesja z włączonymi „yarn” i Hive.
Poniższy zadziałał dla mnie.
Po uruchomieniu z
spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
nim poszło dobrze i stworzyłem stół w ulu.źródło
Domyślnym formatem pliku jest Parquet z spark.read .. i odczytem pliku csv, dlatego otrzymujesz wyjątek. Określ format CSV za pomocą interfejsu API, którego próbujesz użyć
źródło
Spróbuj tego, jeśli używasz Spark 2.0+
Uwaga: - to działa dla każdego rozdzielanego pliku. Po prostu użyj opcji („separator”,), aby zmienić wartość.
Mam nadzieję, że to jest pomocne.
źródło
Dzięki wbudowanemu Spark CSV możesz to łatwo zrobić dzięki nowemu obiektowi SparkSession dla Spark> 2.0.
Istnieje wiele opcji, które możesz ustawić.
header
: czy twój plik zawiera linię nagłówka na górzeinferSchema
: czy chcesz automatycznie wywnioskować schemat, czy nie. Domyślnie jesttrue
. Zawsze wolę udostępniać schemat, aby zapewnić odpowiednie typy danych.mode
: tryb analizy, PERMISSIVE, DROPMALFORMED lub FAILFASTdelimiter
: aby określić separator, domyślnie jest to przecinek (',')źródło