Scalanie wielu ramek danych w wierszach w PySpark

21

Mam 10 ramek danych pyspark.sql.dataframe.DataFrame, uzyskanych od randomSplitjak (td1, td2, td3, td4, td5, td6, td7, td8, td9, td10) = td.randomSplit([.1, .1, .1, .1, .1, .1, .1, .1, .1, .1], seed = 100)Teraz chcę dołączyć 9 td„s na pojedynczej ramce danych, jak mam to zrobić?

Próbowałem już z unionAll, ale ta funkcja akceptuje tylko dwa argumenty.

td1_2 = td1.unionAll(td2) 
# this is working fine

td1_2_3 = td1.unionAll(td2, td3) 
# error TypeError: unionAll() takes exactly 2 arguments (3 given)

Czy istnieje sposób na połączenie więcej niż dwóch ramek danych w rzędzie?

Celem tego jest to, że robię 10-krotną weryfikację krzyżową ręcznie bez użycia CrossValidatormetody PySpark , więc biorąc 9 na trening i 1 na dane testowe, a następnie powtórzę to dla innych kombinacji.

krishna Prasad
źródło
1
To nie odpowiada bezpośrednio na pytanie, ale tutaj sugeruję ulepszenie metody nazewnictwa, aby w końcu nie musieć pisać, na przykład: [td1, td2, td3, td4, td5, td6, td7 , td8, td9, td10]. Wyobraź sobie, że robisz to dla 100-krotnego CV. Oto, co zrobię: porcje = [0,1] * 10 cv = df7.randomSplit (porcje) folds = lista (zakres (10)) dla i w zakresie (10): test_data = cv [i] fold_no_i = folds [: i] + folds [i + 1:] train_data = cv [fold_no_i [0]] for j in fold_no_i [1:]: train_data = train_data.union (cv [j])
ngoc thoag

Odpowiedzi:

37

Skradzione z: /programming/33743978/spark-union-of-multiple-rdds

Poza łańcuchami związków jest to jedyny sposób, aby to zrobić dla DataFrames.

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

unionAll(td2, td3, td4, td5, td6, td7, td8, td9, td10)

Dzieje się tak, ponieważ bierze wszystkie przekazane obiekty jako parametry i redukuje je za pomocą unionAll (ta redukcja pochodzi z Pythona, a nie redukcji Spark, chociaż działają podobnie), co ostatecznie redukuje ją do jednej ramki danych.

Jeśli zamiast DataFrames są one normalnymi RDD, możesz przekazać ich listę do funkcji unii SparkContext

EDYCJA: Dla twojego celu proponuję inną metodę, ponieważ musiałbyś powtórzyć ten cały związek 10 razy dla różnych fałd dla krzyżowej walidacji, dodałbym etykiety, do których należy fałd, i po prostu filtruję twoją ramkę danych dla każdego fałdu na podstawie etykieta

Jan van der Vegt
źródło
(+1) Fajne obejście. Jednak musi istnieć funkcja, która pozwala na łączenie wielu ramek danych. Byłoby całkiem przydatne!
Dawny33
Nie zgadzam się z tym
Jan van der Vegt
@JanvanderVegt Dzięki, działa, a pomysł dodawania etykiet w celu odfiltrowania zestawu danych szkoleniowych i testowych, już to zrobiłem. Bardzo ci dziękuje za pomoc.
krishna Prasad
@Jan van der Vegt Czy możesz zastosować tę samą logikę dla Dołącz i odpowiedzieć na to pytanie
GeorgeOfTheRF
6

Czasami, gdy łączone ramki danych nie mają tej samej kolejności kolumn, lepiej jest df2.select (df1.columns), aby upewnić się, że oba df mają tę samą kolejność kolumn przed zjednoczeniem.

import functools 

def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs) 

Przykład:

df1 = spark.createDataFrame([[1,1],[2,2]],['a','b'])
# different column order. 
df2 = spark.createDataFrame([[3,333],[4,444]],['b','a']) 
df3 = spark.createDataFrame([555,5],[666,6]],['b','a']) 

unioned_df = unionAll([df1, df2, df3])
unioned_df.show() 

wprowadź opis zdjęcia tutaj

w przeciwnym razie wygeneruje poniższy wynik.

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs) 

unionAll(*[df1, df2, df3]).show()

wprowadź opis zdjęcia tutaj

Wong Tat Yau
źródło
2

Co powiesz na użycie rekurencji?

def union_all(dfs):
    if len(dfs) > 1:
        return dfs[0].unionAll(union_all(dfs[1:]))
    else:
        return dfs[0]

td = union_all([td1, td2, td3, td4, td5, td6, td7, td8, td9, td10])
proinsias
źródło