spark升級:從1.6升級到2.4.6的記錄


 

 

負責的一個任務平台項目的spark版本是1.6.1的,主要變成語言是python;

現階段要把spark從1.6.1 直接 升級到2.4.6版本,這期間遇到很多問題,特此記錄:

1、語法兼容問題

數據平台任務會分成天任務、小時任務,我們會把處理后的數據寫入到hive的表里面(分區里面)

比如:

 

 

 1.6版本使用的最終落地語法是:

source.write.format("orc").partitionBy(%s).insertInto("%s.%s", True)

當升級到2.4以后,報錯:

insertInto() can't be used together with partitionBy()

因為在spark2.0以后,認為insertInto本身要插入的表是有分區的(分區是在創建表的時候指明的),所以不需要使用partitionBy

但是我們的表是需要進行分區插入的,比如:

CREATE EXTERNAL TABLE `ad.adwise_ad_order`(
  `sdate` int COMMENT '日期', 
  `order_id` string COMMENT '廣告訂單ID', 
  `req_num` bigint COMMENT '廣告請求量', 
  `imp_filter_pv` bigint COMMENT '廣告展現過濾PV', 
  `click_filter_pv` bigint COMMENT '廣告點擊過濾PV', 
  `imp_num` bigint COMMENT '廣告曝光量', 
  `vis_req_num` bigint COMMENT '廣告可見請求量', 
  `vis_imp_num` bigint COMMENT '廣告可見曝光量', 
  `vis_display_num` bigint COMMENT '廣告可見展現量', 
  `click_num` bigint COMMENT '廣告點擊量', 
  `lands_num` bigint COMMENT '廣告線索量', 
  `req_uv` bigint COMMENT '廣告請求UV', 
  `imp_uv` bigint COMMENT '廣告曝光UV', 
  `imp_login_uv` bigint COMMENT '廣告曝光會員數', 
  `vis_req_uv` bigint COMMENT '廣告可見請求UV', 
  `vis_imp_uv` bigint COMMENT '廣告可見曝光UV', 
  `vis_imp_login_uv` bigint COMMENT '廣告可見曝光會員數', 
  `lands_uv` bigint COMMENT '廣告線索UV', 
  `click_uv` bigint COMMENT '廣告點擊UV', 
  `lands_login_uv` bigint COMMENT '廣告線索會員數')
PARTITIONED BY ( 
  `dt` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde' 
WITH SERDEPROPERTIES ( 
  'colelction.delim'=',', 
  'field.delim'='\t', 
  'line.delim'='\n', 
  'mapkey.delim'=':', 
  'serialization.format'='\t') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'viewfs://AutoLfCluster/team/ad/pre/adwise_ad_order'
TBLPROPERTIES (
  'transient_lastDdlTime'='1512551282')
View Code

1.1、saveAsTable導致所有分區被覆蓋

於是查了下API,發現partitionBy和saveAsTable是可以組合的,於是無腦將代碼改成:

df.write.mode("override").partitionBy(['dt', 'hour']).saveAsTable("XXXXX")

因為沒有拿臨時表做測試,自以為跑的沒問題了,結果結果第二天發現,表的分區被覆蓋了

ps.這里提供個腳本,可以吧被覆蓋掉的分區文件掛回來

#! /bin/bash
#倒序按天遍歷日期

#傳入遍歷的開始時間和結束時間
startdate="$1"
enddate="$2"

echo 'startdate: '$startdate
echo 'enddate: '$enddate
echo "-----------------------------------"

#序列1-300,表示遍歷300次,因為有結束時間的限制,所以實際上不會循環300次
for i in `seq 1 300`; do
  #當開始時間小於結束時間時,直接結束腳本
  if [[ $startdate -lt $enddate ]]; then
    break
  fi
  echo $startdate 
  #執行hiveSQL腳本,我是需要按日期執行hiveSQL,這里可以無視
  hive -e "alter table ad.clues_unable_distribute add partition(dt='$startdate');"
  #每次執行后,使開始日期減一天,如果要正序,將下面-1換成+1即可,當然開始時間和結束時間也要換一下
  startdate=$(date -d "$startdate -1 day" +%Y%m%d)
done
View Code

 

1.2、saveAsTable導致元數據不一致問題

上面把表的分區覆蓋,通過腳本掛回分區,回滾代碼、切換回1.6版本后,我們的業務同事在其中一個表上添加了一個字段,暫且認為,在tableA上添加一個column1;

業務人員精心寫的SQL,在hive的客戶端是跑是完全沒問題的,於是提交代碼,上線...

然后這個任務就不斷的報錯,報錯的內容就是,新添加的這個column1在tableA這個表中找不到這列;

然后在zeppline上執行業務SQL,發現依然是找不到這個column1這個列。這說明spark的meta和hive的meta不一致了!

然后在jira上說,在你數據插入完成后(insertInto),你應該刷新一下表;

API是:

spark.catalog.refreshTable()

排列是這樣的:

1 df.write.option("nullValue", "").mode("overwrite").insertInto("%s.%s",True)
2 spark.catalog.refreshTable()

然后發現不管用;

依次嘗試了

1 df.write.option("nullValue", "").mode("overwrite").insertInto("%s.%s",True)
2 spark.sql("alter table xxx add if not exists partition(a=b,c=d)")

以上均是不可以的,最后發現,是saveAsTable把之前的刪除掉,覆蓋了。導致這個表很特殊,無法同步hive的meta信息

於是表drop掉,重新創建外部表,在把分區重新掛回來,這樣新添加的字段就出來了,注意:以上問題就是因為saveAsTable覆蓋表造成的

 

1.3、InsertInto(db.table , false)導致的數據傾斜

以上方案行不通之后,我把插入語句改成了:

InsertInto(db.table , false)

第二個參數是false,意思是不覆蓋;然后發現數據分區的確不會被覆蓋了,但是會出現數據全都跑到一個分區里面了。而且是追加模式,導致數據傾斜

 

1.4、最終解決

最終搞定的方式代碼直接發出來,核心就是:

spark.sql.sources.partitionOverwriteMode

最終插入代碼:

#!/usr/bin/python
# -*- coding:utf-8 -*-
# describe: local csv to hive orc
# create:%s
import os
os.environ['SPARK_HOME']="/data/sysdir/servers/spark-2.4.6-bin-2.7.2-scala2.11"
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import HiveContext
import sys
reload(sys)
sys.setdefaultencoding( "utf-8" )
spark = SparkSession.builder.appName("%s").config("spark.sql.sources.partitionOverwriteMode","dynamic").enableHiveSupport().getOrCreate()
hiveContext = HiveContext(spark)
hiveContext.sql("SET hive.exec.dynamic.partition = true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
sink = hiveContext.table("%s.%s")
df = hiveContext.read.format("com.autohome.databricks.spark.csv")%s.option("treatEmptyValuesAsNulls", "true").option("header","false").option('delimiter', '\\t').load("file://%s", schema=sink.schema).repartition(1)
#增加分區的值 此前為空
for i in zip([%s],[%s]):
    df = df.withColumn(i[0], when(df[i[0]] != i[1], i[1]).otherwise(i[1]))
df.write.option("nullValue", "").mode("overwrite").insertInto("%s.%s",True)
spark.sparkContext.stop()
''' % (date, db+'.'+table+'.'+date,db, table,nullDealingStr,datafilepath,partitions, pvalues,  db, table)

 

 insertInto和saveAsTable區別

 這里簡單說明下兩種API的區別:

首先看下API的源碼介紹:

1):insertInto

insertInto的官方聲明

 

 上面聲明了2點信息:

1、spark-sql插入數據的時候,使用的是DataFrame,那么這個DataFrame的chema必須要和目標表(要插入的表)的schema信息一致 

2、insertInto和saveAsTable不一樣,insertInto是通過適應位置來進行數據插入的

上面兩點聲明很讓人懵逼,因為感覺是矛盾的;但是只要記住一點,就能理解上面說的問題了:

insertInto使用的前提是這個表是存在的,它是在這個表的基礎上進行插入的
saveAsTable是不依賴於這個表是否存在的,並且saveAsTable在寫表的數據時候,是按照字段名稱進行匹配插入的

 

2、spark2.0以后不支持pyspark提交腳本問題

有很多作業是使用pyspark進行提交的。但是在需要注意,2.0以后,spark不支持使用pyspark來 提交腳本了,所以要把pyspark統一改成spark-submit來提交腳本

3、phoenix版本不兼容問題

重新編譯即可,百度搜索一大段 

4、csv文件不支持Map問題

解決連接: https://www.cnblogs.com/niutao/p/13674489.html

git代碼地址:https://github.com/niutaofan/pareCSV.git

5、處理CSV的時候,關於雙引號" 導致格式無法識別問題

跟處理“csv文件不支持Map問題”的解決方式一樣,出現的問題是spark處理csv的代碼,主要是:commons-csv代碼。

那么在通過自定義數據源的時候,spark會對csv文件進行掃描,代碼:

 

 通過tokenRdd來掃描每一行數據,生成rdd,問題就出現在這個方法里面:

 

 上面parseCSV就對每一行csv數據做解析,然后返回RDD

無法解析特殊字符的報錯,就是這個方法里面:

 

 可以看到311行代碼,CSVParser.parse(line, csvFormat).getRecords ,這里面的line就是每一行數據

因為我這邊報錯是類似一行csv數據中有一些非法的字符,比如字符串:

 

 所以,簡單對這一行代碼做修改處理即可,比如:

 

 這樣就可以解決csv每一行中非法的雙引號問題了!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM