Jak mogę zmienić typy kolumn w DataFrame platformy Spark SQL?

152

Załóżmy, że robię coś takiego:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment
1997 Ford  E350  Go get one now th...

Ale naprawdę chciałem yearas Int(i być może przekształcić kilka innych kolumn).

Najlepsze, co mogłem wymyślić, to

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

co jest nieco zawiłe.

Pochodzę z R i jestem przyzwyczajony do pisania np

df2 <- df %>%
   mutate(year = year %>% as.integer,
          make = make %>% toupper)

Prawdopodobnie czegoś mi brakuje, ponieważ powinien być lepszy sposób na zrobienie tego w Spark / Scala ...

kevinykuo
źródło
Podoba mi się w ten sposób spark.sql („SELECT STRING (NULLIF (kolumna,„ ”)) jako column_string”)
Eric Bellet

Odpowiedzi:

141

Edycja: najnowsza wersja

Od Spark 2.x możesz używać .withColumn. Sprawdź dokumenty tutaj:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column) : org.apache.spark.sql.DataFrame

Najstarsza odpowiedź

Od wersji Spark 1.4 możesz zastosować metodę rzutowania z DataType w kolumnie:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

Jeśli używasz wyrażeń sql, możesz również wykonać:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

Więcej informacji można znaleźć w dokumentacji: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

msemelman
źródło
4
dlaczego użyłeś withColumn, a następnie drop? Czy nie jest łatwiej po prostu użyć withColumn z oryginalną nazwą kolumny?
Ameba Spugnosa
@AmebaSpugnosa Myślę, że zanim go użyłem, Spark się zawiesił, jeśli miał powtarzające się nazwy kolumn. Nie kiedy je tworzysz, ale kiedy ich używasz.
msemelman
5
nie ma potrzeby usuwania kolumny, po której następuje zmiana nazwy. Możesz to zrobić w jednej liniidf.withColumn("ctr", temp("ctr").cast(DecimalType(decimalPrecision, decimalScale)))
ruhong
1
Czy w tym przypadku utworzono całą nową kopię ramki danych tylko po to, aby ponownie przekształcić kolumnę? Czy coś mi brakuje? A może za kulisami jest jakaś optymalizacja?
user1814008
5
Idąc przez docs z Spark 2.x, df.withColumn(..)można dodać lub wymienić kolumny w zależności od colNameargumentu
Y2K-Shubham
89

[EDYCJA: marzec 2016 r .: dzięki za głosy! Choć tak naprawdę, to nie jest najlepsza odpowiedź, myślę, że rozwiązania oparte na withColumn, withColumnRenameda castpodniesione przez msemelman Martin Senne i inni są prostsze i czystsze].

Myślę, że twoje podejście jest w porządku, pamiętaj, że Spark DataFramejest (niezmiennym) RDD wierszy, więc tak naprawdę nigdy nie zastępujemy kolumny, po prostu tworzymy nowy za DataFramekażdym razem z nowym schematem.

Zakładając, że masz oryginalny plik df z następującym schematem:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

I niektóre UDF zdefiniowane w jednej lub kilku kolumnach:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

Zmiana typów kolumn, a nawet tworzenie nowej ramki DataFrame z innej, można zapisać w następujący sposób:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            

co daje:

scala> df.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

Jest to bardzo zbliżone do twojego własnego rozwiązania. Po prostu zachowanie zmian typu i innych przekształceń jako oddzielnych udf valsprawia, że ​​kod jest bardziej czytelny i można go ponownie wykorzystać.

Svend
źródło
26
Nie jest to ani bezpieczne, ani wydajne. Nie jest to bezpieczne, ponieważ pojedynczy NULLlub źle sformułowany wpis spowoduje awarię całej pracy. Nie wydajne UDF, ponieważ nie są przezroczyste dla Catalyst. Używanie funkcji UDF do złożonych operacji jest w porządku, ale nie ma powodu, aby używać ich do rzutowania typu podstawowego. Dlatego mamy castmetodę (zobacz odpowiedź Martina Senne ). Zapewnienie przejrzystości Catalyst wymaga więcej pracy, ale podstawowe bezpieczeństwo to tylko kwestia wprowadzenia Tryi Optionwykonania.
zero323
Nie widziałem nic związanego z dotychczasową konwersją stringów, na przykład „05-APR-2015”
dbspace
3
Czy istnieje sposób na zredukowanie withColumn()sekcji do ogólnej, która iteruje przez wszystkie kolumny?
Boern
Dzięki zero323, po przeczytaniu tego doszedłem do wniosku, dlaczego tutaj rozwiązanie udf ulega awarii. Niektóre komentarze są lepsze niż niektóre odpowiedzi na SO :)
Simon Dirmeier
Czy istnieje sposób, w jaki możemy poznać uszkodzony wiersz, czyli rekordy, które podczas rzutowania mają kolumny o niewłaściwych typach danych. Funkcja rzutowania powoduje, że te pola są puste
Etisha
65

Ponieważ castoperacja jest dostępna dla Sparka Column(i osobiście nie popieram udfproponowanej przez @ Svendw tym momencie), co powiesz na:

df.select( df("year").cast(IntegerType).as("year"), ... )

rzutować na żądany typ? Jako fajny efekt uboczny, wartości, których nie można rzutować / „konwertować” w tym sensie, staną się null.

Jeśli potrzebujesz tego jako metody pomocniczej , użyj:

object DFHelper{
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}

który jest używany jak:

import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
Martin Senne
źródło
2
Czy możesz mi doradzić, jak postępować, jeśli muszę rzucić i zmienić nazwę całej grupy kolumn (mam 50 kolumn i całkiem nowy w scala, nie jestem pewien, jak najlepiej podejść do tego bez tworzenia masowej duplikacji)? Niektóre kolumny powinny pozostać Stringami, inne powinny być rzutowane na Float.
Dmitry Smirnov
jak przekonwertować ciąg na datę, np. „25-KWI-2016” w kolumnie i „20160302”
dbspace
@DmitrySmirnov Czy kiedykolwiek otrzymałeś odpowiedź? Mam to samo pytanie. ;)
Evan Zamir
@EvanZamir niestety nie, skończyło się na wykonaniu gównianej operacji, aby móc używać danych jako rdd w innych krokach. Zastanawiam się, czy w dzisiejszych czasach stało się to łatwiejsze :)
Dmitry Smirnov
60

Po pierwsze , jeśli chcesz przesyłać typ, to:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

Kolumna o tej samej nazwie zostanie zastąpiona nową. Nie musisz dodawać ani usuwać kroków.

Po drugie , o Scala vs R .
Oto kod, który najbardziej przypomina RI:

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)

Chociaż długość kodu jest nieco dłuższa niż R. Nie ma to nic wspólnego z gadatliwością języka. W R mutatejest to specjalna funkcja dla R dataframe, natomiast w Scali można ją łatwo ad-hoc dzięki jej ekspresyjnej sile.
Słowem, unika konkretnych rozwiązań, ponieważ projekt języka jest wystarczająco dobry, abyś mógł szybko i łatwo zbudować własny język domeny.


uwaga boczna: df.columnsjest zaskakująco a Array[String]zamiast Array[Column], może chcą, aby wyglądało jak ramka danych pandy w Pythonie.

WeiChing 林 煒 清
źródło
1
Czy mógłbyś podać ekwiwalent za pyspark?
Harit Vishwakarma
Otrzymuję „niedozwolony początek definicji” .withColumn („wiek”, $ „wiek” .cast (sql.types.DoubleType)) dla mojego pola „wiek”. Jakieś sugestie?
BlueDolphin
Czy musisz buforować () ramkę danych, jeśli wykonujemy te konwersje w wielu kolumnach ze względu na wydajność, czy też nie jest to wymagane, ponieważ Spark je optymalizuje?
skjagini
Import może być, import org.apache.spark.sql.types._a następnie zamiast po sql.types.IntegerTypeprostu IntegerType.
nessa.gp
17

Możesz użyć, selectExpraby uczynić go trochę czystszym:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")
dnlbrky
źródło
14

Kod Java służący do modyfikowania typu danych DataFrame z String na Integer

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))

Po prostu przerzuci istniejący (typ danych String) na liczbę całkowitą.

manishbelsare
źródło
1
Nie ma DataTypesw sql.types! to jest DataType. Co więcej, można po prostu importować IntegerTypei przesyłać.
Ehsan M. Kermani
@ EhsanM.Kermani faktycznie DatyaTypes.IntegerType to legalne odniesienie.
Cupitor
1
@Cupitor DataTypes.IntegerTypebył w trybie DeveloperAPI i jest stabilny w wersji 2.1.0
Ehsan M. Kermani,
To najlepsze rozwiązanie!
Simon Dirmeier
8

Aby przekonwertować rok ze string na int, możesz dodać następującą opcję do czytnika csv: "inferSchema" -> "true", zobacz dokumentację DataBricks

Peter Rose
źródło
5
Działa to ładnie, ale haczyk polega na tym, że czytelnik musi wykonać drugie przejście pliku
beefyhalo
@beefyhalo absolutnie na miejscu, czy jest jakiś sposób na obejście tego?
Ayush
6

Więc to naprawdę działa tylko wtedy, gdy masz problemy z zapisywaniem do sterownika jdbc, takiego jak sqlserver, ale jest naprawdę pomocne w przypadku błędów, które napotkasz ze składnią i typami.

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)
ben jarman
źródło
Czy możesz mi pomóc zaimplementować ten sam kod w Javie? i jak zarejestrować customJdbcDialect w DataFrame
abhijitcaps
Fajnie zrobiłem to samo z Verticą, ale od Spark 2.1. JDbcUtil musisz zaimplementować tylko określony typ danych, którego potrzebujesz. dialect.getJDBCType (dt) .orElse (getCommonJDBCType (dt)). getOrElse (rzut nowy IllegalArgumentException (s "Nie można pobrać typu JDBC dla $ {dt.simpleString}"))
Arnon Rodman
6

Wygeneruj prosty zbiór danych zawierający pięć wartości i przekonwertuj intna stringtyp:

val df = spark.range(5).select( col("id").cast("string") )
user8106134
źródło
6

Myślę, że jest to dla mnie dużo bardziej czytelne.

import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))

Spowoduje to konwersję kolumny roku na IntegerTypez utworzeniem jakichkolwiek kolumn tymczasowych i upuszczeniem tych kolumn. Jeśli chcesz przekonwertować na inny typ danych, możesz sprawdzić typy w org.apache.spark.sql.typespakiecie.

Piyush Patel
źródło
5

odpowiedzi sugerujące użycie cast, FYI, metoda cast w Spark 1.4.1 jest zepsuta.

na przykład ramka danych z kolumną łańcuchową o wartości „8182175552014127960” po rzutowaniu na bigint ma wartość „8182175552014128100”

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

Musieliśmy zmierzyć się z wieloma problemami, zanim znaleźliśmy ten błąd, ponieważ mieliśmy kolumny bigint w produkcji.

sauraI3h
źródło
4
psst, zaktualizuj swoją iskrę
msemelman
2
@msemelman to śmieszne, aby zaktualizować Spark do nowej wersji produkcyjnej z powodu małego błędu.
sauraI3h
czy nie zawsze aktualizujemy wszystko pod kątem drobnych błędów? :)
caesarsol
5
df.select($"long_col".cast(IntegerType).as("int_col"))
soulmachine
źródło
4

Używając Spark Sql 2.4.0 możesz to zrobić:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")
Eric Bellet
źródło
3

Możesz użyć poniższego kodu.

df.withColumn("year", df("year").cast(IntegerType))

Który przekształci kolumnę roku w IntegerTypekolumnę.

adarsh
źródło
2

Ta metoda spowoduje usunięcie starej kolumny i utworzenie nowych kolumn z tymi samymi wartościami i nowym typem danych. Moje oryginalne typy danych podczas tworzenia DataFrame to: -

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

Po tym uruchomiłem następujący kod, aby zmienić typ danych: -

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

Po tym mój wynik wyszedł: -

root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)
PirateJack
źródło
Czy mógłbyś podać tutaj swoje rozwiązanie.
Ajay Kharade
1

Można zmienić typ danych kolumny za pomocą rzutowania w Spark sql. nazwa tabeli to tabela i ma dwie kolumny, tylko typ danych kolumna1 i kolumna2, a typ danych kolumna1 ma zostać zmieniony. ex-spark.sql ("select cast (column1 as Double) column1NewName, column2 from table") W miejsce double zapisz swój typ danych.

Tejasvi Sharma
źródło
1

W przypadku, gdy musisz zmienić nazwę dziesiątek kolumn podanych przez ich nazwę, poniższy przykład przyjmuje podejście @dnlbrky i stosuje je do kilku kolumn jednocześnie:

df.selectExpr(df.columns.map(cn => {
    if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
    else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
    else cn
}):_*)

Nieobrzucone kolumny pozostają niezmienione. Wszystkie kolumny pozostają w pierwotnej kolejności.

sałata sześcienna
źródło
1

Tyle odpowiedzi i niewiele dokładnych wyjaśnień

Następująca składnia działa przy użyciu Notatnika Databricks ze Spark 2.4

from pyspark.sql.functions import *
df = df.withColumn("COL_NAME", to_date(BLDFm["LOAD_DATE"], "MM-dd-yyyy"))

Zauważ, że musisz określić format wpisu jaki posiadasz (w moim przypadku "MM-dd-rrrr"), a import jest obowiązkowy, ponieważ to_date jest funkcją iskrową sql

Wypróbowałem również tę składnię, ale otrzymałem wartości null zamiast prawidłowego rzutowania:

df = df.withColumn("COL_NAME", df["COL_NAME"].cast("Date"))

(Uwaga, musiałem użyć nawiasów i cudzysłowów, aby było to poprawne składniowo)


PS: Muszę przyznać, że to jest jak dżungla składni, istnieje wiele możliwych sposobów wejścia, a oficjalne odniesienia do API nie mają odpowiednich przykładów.

Mehdi LAMRANI
źródło
1
Jungle składni. Tak. To jest teraz świat Spark.
conner.xyz
1

Inne rozwiązanie jest następujące:

1) Zachowaj „inferSchema” jako False

2) Podczas uruchamiania funkcji „Map” w wierszu można odczytać „asString” (row.getString ...)

//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema","false")
            .load(args[0]);

JavaRDD<Box> vertices = enginesDataSet
            .select("BOX","BOX_CD")
            .toJavaRDD()
            .map(new Function<Row, Box>() {
                @Override
                public Box call(Row row) throws Exception {
                    return new Box((String)row.getString(0),(String)row.get(1));
                }
            });
Vibha
źródło
0
    val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
    //Schema to be applied to the table
    val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)

    val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
Aravind Krishnakumar
źródło
0

Inny sposób:

// Generate a simple dataset containing five values and convert int to string type

val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")
user8106134
źródło
0

W przypadku, gdy chcesz zmienić wiele kolumn określonego typu na inne bez określania nazw poszczególnych kolumn

/* Get names of all columns that you want to change type. 
In this example I want to change all columns of type Array to String*/
    val arrColsNames = originalDataFrame.schema.fields.filter(f => f.dataType.isInstanceOf[ArrayType]).map(_.name)

//iterate columns you want to change type and cast to the required type
val updatedDataFrame = arrColsNames.foldLeft(originalDataFrame){(tempDF, colName) => tempDF.withColumn(colName, tempDF.col(colName).cast(DataTypes.StringType))}

//display

updatedDataFrame.show(truncate = false)
Ravi
źródło