Jak mogę przekonwertować RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
) na Dataframe org.apache.spark.sql.DataFrame
. Przekonwertowałem ramkę danych na rdd za pomocą .rdd
. Po przetworzeniu chcę go z powrotem w ramce danych. W jaki sposób mogę to zrobić ?
scala
apache-spark
apache-spark-sql
rdd
user568109
źródło
źródło
Odpowiedzi:
SqlContext
ma wielecreateDataFrame
metod, które tworząDataFrame
dany plikRDD
. Wyobrażam sobie, że jeden z nich będzie pasował do twojego kontekstu.Na przykład:
źródło
Ten kod działa doskonale od Sparka 2.x ze Scalą 2.11
Importuj niezbędne klasy
Utwórz
SparkSession
obiekt i oto jestspark
Zróbmy
RDD
toDataFrame
Metoda 1
Korzystanie
SparkSession.createDataFrame(RDD obj)
.Metoda 2
Używanie
SparkSession.createDataFrame(RDD obj)
i określanie nazw kolumn.Metoda 3 (rzeczywista odpowiedź na pytanie)
W ten sposób wymagane jest, aby dane wejściowe
rdd
były typuRDD[Row]
.utwórz schemat
Teraz zastosuj oba
rowsRdd
ischema
docreateDataFrame()
źródło
Zakładając, że Twój RDD [wiersz] nazywa się rdd, możesz użyć:
źródło
Uwaga: ta odpowiedź została pierwotnie opublikowana tutaj
Zamieszczam tę odpowiedź, ponieważ chciałbym podzielić się dodatkowymi szczegółami na temat dostępnych opcji, których nie znalazłem w innych odpowiedziach
Aby utworzyć DataFrame z RDD wierszy, istnieją dwie główne opcje:
1) Jak już wspomniano, możesz użyć pliku,
toDF()
który można zaimportować przezimport sqlContext.implicits._
. Jednak to podejście działa tylko w przypadku następujących typów RDD:RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(Źródło: Scaladoc z
SQLContext.implicits
obiektem)Ostatni podpis faktycznie oznacza, że może działać dla RDD krotek lub RDD klas przypadków (ponieważ krotki i klasy przypadków są podklasami
scala.Product
).Tak więc, aby zastosować to podejście do pliku
RDD[Row]
, musisz zmapować go na plikRDD[T <: scala.Product]
. Można to zrobić, mapując każdy wiersz na niestandardową klasę przypadku lub krotkę, jak w następujących fragmentach kodu:lub
Główną wadą tego podejścia (moim zdaniem) jest to, że musisz jawnie ustawić schemat wynikowej ramki DataFrame w funkcji mapy kolumna po kolumnie. Może można to zrobić programowo, jeśli nie znasz schematu z wyprzedzeniem, ale może tam być trochę bałaganu. Tak więc alternatywnie istnieje inna opcja:
2) Możesz użyć
createDataFrame(rowRDD: RDD[Row], schema: StructType)
as w zaakceptowanej odpowiedzi, która jest dostępna w obiekcie SQLContext . Przykład konwersji RDD starej ramki DataFrame:Zauważ, że nie ma potrzeby jawnego ustawiania żadnej kolumny schematu. Ponownie używamy schematu starego DF, który ma
StructType
klasę i może być łatwo rozszerzany. Jednak takie podejście czasami nie jest możliwe, aw niektórych przypadkach może być mniej wydajne niż pierwsze.źródło
import sqlContext.implicits.
Załóżmy, że masz
DataFrame
i chcesz dokonać pewnych modyfikacji w danych pól, konwertując je naRDD[Row]
.Aby przekonwertować z powrotem
DataFrame
odRDD
musimy określić rodzaj konstrukcji z poniższychRDD
.Jeśli typem był typ danych
Long
, stanie się on taki jakLongType
w strukturze.Jeśli
String
toStringType
w strukturze.Teraz można przekonwertować RDD do DataFrame pomocą createDataFrame metody.
źródło
Oto prosty przykład konwersji listy do Spark RDD, a następnie przekonwertowania tego Spark RDD na Dataframe.
Zwróć uwagę, że użyłem scala REPL Spark-shell do wykonania następującego kodu, Tutaj sc jest wystąpieniem SparkContext, które jest niejawnie dostępne w Spark-shell. Mam nadzieję, że odpowie na twoje pytanie.
źródło
Metoda 1: (Scala)
Metoda 2: (Scala)
Metoda 1: (Python)
Metoda 2: (Python)
Wyodrębniono wartość z obiektu wiersza, a następnie zastosowano klasę przypadku, aby przekonwertować rdd na DF
źródło
W nowszych wersjach Spark (2.0+)
źródło
Zakładając, że val Spark jest produktem SparkSession.builder ...
Te same kroki, ale z mniejszą liczbą deklaracji val:
źródło
Próbowałem wyjaśnić rozwiązanie, używając problemu z liczbą słów . 1. Przeczytaj plik za pomocą sc
Metody tworzenia DF
Przeczytaj plik za pomocą Spark
Rdd do Dataframe
val df = sc.textFile ("D: // cca175 / data /") .toDF ("t1") df.show
Metoda 1
Utwórz liczbę słów RDD w Dataframe
Metoda 2
Utwórz Dataframe z Rdd
Metoda 3
Zdefiniuj schemat
import org.apache.spark.sql.types._
val schema = new StructType (). add (StructField ("słowo", StringType, prawda)). add (StructField ("count", StringType, true))
Utwórz RowRDD
Utwórz DataFrame z RDD ze schematem
val df = spark.createDataFrame (rowRdd, schema)
df.show
źródło
Aby przekonwertować Array [Row] na DataFrame lub Dataset, następujące elementy działają elegancko:
Powiedzmy, że schemat to typ StructType dla wiersza
źródło