Czy można zapisać DataFrame
w Spark bezpośrednio w Hive?
Próbowałem przekonwertować DataFrame
do, Rdd
a następnie zapisać jako plik tekstowy, a następnie załadować w gałęzi. Ale zastanawiam się, czy mogę bezpośrednio zapisać dataframe
do ula
scala
apache-spark
hive
apache-spark-sql
Gourav
źródło
źródło
temporary
stół dohive
stołu? Robiącshow tables
to, zawiera tylkohive
tabele dla mojejspark 2.3.0
instalacjiUżyj
DataFrameWriter.saveAsTable
. (df.write.saveAsTable(...)
) Zobacz temat Spark SQL i DataFrame Guide .źródło
df.write().saveAsTable(tableName)
również zapisanie danych strumieniowych do tabeli?Nie widzę
df.write.saveAsTable(...)
wycofanych w dokumentacji Spark 2.0. To zadziałało dla nas na Amazon EMR. Doskonale byliśmy w stanie wczytać dane z S3 do ramki danych, przetworzyć je, stworzyć tabelę z wyniku i odczytać ją za pomocą MicroStrategy. Odpowiedź Vinays też się sprawdziła.źródło
musisz mieć / utworzyć HiveContext
import org.apache.spark.sql.hive.HiveContext; HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Następnie bezpośrednio zapisz ramkę danych lub wybierz kolumny do zapisania jako tabela gałęzi
df to dataframe
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
lub
df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
lub
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
SaveModes to Append / Ignore / Overwrite / ErrorIfExists
Dodałem tutaj definicję dla HiveContext z dokumentacji Spark,
Oprócz podstawowego SQLContext można również utworzyć HiveContext, który zapewnia nadzbiór funkcji zapewnianych przez podstawowy SQLContext. Dodatkowe funkcje obejmują możliwość pisania zapytań przy użyciu pełniejszego parsera HiveQL, dostępu do funkcji Hive UDF oraz możliwość odczytu danych z tabel Hive. Aby użyć HiveContext, nie musisz mieć istniejącej konfiguracji Hive, a wszystkie źródła danych dostępne dla SQLContext są nadal dostępne. HiveContext jest pakowany tylko osobno, aby uniknąć uwzględnienia wszystkich zależności Hive w domyślnej kompilacji Spark.
w Spark w wersji 1.6.2 użycie „dbName.tableName” powoduje wystąpienie tego błędu:
źródło
df.write().mode...
należy zmienić nadf.write.mode...
Zapisywanie do Hive to tylko kwestia użycia
write()
metody SQLContext:Zobacz https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)
From Spark 2.2: użyj DataSet zamiast DataFrame.
źródło
From Spark 2.2: use DataSet instead DataFrame.
Przepraszam, piszę późno na post, ale nie widzę zaakceptowanej odpowiedzi.
df.write().saveAsTable
wyrzuciAnalysisException
i nie jest kompatybilny ze stołem HIVE.Przechowywanie DF, które
df.write().format("hive")
powinno załatwić sprawę!Jeśli jednak to nie zadziała, to kierując się wcześniejszymi komentarzami i odpowiedziami, jest to moim zdaniem najlepsze rozwiązanie (choć otwarte na sugestie).
Najlepszym podejściem jest jawne utworzenie tabeli HIVE (w tym tabeli PARTITIONED),
def createHiveTable: Unit ={ spark.sql("CREATE TABLE $hive_table_name($fields) " + "PARTITIONED BY ($partition_column String) STORED AS $StorageType") }
zapisz DF jako tabelę tymczasową,
df.createOrReplaceTempView("$tempTableName")
i wstaw do tabeli PARTITIONED HIVE:
spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName") spark.sql("select * from default.$hive_table_name").show(1000,false)
Offcourse ostatnia kolumna w DF będzie KOLUMNA PODZIAŁU więc utworzyć tabelę odpowiednio ula!
Prosimy o komentarz, jeśli to działa! albo nie.
--AKTUALIZACJA--
df.write() .partitionBy("$partition_column") .format("hive") .mode(SaveMode.append) .saveAsTable($new_table_name_to_be_created_in_hive) //Table should not exist OR should be a PARTITIONED table in HIVE
źródło
Oto wersja PySpark do tworzenia tabeli Hive z pliku parkiet. Być może wygenerowano pliki Parquet przy użyciu wywnioskowanego schematu, a teraz chcesz wypchnąć definicję do metastore Hive. Możesz także przekazać definicję do systemu, takiego jak AWS Glue lub AWS Athena, a nie tylko do magazynu metastore Hive. Tutaj używam spark.sql do wypychania / tworzenia trwałej tabeli.
# Location where my parquet files are present. df = spark.read.parquet("s3://my-location/data/") cols = df.dtypes buf = [] buf.append('CREATE EXTERNAL TABLE test123 (') keyanddatatypes = df.dtypes sizeof = len(df.dtypes) print ("size----------",sizeof) count=1; for eachvalue in keyanddatatypes: print count,sizeof,eachvalue if count == sizeof: total = str(eachvalue[0])+str(' ')+str(eachvalue[1]) else: total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',') buf.append(total) count = count + 1 buf.append(' )') buf.append(' STORED as parquet ') buf.append("LOCATION") buf.append("'") buf.append('s3://my-location/data/') buf.append("'") buf.append("'") ##partition by pt tabledef = ''.join(buf) print "---------print definition ---------" print tabledef ## create a table using spark.sql. Assuming you are using spark 2.1+ spark.sql(tabledef);
źródło
W przypadku zewnętrznych tabel Hive używam tej funkcji w PySpark:
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"): print("Saving result in {}.{}".format(database, table_name)) output_schema = "," \ .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \ .replace("StringType", "STRING") \ .replace("IntegerType", "INT") \ .replace("DateType", "DATE") \ .replace("LongType", "INT") \ .replace("TimestampType", "INT") \ .replace("BooleanType", "BOOLEAN") \ .replace("FloatType", "FLOAT")\ .replace("DoubleType","FLOAT") output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema) sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name)) query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \ .format(database, table_name, output_schema, save_format, database, table_name) sparkSession.sql(query) dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
źródło
W moim przypadku to działa dobrze:
from pyspark_llap import HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build() hive.setDatabase("DatabaseName") df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv") df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()
Gotowe!!
Możesz czytać Dane, podając jako „Pracownik”
hive.executeQuery("select * from Employee").show()
Aby uzyskać więcej informacji, użyj tego adresu URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html
źródło
val df = ... val schemaStr = df.schema.toDDL # This gives the columns spark.sql(s"""create table hive_table ( ${schemaStr})""") //Now write the dataframe to the table df.write.saveAsTable("hive_table")
hive_table
zostanie utworzony w domyślnej przestrzeni, ponieważ nie udostępniliśmy żadnej bazy danych pod adresemspark.sql()
.stg.hive_table
może być używany do tworzeniahive_table
wstg
bazie danych.źródło
Możesz użyć biblioteki Spark-Llap Hortonworks w ten sposób
import com.hortonworks.hwc.HiveWarehouseSession df.write .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .mode("append") .option("table", "myDatabase.myTable") .save()
źródło