Chcę tworzyć dalej DataFrame
z określonym schematem w Scali. Próbowałem użyć odczytu JSON (mam na myśli czytanie pustego pliku), ale nie sądzę, że to najlepsza praktyka.
scala
apache-spark
dataframe
apache-spark-sql
user1735076
źródło
źródło
Począwszy od Spark 2.0.0, możesz wykonać następujące czynności.
Klasa przypadku
Zdefiniujmy
Person
klasę przypadku:scala> case class Person(id: Int, name: String) defined class Person
Importuj
spark
niejawną sesję SparkSessionEncoders
:scala> import spark.implicits._ import spark.implicits._
I użyj SparkSession, aby utworzyć pusty
Dataset[Person]
:scala> spark.emptyDataset[Person] res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]
Schemat DSL
Można również użyć schematu „DSL” (zobacz Funkcje obsługi ramek danych w org.apache.spark.sql.ColumnName ).
scala> val id = $"id".int id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true) scala> val name = $"name".string name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true) scala> import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType scala> val mySchema = StructType(id :: name :: Nil) mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true)) scala> import org.apache.spark.sql.Row import org.apache.spark.sql.Row scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema) emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string] scala> emptyDF.printSchema root |-- id: integer (nullable = true) |-- name: string (nullable = true)
źródło
spark.emptyDataset
nie ma tego w moim module, jak go używać? są jakieś (poprawne) podobne do (niepoprawne)val df = apache.spark.emptyDataset[RawData]
?spark
to wartość, którą tworzysz,SparkSession.builder
nie będąc częściąorg.apache.spark
pakietu. W użyciu są dwiespark
nazwy. To ten,spark
który masz dostępny pospark-shell
wyjęciu z pudełka.import scala.reflect.runtime.{universe => ru} def createEmptyDataFrame[T: ru.TypeTag] = hiveContext.createDataFrame(sc.emptyRDD[Row], ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType] ) case class RawData(id: String, firstname: String, lastname: String, age: Int) val sourceDF = createEmptyDataFrame[RawData]
źródło
Tutaj możesz stworzyć schemat używając StructType w scali i przekazać Empty RDD, abyś mógł stworzyć pustą tabelę. Poniższy kod dotyczy tego samego.
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.BooleanType import org.apache.spark.sql.types.LongType import org.apache.spark.sql.types.StringType //import org.apache.hadoop.hive.serde2.objectinspector.StructField object EmptyTable extends App { val conf = new SparkConf; val sc = new SparkContext(conf) //create sparksession object val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() //Created schema for three columns val schema = StructType( StructField("Emp_ID", LongType, true) :: StructField("Emp_Name", StringType, false) :: StructField("Emp_Salary", LongType, false) :: Nil) //Created Empty RDD var dataRDD = sc.emptyRDD[Row] //pass rdd and schema to create dataframe val newDFSchema = sparkSession.createDataFrame(dataRDD, schema) newDFSchema.createOrReplaceTempView("tempSchema") sparkSession.sql("create table Finaltable AS select * from tempSchema") }
źródło
Wersja Java do tworzenia pustego zestawu danych:
public Dataset<Row> emptyDataSet(){ SparkSession spark = SparkSession.builder().appName("Simple Application") .config("spark.master", "local").getOrCreate(); Dataset<Row> emptyDataSet = spark.createDataFrame(new ArrayList<>(), getSchema()); return emptyDataSet; } public StructType getSchema() { String schemaString = "column1 column2 column3 column4 column5"; List<StructField> fields = new ArrayList<>(); StructField indexField = DataTypes.createStructField("column0", DataTypes.LongType, true); fields.add(indexField); for (String fieldName : schemaString.split(" ")) { StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true); fields.add(field); } StructType schema = DataTypes.createStructType(fields); return schema; }
źródło
Oto rozwiązanie, które tworzy pustą ramkę danych w pyspark 2.0.0 lub nowszym.
from pyspark.sql import SQLContext sc = spark.sparkContext schema = StructType([StructField('col1', StringType(),False),StructField('col2', IntegerType(), True)]) sqlContext.createDataFrame(sc.emptyRDD(), schema)
źródło
Jest to przydatne do celów testowych.
Seq.empty[String].toDF()
źródło
Od Spark 2.4.3
val df = SparkSession.builder().getOrCreate().emptyDataFrame
źródło