Zmienianie nazw kolumn DataFrame w Spark Scala

93

Próbuję przekonwertować wszystkie nagłówki / nazwy kolumn DataFramew Spark-Scala. na razie wymyślam następujący kod, który zastępuje tylko jedną nazwę kolumny.

for( i <- 0 to origCols.length - 1) {
  df.withColumnRenamed(
    df.columns(i), 
    df.columns(i).toLowerCase
  );
}
Sam
źródło

Odpowiedzi:

239

Jeśli konstrukcja jest płaska:

val df = Seq((1L, "a", "foo", 3.0)).toDF
df.printSchema
// root
//  |-- _1: long (nullable = false)
//  |-- _2: string (nullable = true)
//  |-- _3: string (nullable = true)
//  |-- _4: double (nullable = false)

najprostszą rzeczą jaką możesz zrobić jest użycie toDFmetody:

val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.printSchema
// root
// |-- id: long (nullable = false)
// |-- x1: string (nullable = true)
// |-- x2: string (nullable = true)
// |-- x3: double (nullable = false)

Jeśli chcesz zmienić nazwę poszczególne kolumny można skorzystać z jednego selectz alias:

df.select($"_1".alias("x1"))

które można łatwo uogólnić na wiele kolumn:

val lookup = Map("_1" -> "foo", "_3" -> "bar")

df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)

lub withColumnRenamed:

df.withColumnRenamed("_1", "x1")

które używają foldLeftdo zmiany nazwy wielu kolumn:

lookup.foldLeft(df)((acc, ca) => acc.withColumnRenamed(ca._1, ca._2))

W przypadku struktur zagnieżdżonych ( structs) jedną z możliwych opcji jest zmiana nazwy poprzez wybranie całej struktury:

val nested = spark.read.json(sc.parallelize(Seq(
    """{"foobar": {"foo": {"bar": {"first": 1.0, "second": 2.0}}}, "id": 1}"""
)))

nested.printSchema
// root
//  |-- foobar: struct (nullable = true)
//  |    |-- foo: struct (nullable = true)
//  |    |    |-- bar: struct (nullable = true)
//  |    |    |    |-- first: double (nullable = true)
//  |    |    |    |-- second: double (nullable = true)
//  |-- id: long (nullable = true)

@transient val foobarRenamed = struct(
  struct(
    struct(
      $"foobar.foo.bar.first".as("x"), $"foobar.foo.bar.first".as("y")
    ).alias("point")
  ).alias("location")
).alias("record")

nested.select(foobarRenamed, $"id").printSchema
// root
//  |-- record: struct (nullable = false)
//  |    |-- location: struct (nullable = false)
//  |    |    |-- point: struct (nullable = false)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
//  |-- id: long (nullable = true)

Pamiętaj, że może to wpłynąć na nullabilitymetadane. Inną możliwością jest zmiana nazwy poprzez rzutowanie:

nested.select($"foobar".cast(
  "struct<location:struct<point:struct<x:double,y:double>>>"
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)

lub:

import org.apache.spark.sql.types._

nested.select($"foobar".cast(
  StructType(Seq(
    StructField("location", StructType(Seq(
      StructField("point", StructType(Seq(
        StructField("x", DoubleType), StructField("y", DoubleType)))))))))
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
zero323
źródło
Cześć @ zero323 Podczas korzystania z withColumnRenamed otrzymuję wyjątek AnalysisException nie może rozwiązać 'CC8. 1 'podane kolumny wejściowe ... Nie udaje się, mimo że CC8.1 jest dostępny w DataFrame.
unk1102
@ u449355 Nie jest dla mnie jasne, czy jest to kolumna zagnieżdżona, czy zawierająca kropki. W późniejszym przypadku powinny działać backtiki (przynajmniej w niektórych podstawowych przypadkach).
zero323
1
co to : _*)znaczy wdf.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)
Anton Kim
1
Odpowiadając na pytanie Antona Kima: : _*jest to tak zwany operator scali "ikona". Zasadniczo rozbija obiekt podobny do tablicy na niezabezpieczoną listę, co jest przydatne, gdy chcesz przekazać tablicę do funkcji, która przyjmuje dowolną liczbę argumentów, ale nie ma wersji, która przyjmuje rozszerzenie List[]. Jeśli w ogóle znasz Perla, to jest różnica między some_function(@my_array) # "splatted"i some_function(\@my_array) # not splatted ... in perl the backslash "\" operator returns a reference to a thing.
Mylo Stone
1
To stwierdzenie jest dla mnie naprawdę niejasne df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)... Czy mógłbyś je rozłożyć? zwłaszcza lookup.getOrElse(c,c)część.
Aetos
19

Dla tych z Was, którzy są zainteresowani wersją PySpark (właściwie tak samo jest w Scali - patrz komentarz poniżej):

    merchants_df_renamed = merchants_df.toDF(
        'merchant_id', 'category', 'subcategory', 'merchant')

    merchants_df_renamed.printSchema()

Wynik:

root
| - merchant_id: integer (nullable = true)
| - category: string (nullable = true)
| - subcategory: string (nullable = true)
| - merchant: string (nullable = true)

Tagar
źródło
1
Używając toDF()do zmiany nazw kolumn w DataFrame, należy zachować ostrożność. Ta metoda działa znacznie wolniej niż inne. Mam DataFrame zawiera 100M rekordów, a proste zapytanie liczące zajmuje ~ 3s, podczas gdy to samo zapytanie z toDF()metodą zajmuje ~ 16s. Ale kiedy używam select col AS col_newmetody zmiany nazwy, ponownie otrzymuję ~ 3s. Ponad 5 razy szybciej! Spark 2.3.2.3
Ihor Konovalenko
6
def aliasAllColumns(t: DataFrame, p: String = "", s: String = ""): DataFrame =
{
  t.select( t.columns.map { c => t.col(c).as( p + c + s) } : _* )
}

W przypadku, gdy nie jest to oczywiste, dodaje to przedrostek i sufiks do każdej z bieżących nazw kolumn. Może to być przydatne, gdy masz dwie tabele z jedną lub więcej kolumnami o tej samej nazwie i chcesz do nich dołączyć, ale nadal możesz rozróżnić kolumny w tabeli wynikowej. Z pewnością byłoby miło, gdyby istniał podobny sposób zrobienia tego w „normalnym” języku SQL.

Kamień Mylo
źródło
na pewno lubię, miło i elegancko
thebluephantom
1

Załóżmy, że df ramki danych ma 3 kolumny id1, nazwa1, cena1 i chcesz zmienić ich nazwy na id2, nazwa2, cena2

val list = List("id2", "name2", "price2")
import spark.implicits._
val df2 = df.toDF(list:_*)
df2.columns.foreach(println)

To podejście okazało się przydatne w wielu przypadkach.

Jagadeesh Verri
źródło
0

sprzężenie tabeli holowania nie zmienia nazwy połączonego klucza

// method 1: create a new DF
day1 = day1.toDF(day1.columns.map(x => if (x.equals(key)) x else s"${x}_d1"): _*)

// method 2: use withColumnRenamed
for ((x, y) <- day1.columns.filter(!_.equals(key)).map(x => (x, s"${x}_d1"))) {
    day1 = day1.withColumnRenamed(x, y)
}

Pracuje!

Colin Wang
źródło
0
Sometime we have the column name is below format in SQLServer or MySQL table

Ex  : Account Number,customer number

But Hive tables do not support column name containing spaces, so please use below solution to rename your old column names.

Solution:

val renamedColumns = df.columns.map(c => df(c).as(c.replaceAll(" ", "_").toLowerCase()))
df = df.select(renamedColumns: _*)
R. RamkumarYugo
źródło