Pracuję na ramce danych z dwiema kolumnami, mvv i count.
+---+-----+
|mvv|count|
+---+-----+
| 1 | 5 |
| 2 | 9 |
| 3 | 3 |
| 4 | 1 |
Chciałbym uzyskać dwie listy zawierające wartości MVV i wartość licznika. Coś jak
mvv = [1,2,3,4]
count = [5,9,3,1]
Więc wypróbowałem następujący kod: Pierwsza linia powinna zwracać listę wierszy w języku Python. Chciałem zobaczyć pierwszą wartość:
mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
Ale pojawia się komunikat o błędzie z drugą linią:
AttributeError: getInt
python
apache-spark
pyspark
spark-dataframe
a.moussa
źródło
źródło
list(df.select('mvv').toPandas()['mvv'])
. Arrow został zintegrowany z PySpark, cotoPandas
znacznie przyspieszyło . Nie używaj innych podejść, jeśli używasz platformy Spark 2.3+. Zobacz moją odpowiedź, aby uzyskać więcej szczegółów dotyczących testów porównawczych.Odpowiedzi:
Zobacz, dlaczego ten sposób, który robisz, nie działa. Po pierwsze, próbujesz uzyskać liczbę całkowitą z typu wiersza , wynik twojej kolekcji jest taki:
>>> mvv_list = mvv_count_df.select('mvv').collect() >>> mvv_list[0] Out: Row(mvv=1)
Jeśli weźmiesz coś takiego:
>>> firstvalue = mvv_list[0].mvv Out: 1
Otrzymasz
mvv
wartość. Jeśli chcesz uzyskać wszystkie informacje o tablicy, możesz wziąć coś takiego:>>> mvv_array = [int(row.mvv) for row in mvv_list.collect()] >>> mvv_array Out: [1,2,3,4]
Ale jeśli spróbujesz tego samego dla drugiej kolumny, otrzymasz:
>>> mvv_count = [int(row.count) for row in mvv_list.collect()] Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'
Dzieje się tak, ponieważ
count
jest to metoda wbudowana. Kolumna ma taką samą nazwę jakcount
. Aby to zrobić, zmień nazwę kolumnycount
na_count
:>>> mvv_list = mvv_list.selectExpr("mvv as mvv", "count as _count") >>> mvv_count = [int(row._count) for row in mvv_list.collect()]
Ale to obejście nie jest potrzebne, ponieważ możesz uzyskać dostęp do kolumny za pomocą składni słownika:
>>> mvv_array = [int(row['mvv']) for row in mvv_list.collect()] >>> mvv_count = [int(row['count']) for row in mvv_list.collect()]
I w końcu zadziała!
źródło
select('count')
zastosowania w ten sposób:count_list = [int(i.count) for i in mvv_list.collect()]
dodam przykład do odpowiedzi.[i.['count'] for i in mvv_list.collect()]
działa, aby jawnie używać kolumny o nazwie „count”, a niecount
funkcjiPodążanie za jedną linijką daje listę, którą chcesz.
mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()
źródło
To da ci wszystkie elementy w postaci listy.
mvv_list = list( mvv_count_df.select('mvv').toPandas()['mvv'] )
źródło
Poniższy kod pomoże ci
mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()
źródło
Na moich danych mam te testy porównawcze:
>>> data.select(col).rdd.flatMap(lambda x: x).collect()
0.52 sek
>>> [row[col] for row in data.collect()]
O 0,271 sek
>>> list(data.select(col).toPandas()[col])
O 0,427 sek
Wynik jest taki sam
źródło
toLocalIterator
zamiast tego użyjeszcollect
, powinno być jeszcze wydajniejsze pamięć[row[col] for row in data.toLocalIterator()]
Jeśli pojawi się poniższy błąd:
Ten kod rozwiąże Twoje problemy:
mvv_list = mvv_count_df.select('mvv').collect() mvv_array = [int(i.mvv) for i in mvv_list]
źródło
Przeprowadziłem analizę porównawczą i
list(mvv_count_df.select('mvv').toPandas()['mvv'])
jest to najszybsza metoda. Jestem bardzo zdziwiony.Przeprowadziłem różne podejścia na zestawach danych 100 tysięcy / 100 milionów wierszy, używając 5-węzłowego klastra i3.xlarge (każdy węzeł ma 30,5 GB pamięci RAM i 4 rdzenie) ze Spark 2.4.5. Dane zostały równomiernie rozłożone w 20 skompresowanych plikach Parquet z jedną kolumną.
Oto wyniki testów porównawczych (czasy wykonywania w sekundach):
+-------------------------------------------------------------+---------+-------------+ | Code | 100,000 | 100,000,000 | +-------------------------------------------------------------+---------+-------------+ | df.select("col_name").rdd.flatMap(lambda x: x).collect() | 0.4 | 55.3 | | list(df.select('col_name').toPandas()['col_name']) | 0.4 | 17.5 | | df.select('col_name').rdd.map(lambda row : row[0]).collect()| 0.9 | 69 | | [row[0] for row in df.select('col_name').collect()] | 1.0 | OOM | | [r[0] for r in mid_df.select('col_name').toLocalIterator()] | 1.2 | * | +-------------------------------------------------------------+---------+-------------+ * cancelled after 800 seconds
Złote zasady, których należy przestrzegać podczas gromadzenia danych w węźle kierowcy:
toPandas
został znacznie ulepszony w Spark 2.3 . Prawdopodobnie nie jest to najlepsze podejście, jeśli używasz wersji Spark starszej niż 2.3.Zobacz tutaj, aby uzyskać więcej informacji / wyników testów porównawczych.
źródło
Możliwym rozwiązaniem jest użycie
collect_list()
funkcji frompyspark.sql.functions
. Spowoduje to zagregowanie wszystkich wartości kolumn w tablicę pyspark, która po zebraniu zostanie przekonwertowana na listę Pythona:mvv_list = df.select(collect_list("mvv")).collect()[0][0] count_list = df.select(collect_list("count")).collect()[0][0]
źródło