pypsark寫入hive,在新版pyspark中,使用SparkSession來代替之前的from pyspark.sql import HiveContext
一、代碼實例
# -*- coding: utf-8 -*-
import findspark
findspark.init()
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder. \
appName("spark_test"). \
config("spark.sql.shuffle.partitions", 10). \
config("spark.default.parallelism", 10). \
config("hive.warehouse.subdir.inherit.perms", "false"). \
enableHiveSupport(). \
getOrCreate()
today_date = datetime.datetime.now().strftime('%Y-%m-%d')
data = ["10,test1","11,test2"]*100
# 1.原始數據,一個rdd
rdd = spark.sparkContext.parallelize(data)
rdd = rdd.map(lambda x: x.split(","))
rdd = rdd.map(lambda x:[int(x[0]),x[1]])
# 2.構建一個schema
sch = StructType([
StructField("user_id",IntegerType(),True),
StructField("item_id", StringType(), True)
])
# 3.將rdd轉化為dataFrame
df = spark.createDataFrame(rdd, sch)
# 4.創建臨時表
df.createOrReplaceTempView("tmpv")
# print(df.take(10))
# 5.執行sql數據導入到hive
this_sql = """
insert into table database1.table1 partition(opdate='{partition}') select * from tmpv
""".format(partition=today_date)
spark.sql(this_sql)
spark.stop()
二、bug記錄
之前一直是把結果保存到hbase,現在需要保存到hive中。
1、setfacl: Permission denied user=root is not the owner of inode=/user/hive/warehouse/...
雖然一直顯示這個,但是可以正常保存數據。
2、ERROR hdfs.KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
使用CDH集群時,一個可以忽略的錯誤
https://blog.csdn.net/xwd127429/article/details/105864035
3、 ValueError: field user_id: object of IntegerType out of range, got: 44376428908161556
由於user_id
的長度過長,又使用了IntegerType()
,所以需要改為LongType()
。
關於這些類型:https://www.cnblogs.com/wonglu/p/8390710.html
中間沒有打印df.take(10)
,所以錯誤一直以為是發生在spark.sql(this_sql)
階段。而這里報錯為java錯誤,根本看不到問題所在。bug處理能力level 0。
4、StructType can not accept object
當只有一個元素的時候,改成元組形式。
https://stackoverflow.com/questions/47927044/structtype-can-not-accept-object