Nie możesz dodać dowolnej kolumny do DataFrame
w Spark. Nowe kolumny można tworzyć tylko przy użyciu literałów (inne typy literałów są opisane w Jak dodać stałą kolumnę w Spark DataFrame? )
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
przekształcenie istniejącej kolumny:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
zawarte za pomocą join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
lub wygenerowane za pomocą funkcji / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Pod względem wydajności wbudowane funkcje ( pyspark.sql.functions
), które są mapowane na wyrażenie Catalyst, są zwykle preferowane w stosunku do funkcji zdefiniowanych przez użytkownika w Pythonie.
Jeśli chcesz dodać zawartość dowolnego RDD jako kolumnę, możesz
Aby dodać kolumnę za pomocą UDF:
źródło
Dla Spark 2.0
źródło
df = df.select('*', (df.age + 10).alias('agePlusTen'))
, skutecznie dodasz dowolną kolumnę, ponieważ @ zero323 ostrzegł nas powyżej, jest niemożliwe, chyba że coś jest nie tak w Sparku, w Pandach jest to standardowy sposób ..df.select('*', df.age + 10, df.age + 20)
Istnieje wiele sposobów dodania nowej kolumny w pySpark.
Najpierw utwórzmy prostą ramkę DataFrame.
Teraz spróbujmy podwoić wartość kolumny i zapisać ją w nowej kolumnie. PFB kilka różnych podejść do osiągnięcia tego samego.
Więcej przykładów i wyjaśnień na temat funkcji Spark DataFrame można znaleźć na moim blogu .
Mam nadzieję, że to pomoże.
źródło
Możesz zdefiniować nowy
udf
podczas dodawaniacolumn_name
:źródło
źródło
StringType()
.Chciałbym podać ogólny przykład dla bardzo podobnego przypadku użycia:
Przykład zastosowania: mam plik CSV składający się z:
Muszę wykonać jakieś transformacje i końcowy plik csv musi wyglądać
Muszę to zrobić, ponieważ jest to schemat zdefiniowany przez jakiś model i potrzebuję, aby moje końcowe dane były interoperacyjne z SQL Bulk Inserts i tym podobne.
więc:
1) Przeczytałem oryginalny plik csv za pomocą spark.read i nazywam go „df”.
2) Robię coś z danymi.
3) Dodaj puste kolumny za pomocą tego skryptu:
W ten sposób możesz ustrukturyzować swój schemat po załadowaniu pliku csv (będzie to również działać w przypadku zmiany kolejności kolumn, jeśli musisz to zrobić dla wielu tabel).
źródło
Najprostszym sposobem dodania kolumny jest użycie opcji „withColumn”. Ponieważ ramka danych jest tworzona za pomocą sqlContext, musisz określić schemat lub domyślnie może być dostępna w zestawie danych. Jeśli schemat jest określony, obciążenie staje się uciążliwe przy każdej zmianie.
Poniżej znajduje się przykład, który możesz wziąć pod uwagę:
źródło
Możemy dodać dodatkowe kolumny bezpośrednio do DataFrame, wykonując poniższe czynności:
źródło