Jak stworzyć pustą ramkę DataFrame z określonym schematem?

99

Chcę tworzyć dalej DataFramez określonym schematem w Scali. Próbowałem użyć odczytu JSON (mam na myśli czytanie pustego pliku), ale nie sądzę, że to najlepsza praktyka.

user1735076
źródło

Odpowiedzi:

130

Załóżmy, że potrzebujesz ramki danych o następującym schemacie:

root
 |-- k: string (nullable = true)
 |-- v: integer (nullable = false)

Po prostu definiujesz schemat ramki danych i używasz pustego RDD[Row]:

import org.apache.spark.sql.types.{
    StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val schema = StructType(
    StructField("k", StringType, true) ::
    StructField("v", IntegerType, false) :: Nil)

// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema) 
spark.createDataFrame(sc.emptyRDD[Row], schema)

Odpowiednik PySpark jest prawie identyczny:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("k", StringType(), True), StructField("v", IntegerType(), False)
])

# or df = sc.parallelize([]).toDF(schema)

# Spark < 2.0 
# sqlContext.createDataFrame([], schema)
df = spark.createDataFrame([], schema)

Używanie niejawnych koderów (tylko Scala) z Producttypami takimi jak Tuple:

import spark.implicits._

Seq.empty[(String, Int)].toDF("k", "v")

lub klasa sprawy:

case class KV(k: String, v: Int)

Seq.empty[KV].toDF

lub

spark.emptyDataset[KV].toDF
zero323
źródło
To jest najwłaściwsza odpowiedź - pełna, a także przydatna, jeśli chcesz szybko odtworzyć schemat istniejącego zbioru danych. Nie wiem, dlaczego nie jest to akceptowane.
Lucas Lima
Jak stworzyć df z cechą zamiast klasy przypadku: stackoverflow.com/questions/64276952/ ...
supernatural
41

Począwszy od Spark 2.0.0, możesz wykonać następujące czynności.

Klasa przypadku

Zdefiniujmy Personklasę przypadku:

scala> case class Person(id: Int, name: String)
defined class Person

Importuj sparkniejawną sesję SparkSession Encoders:

scala> import spark.implicits._
import spark.implicits._

I użyj SparkSession, aby utworzyć pusty Dataset[Person]:

scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]

Schemat DSL

Można również użyć schematu „DSL” (zobacz Funkcje obsługi ramek danych w org.apache.spark.sql.ColumnName ).

scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)

scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> emptyDF.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
Jacek Laskowski
źródło
Cześć, kompilator powiedział, że spark.emptyDatasetnie ma tego w moim module, jak go używać? są jakieś (poprawne) podobne do (niepoprawne) val df = apache.spark.emptyDataset[RawData]?
Peter Krauss
@PeterKrauss sparkto wartość, którą tworzysz, SparkSession.buildernie będąc częścią org.apache.sparkpakietu. W użyciu są dwie sparknazwy. To ten, sparkktóry masz dostępny po spark-shellwyjęciu z pudełka.
Jacek Laskowski
1
Dzięki, Jacek. Poprawiłem: obiekt SparkSession.builder jest przekazywany jako parametr (wydaje się najlepszym rozwiązaniem) od pierwszej ogólnej inicjalizacji, teraz działa.
Peter Krauss
Czy istnieje sposób na utworzenie pustej ramki danych przy użyciu cechy zamiast klasy przypadku: stackoverflow.com/questions/64276952/…
supernatural
3
import scala.reflect.runtime.{universe => ru}
def createEmptyDataFrame[T: ru.TypeTag] =
    hiveContext.createDataFrame(sc.emptyRDD[Row],
      ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
    )
  case class RawData(id: String, firstname: String, lastname: String, age: Int)
  val sourceDF = createEmptyDataFrame[RawData]
Ravindra
źródło
3

Tutaj możesz stworzyć schemat używając StructType w scali i przekazać Empty RDD, abyś mógł stworzyć pustą tabelę. Poniższy kod dotyczy tego samego.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType



//import org.apache.hadoop.hive.serde2.objectinspector.StructField

object EmptyTable extends App {
  val conf = new SparkConf;
  val sc = new SparkContext(conf)
  //create sparksession object
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

  //Created schema for three columns 
   val schema = StructType(
    StructField("Emp_ID", LongType, true) ::
      StructField("Emp_Name", StringType, false) ::
      StructField("Emp_Salary", LongType, false) :: Nil)

      //Created Empty RDD 

  var dataRDD = sc.emptyRDD[Row]

  //pass rdd and schema to create dataframe
  val newDFSchema = sparkSession.createDataFrame(dataRDD, schema)

  newDFSchema.createOrReplaceTempView("tempSchema")

  sparkSession.sql("create table Finaltable AS select * from tempSchema")

}
Nilesh Shinde
źródło
2

Wersja Java do tworzenia pustego zestawu danych:

public Dataset<Row> emptyDataSet(){

    SparkSession spark = SparkSession.builder().appName("Simple Application")
                .config("spark.master", "local").getOrCreate();

    Dataset<Row> emptyDataSet = spark.createDataFrame(new ArrayList<>(), getSchema());

    return emptyDataSet;
}

public StructType getSchema() {

    String schemaString = "column1 column2 column3 column4 column5";

    List<StructField> fields = new ArrayList<>();

    StructField indexField = DataTypes.createStructField("column0", DataTypes.LongType, true);
    fields.add(indexField);

    for (String fieldName : schemaString.split(" ")) {
        StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
        fields.add(field);
    }

    StructType schema = DataTypes.createStructType(fields);

    return schema;
}
Molay
źródło
1

Oto rozwiązanie, które tworzy pustą ramkę danych w pyspark 2.0.0 lub nowszym.

from pyspark.sql import SQLContext
sc = spark.sparkContext
schema = StructType([StructField('col1', StringType(),False),StructField('col2', IntegerType(), True)])
sqlContext.createDataFrame(sc.emptyRDD(), schema)
braj
źródło
-3

Od Spark 2.4.3

val df = SparkSession.builder().getOrCreate().emptyDataFrame
duch lisa
źródło
7
Nie rozwiązuje to części pytania dotyczącej schematu.
Andrew Sklyarevsky