Flink: 一次失敗的批處理經歷


 

-----------更新說明----------

突然就想水一篇博客,剛好說明一下這段時間基本沒有更新的問題。

簡單介紹下我的職業生涯,大家有留意的話,可以在博客園上看到,我的博客是從2015年11月開始的,公眾號則是去年才有的。

我15年畢業,就職某外企Java開發,17年進入某曾經很知名的公司做大數據離線(抬高一點,還負責離線報表的前后端/數據庫/接口等),18年底以外包的身份進入某互聯網大廠,專職做實時數據開發,也就是在這段時間,我了解Flink,並決定之后的職業生涯中,專注於Flink 開發。這段時間換了個新公司,做大數據開發,實時離線都有的。(不得不說,外包絕對是我職業生涯中最大的污點)

所以,這段時間更新斷了的原因就是,換了個工作,沒時間寫。之前的博客很多都是自己開發的時候,遇到個值得一寫的內容,然后蘊釀一下,抽點時間摸個魚就水了一篇。剛到新的崗位,還很多不懂的內容,加上比較忙,基本沒有醞釀博客內容,所以也就沒得寫了(太懶了)。

下面是一些扯淡的內容

大概是19年初開始接觸Flink,那時候項目有些拓展的思路,隨后我調研並編寫了HDP的安裝簡介,搭建了Flink的開發測試環境,從此開始學習Flink。其實我比較愚笨,雖然經常會在群里扯淡,但是從來不敢說自己是大佬,就目前來說,我只是比新人多看了幾遍官網的文檔,多寫了幾個Demo。

我對Flink 特別有信心,甚至經常對朋友說:“如果那天不能做Flink 開發了,就回去種地”。😄😄😄

基本上所有業余時間的學習內容,都是Flink和相關的東西,今后主要也是做這個方面的工作,工作穩定之后,博客還是要繼續寫的,開源項目sqlSubmit 也會繼續更新,這也是學習總結的過程。

祝大家端午節安康

-------------------------

最近有個失敗的經歷,有個處理批處理的工作,需要對全量的歷史數據進行處理,用Spark做的。我覺得用Flink 的 Batch也能做這個事情,就用Flink Batch也做了個版本的,結果當然是失敗了。

需求如下:

對用戶生命周期的全量數據進行處理,按用戶和指定維度輸出用戶不同時期的特征數據,用作過濾模型的基礎數據

全量數據在10T左右,存在30個HBase 表中(測試環境 2.6T,7個表),需要按一定的規則讀取,然后對每個用戶的數據進行排序/合並/計算/輸出。

拿到需求的時候,Spark的同學,以用戶做分區,依次分別計算所有用戶的數據(分別是指,一次只算一個用戶的數據,算完了,再算下一個用戶的數據,當然Spark是分布式的,所以任務也是並行的)。

我是覺得這個方案有很大的缺陷,我們有幾百萬的用戶數據,那就需要訪問HBase幾百萬次,再加上30個分表,就讀取數據,就會花很多時間了。

所以我用Flink Batch 開發了個版本,直接load所有表的數據上來,然后用用戶號分區計算用戶數據。這樣就只需要訪問HBase 30次,一次拿全部數據,然后再計算,至少在訪問HBase 這一項上,已經節省了很多時間。

程序開發完,在測試環境跑的時候,就失敗了,沒有計算出結果(當然不是算不出來,而是我覺得時間太長,完全不能接受)。

遇到的問題:

  1.  全量數據太多,讀取太慢。測試環境2.6T的數據,我從來沒有全部讀取上來過,優化過后的程序,跑得最好的一次,一小時讀取了100G的數據,就這樣也需要至少26小時才能把全部數據讀取上來。放棄

  2. Flink Batch 的時候,經常遇到沒見過的問題(之前都是Stream 的任務),很常見的就是 akk timeout,可能還是數據太多,導致任務異常。

  3. taskmanager經常被kill掉,沒有 OOM 日志,完全不知道原因

經過一周多的調試運行,最后還是放棄了這個版本,在流程上優化了方案,用Stream實現了一版,測試的效果看起來還不錯。

放棄的主要原因:

  1. HBase讀取數據太慢了

  2. Flink Batch 處理問題不少,甚至我懷疑,全部數據順利讀取上來,后面做分組/排序合並也會出問題(又沒有那個美國時間去慢慢處理)

  3. 還有就是Spark的同學早就開發完了,並且在線上開始跑全量數據了,雖然很慢(最后應該是耗時100小時才跑完,吐血)

事實上,我覺得一個任務需要超過10小時才能計算完成,就是一個完全不能接受的事情(可能是因為我一直做實時,數據基本在分鍾級內完成,Flink的都是秒級的。好像之前項目中也有一次,需要對全量數據做處理,用 Spark 一次加載所有數據上來,但是失敗了,跑不起來,所以選擇按用戶分區,一個一個計算的方式)。

之前有過一次需要計算全量數據的經歷,那次還是從備份服務器上拉取的壓縮格式的歷史數據,也有幾個T大小,用Hive也只花了2天的時間(當然這個數據是從本地解壓,直接上傳到HDFS上,不需要從HBase讀)。 

這個任務當然還有很大的優化空間,比如直接去讀HFile,或者提前把數據從HBase讀上來,放到HDFS(下次應該會這么做了),用MapReduce 也不可能花超過10小時(我覺得啊)。

說下Stream版本的任務

Stream 版本的任務開發就比較簡單了,主要用Stream 的方式獲取數據,處理代碼直接從之前的copy過來,改吧改吧。這個需求可以用Stream來做,主要是計算一個用戶的數據的時候,只需要他自己的數據,還有就是HBase的主鍵也是以用戶號+時間,所以可以一次讀取一個用戶的全部數據,處理完成后,再讀下一個人的(Spark版本基本也是這樣)。 

Stream 版本從Kafka 讀取 用戶號,分區,process 中直接去讀取 HBase 中的數據,處理完后,輸出結果。

用Kafka 做source,而不是自定義一個Source,還是Kafka 比較熟,方便好用

keyBy是為了分區,也為了分區負載均衡,方便提高並行度,直接用用戶號 hash

process 就是主要的處理邏輯了

open 中直接 創建了 所有 HTable,process 中直接用

process 中 ,構造scan 的 startRow/endRow,直接 循環所有 HTable,依次讀取當前用戶的全部數據,處理完成后輸出,再來下一個用戶。

優化的 Stream 版本,在測試環境,最快1小時就處理了全部的用戶數據(我做的版本只處理了一個類型的用戶,占總數的30%,這種方式處理,也不需要讀取全部數據,只需要30%的數據)

比較遺憾的是,Stream 版本沒有在生產環境跑過一次全量的數據,只是以比較小的並行度跑了一段時間,看了下效率。

跑了的幾百個用戶數據中,最大的一個,從HBase 讀取了 488W 條數據,數據讀取時間 52 s,處理時間:15 s

從效率上來看和Spark的差不多,都是用的HBase API 讀數據,所以讀取時間沒什么差別,處理上,過程和結果都是一樣的,只是我是自己寫的處理流程,Spark則借助了框架,開發簡單一點,但是效率略低,不過差別不大,畢竟處理時間就那么幾秒。

 

-----------------------

半夜醒來,水一篇,現在7點,回老家的我,已經吃完了早飯了,回去睡覺😪了

特意背了電腦回來,希望這兩天還有思路

祝好

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM