In day-to-day work you inevitably deal with big data, and sometimes you need to read a local file and then store its contents in Hive. This post walks through the process in detail.
Process:
- Use the pickle module to read the .pkl file;
- Convert the loaded content into an RDD;
- Convert the RDD into a DataFrame and save it to a Hive table.

A compact end-to-end sketch of these three steps is given right after this list; the detailed walkthrough starts in section 1.
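For orientation, here is a minimal sketch of the whole pipeline, assuming the pickle holds a flat Python list and that a Hive metastore is reachable. The path xxx.pkl and the table name hive_database.hive_table are placeholders, and enableHiveSupport() is an assumption needed for writing to Hive.

import pickle
from pyspark.sql import Row, SparkSession

# SparkSession with Hive support (assumption: a Hive metastore is configured)
spark = SparkSession.builder \
    .appName("pickle-to-hive") \
    .enableHiveSupport() \
    .getOrCreate()

# step 1: read the local pickle file (placeholder path)
with open("xxx.pkl", "rb") as fp:
    data = pickle.load(fp)  # assume a flat list such as [1, 2, 3, 4, 5]

# step 2: convert to an RDD
rdd = spark.sparkContext.parallelize(data)

# step 3: RDD -> DataFrame -> Hive table (placeholder names)
col = Row("col")
df = spark.createDataFrame(rdd.map(lambda x: col(x)))
df.write.saveAsTable("hive_database.hive_table", mode="overwrite")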
1. Saving and reading pickle files with pickle

import pickle

data = ""
path = "xxx.pkl"

# save as a pickle file
pickle.dump(data, open(path, 'wb'))

# read the pickle file
data2 = pickle.load(open(path, 'rb'))
When reading a pickle file saved by Python 2 with Python 3, you may hit the error:
UnicodeDecodeError: 'ascii' codec can't decode byte 0xa0 in position 11: ordinal not in range(128)
Solution (note that the encoding argument belongs to pickle.load, not open):

data2 = pickle.load(open(path, 'rb'), encoding='latin1')
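A slightly fuller sketch of the same fix, using a context manager; the path py2_data.pkl is hypothetical. Per the pickle documentation, encoding='latin1' is the usual choice (and is required for NumPy arrays and datetime objects pickled by Python 2), while encoding='bytes' keeps Python 2 str objects as bytes.

import pickle

path = "py2_data.pkl"  # hypothetical pickle written by Python 2

with open(path, 'rb') as fp:
    # 'latin1' decodes Python 2 str as text; use encoding='bytes'
    # instead if the original byte strings should stay as bytes
    data = pickle.load(fp, encoding='latin1')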
When reading a pickle file saved by Python 3 with Python 2, you may hit the error:
unsupported pickle protocol:3
Solution: re-save the data from Python 3 with a protocol that Python 2 understands:

import pickle

path = "xxx.pkl"
path2 = "xxx2.pkl"

data = pickle.load(open(path, 'rb'))

# save as a pickle that Python 2 can read
pickle.dump(data, open(path2, 'wb'), protocol=2)

# read the pickle
data2 = pickle.load(open(path2, 'rb'))
2. Reading the pickle content and converting it to an RDD

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pickle

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

pickle_path = "xxx.pkl"
with open(pickle_path, "rb") as fp:
    data = pickle.load(fp)

# What to do next depends on the type of data.
# Assuming data is a one-dimensional list such as [1, 2, 3, 4, 5],
# convert it to an RDD (parallelize lives on the SparkContext):
pickleRdd = spark.sparkContext.parallelize(data)
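As the comment above says, the next step depends on what the pickle actually contains. One common case, sketched here as an assumption rather than part of the original walkthrough and continuing from the snippet above: if the pickle holds a pandas DataFrame, it can be handed to spark.createDataFrame directly and the RDD step can be skipped.

import pandas as pd

# assumption: the pickle contains a pandas DataFrame instead of a flat list
with open(pickle_path, "rb") as fp:
    obj = pickle.load(fp)

if isinstance(obj, pd.DataFrame):
    # Spark builds the DataFrame straight from pandas, no RDD needed
    sdf = spark.createDataFrame(obj)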
3. Converting the RDD to a DataFrame and saving it to Hive

# define the column name
column = Row('col')

# convert to a DataFrame
pickleDf = spark.createDataFrame(pickleRdd.map(lambda x: column(x)))

# Save to Hive: this writes the table hive_table in the database hive_database,
# mode='overwrite' replaces any existing data, and partitionBy can be used to
# name a partition column if one is needed
pickleDf.write.saveAsTable("hive_database.hive_table", mode='overwrite')
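Since the snippet above only mentions partitionBy in passing, here is a hedged sketch of a partitioned write. The partition column dt, the sample rows, and the use of enableHiveSupport() are assumptions for illustration, not part of the original example.

from pyspark.sql import Row, SparkSession

# a session that can see the Hive metastore (assumption)
spark = SparkSession.builder \
    .appName("partitioned-write") \
    .enableHiveSupport() \
    .getOrCreate()

# hypothetical rows with a dt column to partition by
rows = [Row(col=1, dt="2024-01-01"), Row(col=2, dt="2024-01-02")]
df = spark.createDataFrame(rows)

# one Hive partition is created per distinct dt value
df.write.saveAsTable("hive_database.hive_table",
                     mode="overwrite",
                     partitionBy="dt")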
Additional notes on writing to Hive:

(1) Via SQL

data = [
    (1, "3", "145"),
    (1, "4", "146"),
    (1, "5", "25"),
    (1, "6", "26"),
    (2, "32", "32"),
    (2, "8", "134"),
    (2, "8", "134"),
    (2, "9", "137")
]
df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])

# method one: default is the name of the default database, write_test is the table to create in it
df.registerTempTable('test_hive')
spark.sql("create table default.write_test as select * from test_hive")
Or:

# register df as a temporary view
df.createOrReplaceTempView("df_tmp_view")

# insert into Hive via spark.sql
spark.sql("""
    insert overwrite table XXXXX                  -- table name
    partition(partition_column=partition_value)   -- multiple partitions separated by commas
    select XXXXX                                  -- columns in the same order as the Hive table, excluding partition columns
    from df_tmp_view
""")
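To make the placeholders above concrete, here is a hedged example with made-up names: a table default.write_test_p partitioned by dt, receiving the id, test_id and camera_id columns of the df built earlier. The table, its partition column, and the partition value are all assumptions for illustration.

# hypothetical partitioned target table; assume it already exists in Hive,
# partitioned by dt, with columns matching id, test_id, camera_id
df.createOrReplaceTempView("df_tmp_view")

spark.sql("""
    insert overwrite table default.write_test_p
    partition (dt = '2024-01-01')
    select id, test_id, camera_id
    from df_tmp_view
""")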
(2) Via saveAsTable

# mode("overwrite") rewrites the table: if it exists its data is replaced, otherwise a new table is created
# mode("append") adds the new rows on top of the existing table
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
The following are several ways to create a DataFrame:

(1) From key-value pairs (a list of dicts)

d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)
# [Row(age=1, name='Alice')]
(2) From an RDD

# sc is the SparkContext, i.e. spark.sparkContext in a script
a = [('Alice', 1)]
rdd = sc.parallelize(a)

output = spark.createDataFrame(rdd).collect()
print(output)
# [Row(_1='Alice', _2=1)]

output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)
# [Row(name='Alice', age=1)]
(3) From an RDD and Row

from pyspark.sql import Row

a = [('Alice', 1)]
rdd = sc.parallelize(a)

Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))

output = spark.createDataFrame(person).collect()
print(output)
# [Row(name='Alice', age=1)]
(4) From an RDD and StructType

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

a = [('Alice', 1)]
rdd = sc.parallelize(a)

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

output = spark.createDataFrame(rdd, schema).collect()
print(output)
# [Row(name='Alice', age=1)]
(5) From a pandas DataFrame

df = spark.createDataFrame(rdd, ['name', 'age'])
print(df)
# DataFrame[name: string, age: bigint]

print(type(df.toPandas()))
# <class 'pandas.core.frame.DataFrame'>

# pass a pandas DataFrame in directly
output = spark.createDataFrame(df.toPandas()).collect()
print(output)
# [Row(name='Alice', age=1)]