Spark Dataframe rozróżnia kolumny ze zduplikowaną nazwą

83

Tak więc, jak wiem, w Spark Dataframe, dla wielu kolumn może mieć taką samą nazwę, jak pokazano poniżej migawka dataframe:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

Powyższy wynik jest tworzony przez połączenie z ramką danych do samego siebie, możesz zobaczyć, że istnieją 4kolumny z dwoma ai f.

Problem polega na tym, że gdy próbuję wykonać więcej obliczeń z akolumną, nie mogę znaleźć sposobu, aby wybrać a, próbowałem df[0]i df.select('a')oba zwróciły mi poniżej komunikat o błędzie:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

Czy mimo to w interfejsie Spark API mogę ponownie odróżnić kolumny od zduplikowanych nazw? a może jakiś sposób, aby zmienić nazwy kolumn?

resec
źródło

Odpowiedzi:

61

Zalecałbym zmianę nazw kolumn w pliku join.

df1.select(col("a") as "df1_a", col("f") as "df1_f")
   .join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a" === col("df2_a"))

Wynikowy DataFramebędzie miałschema

(df1_a, df1_f, df2_a, df2_f)
Glennie Helles Sindholt
źródło
5
Może być konieczne poprawienie odpowiedzi, ponieważ cudzysłowy nie są odpowiednio dopasowane między nazwami kolumn.
Sameh Sharaf,
2
@SamehSharaf Zakładam, że to ty głosujesz na moją odpowiedź? Ale odpowiedź jest w rzeczywistości w 100% poprawna - po prostu używam 'skrótu scala do wyboru kolumn, więc w rzeczywistości nie ma problemu z cudzysłowami.
Glennie Helles Sindholt
31
@GlennieHellesSindholt, słuszna uwaga. Jest to mylące, ponieważ odpowiedź jest oznaczona jako pythoni pyspark.
Jorge Leitao
Co się stanie, jeśli każda ramka danych zawiera ponad 100 kolumn i wystarczy zmienić nazwę jednej kolumny, która jest taka sama? Z pewnością nie można ręcznie wpisać wszystkich tych nazw kolumn w klauzuli select
bikashg
6
W takim przypadku można przejść zdf1.withColumnRenamed("a", "df1_a")
Glennie Helles Sindholt
100

Zacznijmy od pewnych danych:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

Istnieje kilka sposobów rozwiązania tego problemu. Przede wszystkim możesz jednoznacznie odwołać się do podrzędnych kolumn tabeli za pomocą kolumn nadrzędnych:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Możesz także użyć aliasów tabel:

from pyspark.sql.functions import col

df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")

df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Wreszcie możesz programowo zmienić nazwy kolumn:

df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))

df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)

## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
zero323
źródło
7
Dziękuję za edycję za pokazanie tak wielu sposobów uzyskania prawidłowej kolumny w tych niejednoznacznych przypadkach, myślę, że twoje przykłady powinny znaleźć się w przewodniku programowania Spark. Wiele się nauczyłem!
resec
mała korekta: df2_r = **df2** .select(*(col(x).alias(x + '_df2') for x in df2.columns))zamiast df2_r = df1.select(*(col(x).alias(x + '_df2') for x in df2.columns)). Co do reszty, dobra rzecz
Vzzarr
Zgadzam się, że powinno to być częścią przewodnika programowania Spark. Czyste złoto. W końcu udało mi się rozwiązać źródło niejednoznaczności wybierając kolumny według starych nazw przed wykonaniem złączenia. Rozwiązanie polegające na programowym dołączaniu sufiksów do nazw kolumn przed wykonaniem złączenia wszystkich niejednoznaczności wnet.
Pablo Adames
26

Istnieje prostszy sposób niż pisanie aliasów dla wszystkich kolumn, do których się przyłączasz, wykonując:

df1.join(df2,['a'])

Działa to, jeśli klucz, do którego się przyłączasz, jest taki sam w obu tabelach.

Zobacz https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html

Paul Bendevis
źródło
4
to jest właściwa odpowiedź od Spark 2+
Matt
2
A dla Scala: df1.join (df2, Seq ("a"))
mauriciojost
1
strona została przeniesiona na: kb.databricks.com/data/…
bogdan.rusu
7

Możesz użyć def drop(col: Column)metody, aby usunąć zduplikowaną kolumnę, na przykład:

DataFrame:df1

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

DataFrame:df2

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

kiedy dołączę df1 do df2, DataFrame będzie wyglądać jak poniżej:

val newDf = df1.join(df2,df1("a")===df2("a"))

DataFrame:newDf

+-------+-----+-------+-----+
| a     | f   | a     | f   |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+

Teraz możemy użyć def drop(col: Column)metody, aby usunąć zduplikowaną kolumnę „a” lub „f”, tak jak poniżej:

val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
StrongYoung
źródło
Czy to podejście zadziała, jeśli wykonujesz sprzężenie zewnętrzne, a dwie kolumny mają różne wartości?
prafi
Możesz nie chcieć porzucić, jeśli różne relacje mają ten sam schemat.
thebluephantom
5

Po zagłębieniu się w Spark API odkryłem, że mogę najpierw użyć aliasdo utworzenia aliasu dla oryginalnej ramki danych, a następnie używam withColumnRenameddo ręcznej zmiany nazwy każdej kolumny na aliasie, zrobi to joinbez powodowania duplikacji nazwy kolumny.

Więcej szczegółów można znaleźć poniżej Spark Dataframe API :

pyspark.sql.DataFrame.alias

pyspark.sql.DataFrame.withColumnRenamed

Uważam jednak, że jest to tylko kłopotliwe obejście i zastanawiam się, czy istnieje lepszy sposób na moje pytanie.

resec
źródło
4

Załóżmy, że ramki danych, które chcesz dołączyć, to df1 i df2, a łączysz je w kolumnie „a”, a następnie masz 2 metody

Metoda 1

df1.join (df2, 'a', 'left_outer')

To świetna metoda i jest wysoce zalecana.

Metoda 2

df1.join (df2, df1.a == df2.a, 'left_outer'). drop (df2.a)

typhoonbxq
źródło
4

W ten sposób możemy połączyć dwie ramki Dataframe na tych samych nazwach kolumn w PySpark.

df = df1.join(df2, ['col1','col2','col3'])

Jeśli to zrobisz printSchema(), zobaczysz, że zduplikowane kolumny zostały usunięte.

Nikhil Redij
źródło
1

To może nie być najlepsze podejście, ale jeśli chcesz zmienić nazwę zduplikowanych kolumn (po połączeniu), możesz to zrobić za pomocą tej niewielkiej funkcji.

def rename_duplicate_columns(dataframe):
    columns = dataframe.columns
    duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
    for index in duplicate_column_indices:
        columns[index] = columns[index]+'2'
    dataframe = dataframe.toDF(*columns)
    return dataframe
Akash
źródło
1

jeśli tylko kolumna klucza jest taka sama w obu tabelach, spróbuj zastosować następującą metodę (podejście 1):

left. join(right , 'key', 'inner')

zamiast poniżej (podejście 2):

left. join(right , left.key == right.key, 'inner')

Zalety stosowania podejścia 1:

  • „klucz” pojawi się tylko raz w końcowej ramce danych
  • łatwa w użyciu składnia

Wady stosowania podejścia 1:

  • pomoc tylko z kolumną kluczową
  • Scenariusze, w których przypadek lewostronnego złączenia, jeśli planujesz użyć prawego klawisza zliczającego wartość null, nie zadziała. W takim przypadku należy zmienić nazwę jednego z kluczy, jak wspomniano powyżej.
Manish Singla
źródło
0

Jeśli masz bardziej skomplikowany przypadek użycia niż opisany w odpowiedzi Glennie Helles Sindholt, np. Masz inne / kilka niełączonych nazw kolumn, które są również takie same i chcesz je rozróżnić przy wyborze, najlepiej użyć aliasów, np .:

df3 = df1.select("a", "b").alias("left")\
   .join(df2.select("a", "b").alias("right"), ["a"])\
   .select("left.a", "left.b", "right.b")

df3.columns
['a', 'b', 'b']
Wassermann
źródło