Przekonwertuj kolumnę Spark DataFrame na listę języka Python

106

Pracuję na ramce danych z dwiema kolumnami, mvv i count.

+---+-----+
|mvv|count|
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |

Chciałbym uzyskać dwie listy zawierające wartości MVV i wartość licznika. Coś jak

mvv = [1,2,3,4]
count = [5,9,3,1]

Więc wypróbowałem następujący kod: Pierwsza linia powinna zwracać listę wierszy w języku Python. Chciałem zobaczyć pierwszą wartość:

mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)

Ale pojawia się komunikat o błędzie z drugą linią:

AttributeError: getInt

a.moussa
źródło
Począwszy od Spark 2.3, kod ten jest najszybszy i najmniej prawdopodobne, aby spowodować OutOfMemory wyjątki: list(df.select('mvv').toPandas()['mvv']). Arrow został zintegrowany z PySpark, co toPandasznacznie przyspieszyło . Nie używaj innych podejść, jeśli używasz platformy Spark 2.3+. Zobacz moją odpowiedź, aby uzyskać więcej szczegółów dotyczących testów porównawczych.
Zasilanie

Odpowiedzi:

146

Zobacz, dlaczego ten sposób, który robisz, nie działa. Po pierwsze, próbujesz uzyskać liczbę całkowitą z typu wiersza , wynik twojej kolekcji jest taki:

>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)

Jeśli weźmiesz coś takiego:

>>> firstvalue = mvv_list[0].mvv
Out: 1

Otrzymasz mvvwartość. Jeśli chcesz uzyskać wszystkie informacje o tablicy, możesz wziąć coś takiego:

>>> mvv_array = [int(row.mvv) for row in mvv_list.collect()]
>>> mvv_array
Out: [1,2,3,4]

Ale jeśli spróbujesz tego samego dla drugiej kolumny, otrzymasz:

>>> mvv_count = [int(row.count) for row in mvv_list.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'

Dzieje się tak, ponieważ countjest to metoda wbudowana. Kolumna ma taką samą nazwę jak count. Aby to zrobić, zmień nazwę kolumny countna _count:

>>> mvv_list = mvv_list.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]

Ale to obejście nie jest potrzebne, ponieważ możesz uzyskać dostęp do kolumny za pomocą składni słownika:

>>> mvv_array = [int(row['mvv']) for row in mvv_list.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_list.collect()]

I w końcu zadziała!

Thiago Baldim
źródło
działa świetnie dla pierwszej kolumny, ale nie działa dla liczby kolumn, o której myślę, ponieważ (funkcja count of spark)
a.moussa
Czy możesz dodać, co robisz z hrabią? Dodaj tutaj w komentarzach.
Thiago Baldim
dziękuję za odpowiedź Więc ta linia działa mvv_list = [int (i.mvv) for i in mvv_count.select ('mvv'). collect ()], ale nie ta count_list = [int (i.count) for i in mvv_count .select ('count'). collect ()] zwraca nieprawidłową składnię
a.moussa
Nie trzeba dodawać tego select('count')zastosowania w ten sposób: count_list = [int(i.count) for i in mvv_list.collect()]dodam przykład do odpowiedzi.
Thiago Baldim
1
@ a.moussa [i.['count'] for i in mvv_list.collect()]działa, aby jawnie używać kolumny o nazwie „count”, a nie countfunkcji
user989762
108

Podążanie za jedną linijką daje listę, którą chcesz.

mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()
Neo
źródło
3
Jeśli chodzi o wydajność, to rozwiązanie jest znacznie szybsze niż twoje rozwiązanie mvv_list = [int (i.mvv) for i in mvv_count.select ('mvv'). Collect ()]
Chanaka Fernando
To zdecydowanie najlepsze rozwiązanie, jakie widziałem. Dzięki.
hui chen
22

To da ci wszystkie elementy w postaci listy.

mvv_list = list(
    mvv_count_df.select('mvv').toPandas()['mvv']
)
Muhammad Raihan Muhaimin
źródło
1
To najszybsze i najbardziej wydajne rozwiązanie dla platformy Spark 2.3+. Zobacz wyniki benchmarkingu w mojej odpowiedzi.
Zasilanie
17

Poniższy kod pomoże ci

mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()
Itachi
źródło
3
To powinna być akceptowana odpowiedź. powodem jest to, że pozostajesz w kontekście iskry przez cały proces, a następnie zbierasz na końcu, w przeciwieństwie do wcześniejszego wyjścia z kontekstu iskry, co może spowodować większą kolekcję w zależności od tego, co robisz.
AntiPawn79
16

Na moich danych mam te testy porównawcze:

>>> data.select(col).rdd.flatMap(lambda x: x).collect()

0.52 sek

>>> [row[col] for row in data.collect()]

O 0,271 sek

>>> list(data.select(col).toPandas()[col])

O 0,427 sek

Wynik jest taki sam

świetlistych ludzi
źródło
2
Jeśli toLocalIteratorzamiast tego użyjesz collect, powinno być jeszcze wydajniejsze pamięć[row[col] for row in data.toLocalIterator()]
oglop
Dzięki za cynk! @o
Andre Carneiro
Dzięki za test porównawczy, @luminousmen
Andre Carneiro
6

Jeśli pojawi się poniższy błąd:

AttributeError: obiekt „list” nie ma atrybutu „collect”

Ten kod rozwiąże Twoje problemy:

mvv_list = mvv_count_df.select('mvv').collect()

mvv_array = [int(i.mvv) for i in mvv_list]
anirban sen
źródło
Otrzymałem również ten błąd i to rozwiązanie rozwiązało problem. Ale dlaczego otrzymałem błąd? (Wielu innym wydaje się tego nie
rozumieć
3

Przeprowadziłem analizę porównawczą i list(mvv_count_df.select('mvv').toPandas()['mvv'])jest to najszybsza metoda. Jestem bardzo zdziwiony.

Przeprowadziłem różne podejścia na zestawach danych 100 tysięcy / 100 milionów wierszy, używając 5-węzłowego klastra i3.xlarge (każdy węzeł ma 30,5 GB pamięci RAM i 4 rdzenie) ze Spark 2.4.5. Dane zostały równomiernie rozłożone w 20 skompresowanych plikach Parquet z jedną kolumną.

Oto wyniki testów porównawczych (czasy wykonywania w sekundach):

+-------------------------------------------------------------+---------+-------------+
|                          Code                               | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect()    |     0.4 | 55.3        |
| list(df.select('col_name').toPandas()['col_name'])          |     0.4 | 17.5        |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()|     0.9 | 69          |
| [row[0] for row in df.select('col_name').collect()]         |     1.0 | OOM         |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] |     1.2 | *           |
+-------------------------------------------------------------+---------+-------------+

* cancelled after 800 seconds

Złote zasady, których należy przestrzegać podczas gromadzenia danych w węźle kierowcy:

  • Spróbuj rozwiązać problem innymi podejściami. Zbieranie danych do węzła sterownika jest kosztowne, nie wykorzystuje mocy klastra Spark i należy go unikać, gdy tylko jest to możliwe.
  • Zbierz jak najmniej rzędów. Agreguj, usuwaj duplikaty, filtruj i usuwaj kolumny przed zebraniem danych. Wysyłaj jak najmniej danych do węzła sterownika.

toPandas został znacznie ulepszony w Spark 2.3 . Prawdopodobnie nie jest to najlepsze podejście, jeśli używasz wersji Spark starszej niż 2.3.

Zobacz tutaj, aby uzyskać więcej informacji / wyników testów porównawczych.

Uprawnienie
źródło
2

Możliwym rozwiązaniem jest użycie collect_list()funkcji from pyspark.sql.functions. Spowoduje to zagregowanie wszystkich wartości kolumn w tablicę pyspark, która po zebraniu zostanie przekonwertowana na listę Pythona:

mvv_list   = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0] 
phgui
źródło