在項目中使用spark-stream讀取kafka數據源的數據,然后轉成dataframe,再后通過sql方式來進行處理,然后放到hive表中,
遇到問題如下,hive-metastor在沒有做高可用的情況下,有時候會出現退出,這個時候,spark streaminG的微批作業就會失敗,
然后再啟重動hive-metastore進程后,作業繼續正常執行,數據就有丟失.
分析如下:
第一步,觀察日志發現,
我原來的代碼這么寫的:
xx.foreachRdd(rdd=>
processRdd(rdd)
updatezkOffset(rdd)
)
原以為,如果任務不成功,就應該不更新offset,
原想的是,如果processrdd出現異常,則不會執行后面的updateoffset,
但processrdd是在線程池中運行的,它出現的異常只是warning,根本不影響后面的updateoffset.
故需要修改代碼,把updateoffset部分放置processrdd中,當其執行完成后調用即可.
第二步測試:
經過測試發現,把updateoffset部分放置processrdd后,模擬hive metastore出問題,
spark-streaming 任務失敗,然后offset 確實沒有更新.
但問題在這里,下一批次的作業,讀取的offset並不是你沒有更新的那一個,而是它計算出來的.
例如假設batch1 job讀取的是0-20,batch2 job讀取的就是21-40,batch3 job讀取的是41-60
即使batch1 job處理任務失敗了,但是后面的batch2 job或batch3 job 讀取數據並執行成功了,
它就會把自己的offset更新.
第三步測試:
經測試發現,使用可恢復的方式,即使用checkpoint.
spark streaming保留了最近的五個batchjob信息,但是也不能解決上面遇到的問題,
如果hive metastore出問題,再恢復,原來存儲的元數據信息也會被新的給替換掉了.
想到的解決方案:
1.如果batch job出現失敗的情況,就直接讓它退出,這是一種解決思路.
2.還使用手工更新zookeeper offset的方法,
如果出現部分batch job失敗的情況,仍不退出,但是我們給應用寫一個支持傳入
offset 范圍的版本,重新執行,把數據補進去.
另外,如何提交spark streaming 已經失敗的batch job?