pyspark寫入hive(二) 使用 saveAsTable


一、問題描述

pyspark寫入hive分區表中,使用了建臨時表的方式。一般情況下是沒有問題的,但是當涉及到class pyspark.sql.types.FloatType,就會出現bug。
比如當統計列表中每個單詞出現的概率,同時保留最多四位小數

from Collections import Counter
mylist = ["a","b","c","a"]
k_p_dict = dict()
d = Counter(mylist)
for item in d:
  p = round(d[item] / len_keyword,4)
  k_p_dict[item] = p

但是如果使用臨時表方法,那么需要通過schma轉換為DataFrame

sch = StructType([
        StructField("k_p", MapType(StringType(),FloatType()), True)
    ])
df = spark.createDataFrame(myrdd,sch)

rdd轉換為DataFrame之后,字典的value值就不再是4位小數,而是比如0.11110000312328339

二、使用 saveAsTable()

df直接寫入hive。

from pyspark.sql import Row
def data2row(x):
  ...
  # 直接返回Row()格式的數據
  return Row(userid=user_id,k_p=k_p_dict)

# 1. 和之前方法一樣,從hive表取數據
df = spark.sql(my_sql)
# 2. DataFrame沒有map方法,所以轉換為rdd,然后對每一個列處理之后再通過toDF()轉換為DataFrame
df = df.rdd.map(lambda x: data2row(x)).toDF()
# 3. 保存到hive表。mode可以使用append、overwrite
# "overwrite"是重寫表的模式,如果表存在,就覆蓋掉原始數據,如果不存在就重新生成一張表
# mode("append")是在原有表的基礎上追加數據
df.write.format("hive").mode("append").saveAsTable("mytable")

三、思考

當指定分區的時候,需要分區字段也有值。就需要再原數據中加入分區字段。而使用insert overwrite xxx的方法不需要。所以saveAsTable用時更多。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM