
負責的一個任務平台項目的spark版本是1.6.1的,主要變成語言是python;
現階段要把spark從1.6.1 直接 升級到2.4.6版本,這期間遇到很多問題,特此記錄:
1、語法兼容問題
數據平台任務會分成天任務、小時任務,我們會把處理后的數據寫入到hive的表里面(分區里面)
比如:

1.6版本使用的最終落地語法是:
source.write.format("orc").partitionBy(%s).insertInto("%s.%s", True)
當升級到2.4以后,報錯:
insertInto() can't be used together with partitionBy()
因為在spark2.0以后,認為insertInto本身要插入的表是有分區的(分區是在創建表的時候指明的),所以不需要使用partitionBy
但是我們的表是需要進行分區插入的,比如:
CREATE EXTERNAL TABLE `ad.adwise_ad_order`( `sdate` int COMMENT '日期', `order_id` string COMMENT '廣告訂單ID', `req_num` bigint COMMENT '廣告請求量', `imp_filter_pv` bigint COMMENT '廣告展現過濾PV', `click_filter_pv` bigint COMMENT '廣告點擊過濾PV', `imp_num` bigint COMMENT '廣告曝光量', `vis_req_num` bigint COMMENT '廣告可見請求量', `vis_imp_num` bigint COMMENT '廣告可見曝光量', `vis_display_num` bigint COMMENT '廣告可見展現量', `click_num` bigint COMMENT '廣告點擊量', `lands_num` bigint COMMENT '廣告線索量', `req_uv` bigint COMMENT '廣告請求UV', `imp_uv` bigint COMMENT '廣告曝光UV', `imp_login_uv` bigint COMMENT '廣告曝光會員數', `vis_req_uv` bigint COMMENT '廣告可見請求UV', `vis_imp_uv` bigint COMMENT '廣告可見曝光UV', `vis_imp_login_uv` bigint COMMENT '廣告可見曝光會員數', `lands_uv` bigint COMMENT '廣告線索UV', `click_uv` bigint COMMENT '廣告點擊UV', `lands_login_uv` bigint COMMENT '廣告線索會員數') PARTITIONED BY ( `dt` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' WITH SERDEPROPERTIES ( 'colelction.delim'=',', 'field.delim'='\t', 'line.delim'='\n', 'mapkey.delim'=':', 'serialization.format'='\t') STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION 'viewfs://AutoLfCluster/team/ad/pre/adwise_ad_order' TBLPROPERTIES ( 'transient_lastDdlTime'='1512551282')
1.1、saveAsTable導致所有分區被覆蓋
於是查了下API,發現partitionBy和saveAsTable是可以組合的,於是無腦將代碼改成:
df.write.mode("override").partitionBy(['dt', 'hour']).saveAsTable("XXXXX")
因為沒有拿臨時表做測試,自以為跑的沒問題了,結果結果第二天發現,表的分區被覆蓋了
ps.這里提供個腳本,可以吧被覆蓋掉的分區文件掛回來
#! /bin/bash #倒序按天遍歷日期 #傳入遍歷的開始時間和結束時間 startdate="$1" enddate="$2" echo 'startdate: '$startdate echo 'enddate: '$enddate echo "-----------------------------------" #序列1-300,表示遍歷300次,因為有結束時間的限制,所以實際上不會循環300次 for i in `seq 1 300`; do #當開始時間小於結束時間時,直接結束腳本 if [[ $startdate -lt $enddate ]]; then break fi echo $startdate #執行hiveSQL腳本,我是需要按日期執行hiveSQL,這里可以無視 hive -e "alter table ad.clues_unable_distribute add partition(dt='$startdate');" #每次執行后,使開始日期減一天,如果要正序,將下面-1換成+1即可,當然開始時間和結束時間也要換一下 startdate=$(date -d "$startdate -1 day" +%Y%m%d) done
1.2、saveAsTable導致元數據不一致問題
上面把表的分區覆蓋,通過腳本掛回分區,回滾代碼、切換回1.6版本后,我們的業務同事在其中一個表上添加了一個字段,暫且認為,在tableA上添加一個column1;
業務人員精心寫的SQL,在hive的客戶端是跑是完全沒問題的,於是提交代碼,上線...
然后這個任務就不斷的報錯,報錯的內容就是,新添加的這個column1在tableA這個表中找不到這列;
然后在zeppline上執行業務SQL,發現依然是找不到這個column1這個列。這說明spark的meta和hive的meta不一致了!
然后在jira上說,在你數據插入完成后(insertInto),你應該刷新一下表;
API是:
spark.catalog.refreshTable()
排列是這樣的:
1 df.write.option("nullValue", "").mode("overwrite").insertInto("%s.%s",True) 2 spark.catalog.refreshTable()
然后發現不管用;
依次嘗試了
1 df.write.option("nullValue", "").mode("overwrite").insertInto("%s.%s",True) 2 spark.sql("alter table xxx add if not exists partition(a=b,c=d)")
以上均是不可以的,最后發現,是saveAsTable把之前的刪除掉,覆蓋了。導致這個表很特殊,無法同步hive的meta信息
於是表drop掉,重新創建外部表,在把分區重新掛回來,這樣新添加的字段就出來了,注意:以上問題就是因為saveAsTable覆蓋表造成的
1.3、InsertInto(db.table , false)導致的數據傾斜
以上方案行不通之后,我把插入語句改成了:
InsertInto(db.table , false)
第二個參數是false,意思是不覆蓋;然后發現數據分區的確不會被覆蓋了,但是會出現數據全都跑到一個分區里面了。而且是追加模式,導致數據傾斜
1.4、最終解決
最終搞定的方式代碼直接發出來,核心就是:
spark.sql.sources.partitionOverwriteMode
最終插入代碼:
#!/usr/bin/python # -*- coding:utf-8 -*- # describe: local csv to hive orc # create:%s import os os.environ['SPARK_HOME']="/data/sysdir/servers/spark-2.4.6-bin-2.7.2-scala2.11" from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql import HiveContext import sys reload(sys) sys.setdefaultencoding( "utf-8" ) spark = SparkSession.builder.appName("%s").config("spark.sql.sources.partitionOverwriteMode","dynamic").enableHiveSupport().getOrCreate() hiveContext = HiveContext(spark) hiveContext.sql("SET hive.exec.dynamic.partition = true") hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict") sink = hiveContext.table("%s.%s") df = hiveContext.read.format("com.autohome.databricks.spark.csv")%s.option("treatEmptyValuesAsNulls", "true").option("header","false").option('delimiter', '\\t').load("file://%s", schema=sink.schema).repartition(1) #增加分區的值 此前為空 for i in zip([%s],[%s]): df = df.withColumn(i[0], when(df[i[0]] != i[1], i[1]).otherwise(i[1])) df.write.option("nullValue", "").mode("overwrite").insertInto("%s.%s",True) spark.sparkContext.stop() ''' % (date, db+'.'+table+'.'+date,db, table,nullDealingStr,datafilepath,partitions, pvalues, db, table)
insertInto和saveAsTable區別
這里簡單說明下兩種API的區別:
首先看下API的源碼介紹:
1):insertInto
insertInto的官方聲明

上面聲明了2點信息:
1、spark-sql插入數據的時候,使用的是DataFrame,那么這個DataFrame的chema必須要和目標表(要插入的表)的schema信息一致
2、insertInto和saveAsTable不一樣,insertInto是通過適應位置來進行數據插入的
上面兩點聲明很讓人懵逼,因為感覺是矛盾的;但是只要記住一點,就能理解上面說的問題了:
insertInto使用的前提是這個表是存在的,它是在這個表的基礎上進行插入的
saveAsTable是不依賴於這個表是否存在的,並且saveAsTable在寫表的數據時候,是按照字段名稱進行匹配插入的
2、spark2.0以后不支持pyspark提交腳本問題
有很多作業是使用pyspark進行提交的。但是在需要注意,2.0以后,spark不支持使用pyspark來 提交腳本了,所以要把pyspark統一改成spark-submit來提交腳本
3、phoenix版本不兼容問題
重新編譯即可,百度搜索一大段
4、csv文件不支持Map問題
解決連接: https://www.cnblogs.com/niutao/p/13674489.html
git代碼地址:https://github.com/niutaofan/pareCSV.git
5、處理CSV的時候,關於雙引號" 導致格式無法識別問題
跟處理“csv文件不支持Map問題”的解決方式一樣,出現的問題是spark處理csv的代碼,主要是:commons-csv代碼。
那么在通過自定義數據源的時候,spark會對csv文件進行掃描,代碼:

通過tokenRdd來掃描每一行數據,生成rdd,問題就出現在這個方法里面:

上面parseCSV就對每一行csv數據做解析,然后返回RDD
無法解析特殊字符的報錯,就是這個方法里面:

可以看到311行代碼,CSVParser.parse(line, csvFormat).getRecords ,這里面的line就是每一行數據
因為我這邊報錯是類似一行csv數據中有一些非法的字符,比如字符串:

所以,簡單對這一行代碼做修改處理即可,比如:

這樣就可以解決csv每一行中非法的雙引號問題了!
