Jak zapisać DataFrame bezpośrednio w Hive?

85

Czy można zapisać DataFramew Spark bezpośrednio w Hive?

Próbowałem przekonwertować DataFramedo, Rdda następnie zapisać jako plik tekstowy, a następnie załadować w gałęzi. Ale zastanawiam się, czy mogę bezpośrednio zapisać dataframedo ula

Gourav
źródło

Odpowiedzi:

116

Możesz utworzyć tymczasową tabelę w pamięci i przechowywać ją w tabeli gałęzi przy użyciu sqlContext.

Powiedzmy, że twoja ramka danych to myDf. Możesz utworzyć jedną tabelę tymczasową za pomocą,

myDf.createOrReplaceTempView("mytempTable") 

Następnie możesz użyć prostej instrukcji hive, aby utworzyć tabelę i zrzucić dane z tabeli tymczasowej.

sqlContext.sql("create table mytable as select * from mytempTable");
Vinay Kumar
źródło
2
to omijało błędy odczytu parkietu, które otrzymywałem podczas używania write.saveAsTable w Spark 2.0
ski_squaw
2
Tak, jednak możemy użyć partycji według ramki danych przed utworzeniem tabeli tymczasowej. @chhantyal
Vinay Kumar
1
Jak udało Ci się połączyć i dopasować temporarystół do hivestołu? Robiąc show tablesto, zawiera tylko hivetabele dla mojej spark 2.3.0instalacji
StephenBoesch
1
ta tabela tymczasowa zostanie zapisana w kontekście gałęzi i w żaden sposób nie należy do tabel gałęzi.
Vinay Kumar
1
cześć @VinayKumar, dlaczego mówisz „Jeśli używasz saveAsTable (bardziej przypomina to utrwalanie ramki danych), musisz upewnić się, że masz wystarczająco dużo pamięci przydzielonej do aplikacji Spark”. czy mógłbyś wyjaśnić ten punkt?
enneppi
27

Użyj DataFrameWriter.saveAsTable. ( df.write.saveAsTable(...)) Zobacz temat Spark SQL i DataFrame Guide .

Daniel Darabos
źródło
4
saveAsTable nie tworzy tabel zgodnych z Hive. Najlepsze rozwiązanie, jakie znalazłem, to Vinay Kumar.
RChat
@Jacek: Sam dodałem tę notatkę, bo uważam, że moja odpowiedź jest błędna. Usunąłbym to, gdyby nie to, że jest akceptowane. Czy uważasz, że notatka jest błędna?
Daniel Darabos
Tak. Notatka była błędna i dlatego ją usunąłem. „Proszę mnie poprawić, jeśli się mylę” obowiązuje tutaj :)
Jacek Laskowski
1
czy spowoduje to df.write().saveAsTable(tableName) również zapisanie danych strumieniowych do tabeli?
user1870400
1
nie, nie możesz zapisać danych strumieniowych za pomocą saveAsTable, nie ma ich nawet w interfejsie API
Brian
20

Nie widzę df.write.saveAsTable(...)wycofanych w dokumentacji Spark 2.0. To zadziałało dla nas na Amazon EMR. Doskonale byliśmy w stanie wczytać dane z S3 do ramki danych, przetworzyć je, stworzyć tabelę z wyniku i odczytać ją za pomocą MicroStrategy. Odpowiedź Vinays też się sprawdziła.

Alex
źródło
5
Ktoś oznaczył tę odpowiedź jako niską jakość ze względu na długość i treść. Szczerze mówiąc, prawdopodobnie byłoby lepiej jako komentarz. Wydaje mi się, że trwało to od dwóch lat i niektórym osobom przydało się to, więc pozostawienie rzeczy tak, jak jest?
serakfalcon
Zgadzam się, komentarz byłby lepszym wyborem. Wyciągnięta lekcja :-)
Alex
15

musisz mieć / utworzyć HiveContext

import org.apache.spark.sql.hive.HiveContext;

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

Następnie bezpośrednio zapisz ramkę danych lub wybierz kolumny do zapisania jako tabela gałęzi

df to dataframe

df.write().mode("overwrite").saveAsTable("schemaName.tableName");

lub

df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");

lub

df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");

SaveModes to Append / Ignore / Overwrite / ErrorIfExists

Dodałem tutaj definicję dla HiveContext z dokumentacji Spark,

Oprócz podstawowego SQLContext można również utworzyć HiveContext, który zapewnia nadzbiór funkcji zapewnianych przez podstawowy SQLContext. Dodatkowe funkcje obejmują możliwość pisania zapytań przy użyciu pełniejszego parsera HiveQL, dostępu do funkcji Hive UDF oraz możliwość odczytu danych z tabel Hive. Aby użyć HiveContext, nie musisz mieć istniejącej konfiguracji Hive, a wszystkie źródła danych dostępne dla SQLContext są nadal dostępne. HiveContext jest pakowany tylko osobno, aby uniknąć uwzględnienia wszystkich zależności Hive w domyślnej kompilacji Spark.


w Spark w wersji 1.6.2 użycie „dbName.tableName” powoduje wystąpienie tego błędu:

org.apache.spark.sql.AnalysisException: określenie nazwy bazy danych lub innych kwalifikatorów nie jest dozwolone w przypadku tabel tymczasowych. Jeśli nazwa tabeli zawiera kropki (.), Zacytuj ją za pomocą odwrotnych apostrofów () .`

Anandkumar
źródło
To drugie polecenie: 'df.select (df.col ("col1"), df.col ("col2"), df.col ("col3")) .write (). Mode ("overwrite"). SaveAsTable ("schemaName.tableName"); ' wymaganie, aby wybrane kolumny, które chcesz nadpisać, istniały już w tabeli? Więc masz istniejącą tabelę i nadpisujesz tylko istniejące kolumny 1, 2, 3 nowymi danymi z twojego df w Spark? czy to jest zinterpretowane poprawnie?
dieHellste
3
df.write().mode...należy zmienić nadf.write.mode...
użytkownik 923227
8

Zapisywanie do Hive to tylko kwestia użycia write()metody SQLContext:

df.write.saveAsTable(tableName)

Zobacz https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)

From Spark 2.2: użyj DataSet zamiast DataFrame.

Raktotpal Bordoloi
źródło
Wygląda na to, że wystąpił błąd informujący o przerwaniu pracy. Wypróbowałem następujący kod: pyspark_df.write.mode („overwrite”). SaveAsTable („InjuryTab2”)
Sade,
Cześć! dlaczego to From Spark 2.2: use DataSet instead DataFrame.
onofricamila
3

Przepraszam, piszę późno na post, ale nie widzę zaakceptowanej odpowiedzi.

df.write().saveAsTablewyrzuci AnalysisExceptioni nie jest kompatybilny ze stołem HIVE.

Przechowywanie DF, które df.write().format("hive")powinno załatwić sprawę!

Jeśli jednak to nie zadziała, to kierując się wcześniejszymi komentarzami i odpowiedziami, jest to moim zdaniem najlepsze rozwiązanie (choć otwarte na sugestie).

Najlepszym podejściem jest jawne utworzenie tabeli HIVE (w tym tabeli PARTITIONED),

def createHiveTable: Unit ={
spark.sql("CREATE TABLE $hive_table_name($fields) " +
  "PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}

zapisz DF jako tabelę tymczasową,

df.createOrReplaceTempView("$tempTableName")

i wstaw do tabeli PARTITIONED HIVE:

spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql("select * from default.$hive_table_name").show(1000,false)

Offcourse ostatnia kolumna w DF będzie KOLUMNA PODZIAŁU więc utworzyć tabelę odpowiednio ula!

Prosimy o komentarz, jeśli to działa! albo nie.


--AKTUALIZACJA--

df.write()
  .partitionBy("$partition_column")
  .format("hive")
  .mode(SaveMode.append)
  .saveAsTable($new_table_name_to_be_created_in_hive)  //Table should not exist OR should be a PARTITIONED table in HIVE
Harshv
źródło
1

Oto wersja PySpark do tworzenia tabeli Hive z pliku parkiet. Być może wygenerowano pliki Parquet przy użyciu wywnioskowanego schematu, a teraz chcesz wypchnąć definicję do metastore Hive. Możesz także przekazać definicję do systemu, takiego jak AWS Glue lub AWS Athena, a nie tylko do magazynu metastore Hive. Tutaj używam spark.sql do wypychania / tworzenia trwałej tabeli.

   # Location where my parquet files are present.
    df = spark.read.parquet("s3://my-location/data/")
    cols = df.dtypes
    buf = []
    buf.append('CREATE EXTERNAL TABLE test123 (')
    keyanddatatypes =  df.dtypes
    sizeof = len(df.dtypes)
    print ("size----------",sizeof)
    count=1;
    for eachvalue in keyanddatatypes:
        print count,sizeof,eachvalue
        if count == sizeof:
            total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
        else:
            total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
        buf.append(total)
        count = count + 1

    buf.append(' )')
    buf.append(' STORED as parquet ')
    buf.append("LOCATION")
    buf.append("'")
    buf.append('s3://my-location/data/')
    buf.append("'")
    buf.append("'")
    ##partition by pt
    tabledef = ''.join(buf)

    print "---------print definition ---------"
    print tabledef
    ## create a table using spark.sql. Assuming you are using spark 2.1+
    spark.sql(tabledef);
kartik
źródło
1

W przypadku zewnętrznych tabel Hive używam tej funkcji w PySpark:

def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
    print("Saving result in {}.{}".format(database, table_name))
    output_schema = "," \
        .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
        .replace("StringType", "STRING") \
        .replace("IntegerType", "INT") \
        .replace("DateType", "DATE") \
        .replace("LongType", "INT") \
        .replace("TimestampType", "INT") \
        .replace("BooleanType", "BOOLEAN") \
        .replace("FloatType", "FLOAT")\
        .replace("DoubleType","FLOAT")
    output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)

    sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))

    query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
        .format(database, table_name, output_schema, save_format, database, table_name)
    sparkSession.sql(query)
    dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
Shadowtrooper
źródło
1

W moim przypadku to działa dobrze:

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("DatabaseName")
df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()

Gotowe!!

Możesz czytać Dane, podając jako „Pracownik”

hive.executeQuery("select * from Employee").show()

Aby uzyskać więcej informacji, użyj tego adresu URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html

MD Rijwan
źródło
0

Jeśli chcesz utworzyć tabelę gałęzi (która nie istnieje) z ramki danych (czasami nie udaje się jej utworzyć DataFrameWriter.saveAsTable). StructType.toDDLpomoże w wyświetlaniu kolumn jako ciągu.

val df = ...

val schemaStr = df.schema.toDDL # This gives the columns 
spark.sql(s"""create table hive_table ( ${schemaStr})""")

//Now write the dataframe to the table
df.write.saveAsTable("hive_table")

hive_tablezostanie utworzony w domyślnej przestrzeni, ponieważ nie udostępniliśmy żadnej bazy danych pod adresem spark.sql(). stg.hive_tablemoże być używany do tworzenia hive_tablew stgbazie danych.

mrsrinivas
źródło
Szczegółowy przykład znajduje się tutaj: stackoverflow.com/a/56833395/1592191
mrsrinivas
0

Możesz użyć biblioteki Spark-Llap Hortonworks w ten sposób

import com.hortonworks.hwc.HiveWarehouseSession

df.write
  .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
  .mode("append")
  .option("table", "myDatabase.myTable")
  .save()
mikrofon
źródło