Jestem nowy w Spark i próbuję odczytać dane CSV z pliku za pomocą Spark. Oto co robię:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Spodziewałbym się, że to wywołanie da mi listę dwóch pierwszych kolumn mojego pliku, ale otrzymuję ten błąd:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
chociaż mój plik CSV ma więcej niż jedną kolumnę.
python
csv
apache-spark
pyspark
Kernael
źródło
źródło
csv
biblioteki do obsługi wszystkich znaków ucieczki, ponieważ zwykłe dzielenie przecinkiem nie zadziała, jeśli, powiedzmy, w wartościach znajdują się przecinki.","
.Spark 2.0.0+
Możesz bezpośrednio użyć wbudowanego źródła danych csv:
lub
bez uwzględniania zależności zewnętrznych.
Spark <2.0.0 :
Zamiast ręcznego parsowania, co w ogólnym przypadku wcale nie jest trywialne, radziłbym
spark-csv
:Upewnij się, że Spark CSV jest wliczone w ścieżce (
--packages
,--jars
,--driver-class-path
)I załaduj swoje dane w następujący sposób:
Obsługuje ładowanie, wnioskowanie o schemacie, porzucanie źle sformułowanych wierszy i nie wymaga przekazywania danych z języka Python do maszyny JVM.
Uwaga :
Jeśli znasz schemat, lepiej unikać wnioskowania o schemacie i przekazać go do
DataFrameReader
. Zakładając, że masz trzy kolumny - integer, double i string:źródło
pyspark --packages com.databricks:spark-csv_2.11:1.4.0
(upewnij się, że zmieniłeś wersje databricks / spark na te, które masz zainstalowane).źródło
I jeszcze jedna opcja polegająca na odczytaniu pliku CSV za pomocą Pandas, a następnie zaimportowaniu Pandas DataFrame do Spark.
Na przykład:
źródło
Zwykłe dzielenie przecinkiem spowoduje również podzielenie przecinków, które znajdują się w polach (np.
a,b,"1,2,3",c
), Więc nie jest zalecane. Odpowiedź zero323 jest dobra, jeśli chcesz używać API DataFrames, ale jeśli chcesz trzymać się podstawowego Spark, możesz przeanalizować csv w podstawowym Pythonie za pomocą modułu csv :EDYCJA: Jak wspomniał @muon w komentarzach, potraktuje to nagłówek jak każdy inny wiersz, więc musisz go wyodrębnić ręcznie. Na przykład
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(pamiętaj, aby nie modyfikować,header
zanim filtr oceni). Ale w tym momencie prawdopodobnie lepiej będzie, jeśli użyjesz wbudowanego parsera csv.źródło
StringIO
.csv
może używać dowolnej iterowalnej b)__next__
nie powinno być używane bezpośrednio i zakończy się niepowodzeniem w pustym wierszu. Spójrz na flatMap c) Byłoby o wiele bardziej efektywne w użyciumapPartitions
zamiast inicjowania czytelnika na każdej linii :)rdd.mapPartitions(lambda x: csv.reader(x))
działa, gdyrdd.map(lambda x: csv.reader(x))
zgłasza błąd? Spodziewałem się, że obaj rzucą to samoTypeError: can't pickle _csv.reader objects
. Wydaje się również, żemapPartitions
automatycznie wywołuje jakiś odpowiednik "readlines" nacsv.reader
obiekcie, gdzie zmap
, musiałem wywołać__next__
jawnie, aby uzyskać listy zcsv.reader
. 2) GdzieflatMap
wchodzi? Samo dzwonieniemapPartitions
zadziałało dla mnie.rdd.mapPartitions(lambda x: csv.reader(x))
działa, ponieważmapPartitions
oczekujeIterable
obiektu. Jeśli chcesz być wyraźny, możesz użyć wyrażenia ze zrozumieniem lub generatora.map
sam nie działa, ponieważ nie wykonuje iteracji po obiekcie. Stąd moja propozycja użycia,flatMap(lambda x: csv.reader([x]))
która będzie iterować po czytelniku. Ale tutajmapPartitions
jest znacznie lepiej.To jest w PYSPARKU
Następnie możesz sprawdzić
źródło
Jeśli chcesz załadować csv jako ramkę danych, możesz wykonać następujące czynności:
U mnie to działało dobrze.
źródło
Jest to zgodne z tym, co początkowo sugerował JP Mercier na temat używania Pand, ale z dużą modyfikacją: jeśli wczytujesz dane do Pand w kawałkach, powinno to być bardziej plastyczne. Oznacza to, że możesz przeanalizować znacznie większy plik, niż Pandy może obsłużyć jako pojedynczy element i przekazać go do Spark w mniejszych rozmiarach. (To również odpowiada na komentarz dotyczący tego, dlaczego ktoś chciałby używać Sparka, skoro i tak mogą załadować wszystko do Pand).
źródło
Teraz jest też inna opcja dla dowolnego ogólnego pliku csv: https://github.com/seahboonsiew/pyspark-csv w następujący sposób:
Załóżmy, że mamy następujący kontekst
Najpierw roześlij pyspark-csv.py do programów wykonawczych za pomocą SparkContext
Odczytaj dane CSV za pośrednictwem SparkContext i przekonwertuj je na DataFrame
źródło
Jeśli twoje dane csv nie zawierają znaków nowej linii w żadnym z pól, możesz załadować swoje dane
textFile()
i przeanalizować jeźródło
Jeśli masz jeden lub więcej wierszy z mniejszą lub większą liczbą kolumn niż 2 w zbiorze danych, może wystąpić ten błąd.
Jestem też nowy w Pyspark i próbuję odczytać plik CSV. Poniższy kod zadziałał dla mnie:
W tym kodzie używam zestawu danych z kaggle, łącze to: https://www.kaggle.com/carrie1/ecommerce-data
1. Bez wspominania o schemacie:
Teraz sprawdź kolumny: sdfData.columns
Wynik będzie:
Sprawdź typ danych dla każdej kolumny:
To da ramkę danych ze wszystkimi kolumnami z typem danych jako StringType
2. Ze schematem: Jeśli znasz schemat lub chcesz zmienić typ danych dowolnej kolumny w powyższej tabeli, użyj tego (powiedzmy, że mam następujące kolumny i chcę, aby były w określonym typie danych dla każdej z nich)
Teraz sprawdź schemat dla typu danych każdej kolumny:
Edytowano: możemy również użyć następującego wiersza kodu bez wyraźnego wspominania o schemacie:
Wynik to:
Wynik będzie wyglądał następująco:
źródło
Podczas korzystania
spark.read.csv
uważam, że korzystając z opcjiescape='"'
imultiLine=True
zapewniam najbardziej spójne rozwiązanie do standardu CSV , az mojego doświadczenia wynika, że najlepiej działa z plikami CSV wyeksportowanymi z Arkuszy Google.To jest,
źródło
import pyspark as spark
jestspark
jest już zainicjowany. W skrypcie przesłanym przezspark-submit
możesz utworzyć jego wystąpienie jakofrom pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate()
.