Binlog實時數據采集、落地數據使用的思考總結


前文

  今天偶然刷新技術公眾號的時候,看到一篇這樣文章,是基於Flink有關於Mysql Binlog數據采集的方案,看了一下實踐方法和具體操作有一些考慮情況不足的情況,缺少一些處理實際情況的操作。筆者之前有些過一些類似的采集工具實踐的文章,但是並沒有在整體上做出一個系統性的總結,所以我在想,是不是可以做一個個人總結性的文章,把Binlog采集中的問題以及相應的解決方案也進行總結呢?
 
  可能很多人對於Binlog的認識還不是很充足,可能有些人會粗淺的認為:“它不就是mysql產生的,有固定結構的log嘛,把數據采集過來,然后把它做一下數據落地,它有什么難的呢?”
的確,它本質上確實就是個log,可是實際上,關於Binlog采集從場景分析,再到技術選型,整體內部有很多不為人知的坑,不要小瞧了它。
 
  筆者寫這篇文章,目的是把實際工作中對於Binlog數據采集的開發流程的原則、注意事項、可能存在的問題點展示出來,其中也會有筆者自己的一些個人總結數據采集中的原則,為大家作參考,都是干貨。
  所以開始吧!
 

個人總結原則

 

  首先拋開技術框架的討論,個人總結Binlog 日志的數據采集主要原則:
  •   原則一 : 與業務數據解耦
  •   原則二 : 與業務數據結構解耦
  •   原則三 : 數據是可回溯的



  分別闡述一下這三個原則的具體含義

 

  原則一

    在數據采集中,數據落地一般都會使用時間分區進行落地,那就需要我們確定一下固定的時間戳作為時間分區的基礎時間序列。
在這種情況下看來,業務數據上的時間戳字段,無論是從實際開發中獲取此時間戳的角度,還是現實表中都會存在這樣的時間戳,都不可能所有表完全滿足。
    舉一下反例:
    表 :業務時間戳
    table A : create_time,update_time
    table B : create_time
    table C : create_at
    table D : 無

 

    像這樣的情況,理論上可以通過限制 RD 和 DBA 的在設計表時規則化表結構來實現時間戳以及命名的統一化、做限制,但是是在實際工作中,這樣的情況基本上是做不到的,相信很多讀者也會遇到這樣的情況。
    可能很多做數據采集的同學會想,我們能不能要求他們去制定標准呢?
    個人的想法是,可以,但是不能把大數據底層數據采集完全依靠這樣互相制定的標准。原因有以下三點:
  • 如果只是依靠兩個部門或者多個部門制定的口頭的或者書面的標准,卻沒有強制性在coding上面做約束,全部都是人為在約束的話,后期人員增加,遲早會出問題。
  •     大數據部門與后台部門,在於數據情況變更的情況,有時候可能是信息延時的,也就是說,有可能在數據落地后發現異常后,才知道后台部門做出了調整。
  •     也是最重要的一點,大數據部門不能要求在底層數據源去要求數據源去適應大數據的采集,這樣要成的后果很有可能是限制后台部門在開發業務功能上的自由度,這樣的開發流程也是不合理的。

    所以如果想要使用唯一固定的時間序列,就要和業務的數據剝離開,我們想要的時間戳不受業務數據的變動的影響。

 

  原則二

    在業務數據庫中,一定會存在表結構變更的問題,絕大部分情況為增加列,但是也會存在列重命名、列刪除這類情況,而其中字段變更的順序是不可控的。

    此原則想描述的是,導入到數據倉庫中的表,要適應數據庫表的各種操作,保持其可用性與列數據的正確性。

 

  原則三

    此數據可回溯,其中包括兩個方面

  •     數據采集可回溯
  •     數據消費落地可回溯

 

    第一個描述的是,在采集binlog采集端,可以重新按位置采集binlog。

    第二個描述的是,在消費binlog落地的一端,可以重復消費把數據重新落地。

 

  此為筆者個人總結,無論是選擇什么樣的技術選型進行組合搭建,這幾點原則是需要具備的。


實現方案以及具體操作

  技術架構 : Debezium + Confluent + Kafka + OSS/S3 + Hive

 

  基於原則一的解決方案

  Debezium 提供了 New Record State Extraction 的配置選項,相當於提供了一個transform 算子,可以抽取出binlog 中的元數據。
對於 0.10 版本的配置,可以抽取 table,version,connector,name,ts_ms,db,server_id,file,pos,row 等binlog元數據信息。

  其中ts_ms為binlog日志的產生時間,此為binlog元數據,可以應用於所有數據表,而且可以在完全對數據表內部結構不了解的情況下,使用此固定時間戳,完全實現我們的原則一。

  關於Debezium,不同版本之前的配置參數有可能是不同的,如果讀者有需要實踐的話需要在官方文檔上確認相應版本的配置參數。
對於其他框架,例如市面上用的較多的Canal,或者讀者有自己需要開發數據采集程序的話,binlog的元數據建議全部抽取出來,在此過程以及后續過程中都可能會被用到。


  基於原則二的解決方案


  對於 Hive ,目前主流的數據存儲格式為Parquet,ORC,Json,Avro 這幾種。
  拋開數據存儲的效率討論。
  對於前兩中數據格式,為列存,也就是說,這兩種數據格式的數據讀取,會嚴格依賴於我們數據表中的數據存儲的順序,這樣的數據格式,是無法滿足數據列靈活增加、刪除等操作的。
  Avro 格式為行存,但是它需要依賴於Schema Register服務,考慮Hive的數據表讀取完全要依賴一個外部服務,風險過高。
  最后確定使用Json格式進行數據存儲,雖然這樣的讀取和存儲效率沒有其他格式高,但是這樣可以保證業務數據的任何變更都可以在hive中讀取出來。

  Debezium 組件采集binlog 的數據就是為json格式,和預期的設計方案是吻合的,可以解決原則二帶來的問題。

  對於其他框架,例如市面上用的較多的Canal,可以設置為Json數據格式進行傳輸,或者讀者有自己需要開發數據采集程序的話,也是相同的道理。


  基於原則三的解決方案


  在采集binlog采集端,可以重新按位置采集binlog。
  此方案實現方式在Debezium官方網站上也給出了相應的解決方案,大概描述一下,需要用到 Kafkacat工具。


  對於每一個采集的mysql實例,創建數據采集任務時,Confluent都會相應的創建connector(也就是采集程序)的采集的元數據的topic,
  里面會存儲相應的時間戳、文件位置、以及位置,可以通過修改此數據,重置采集binlog日志的位置。
  值得注意的是,此操作的時間節點也是有限制的,和mysql的binlog日志保存周期有關,所以此方式回溯時,需要確認的是mysql日志還存在。


  對於重復消費把數據重新落地。
  此方案因為基於kafka,對於kafka重新制定消費offset消費位點的操作網上有很多方案,此處不再贅述。
  對於讀者自己實現的話,需要確認的選擇的MQ支持此特性就好了。
 
  https://debezium.io/documentation/faq/#how_to_change_the_offsets_of_the_source_database

 

業務場景影響下的重要操作

  此部分只描述在筆者技術架構下如何實現以下操作,讀者可以根據自己選擇的技術組件探究不同的技術方案。

  數據庫分庫分表的情況


    基於Debezium的架構,一個Source 端只能對應一個mysql實例進行采集,對於同一實例上的分表情況,可以使用 Debezium Topic Routing 功能,
    在采集過濾binlog時把相應需要采集的表按照正則匹配寫入一個指定的topic中。
    在分庫的情況下,還需要在 sink 端 增加 RegexRouter transform算子進行topic 間的合並寫入操作。



  數據增量采集與全量采集

    對於采集組件,目前目前的配置都是以增量為默認,所以無論是選擇 Debezium 還是 Canal的話,正常配置就好。
    但是有些時候會存在需要采集全表的情況,筆者也給出一下全量的數據采集的方案。
    方案一
      Debezium 本身自帶了這樣的功能,需要將
      snapshot.mode 參數選型設置為 when_needed,這樣可以做表的全量采集操作。
      官方文檔中,在此處的參數配置有更加細致的描述。

  https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots

    方案二
      使用sqoop和增量采集同時使用的方式進行。
      此方案適用於表數據已存在很多,而目前binlog數據頻率不頻繁的情況下,使用此方案。
      值得注意的是有兩點:
  •       sqoop數據導入落地為Parquet格式,與增量采集數據合並時,需要做數據格式整合,也就是中間需要有臨時表,通過union all的方式把數據merge到全量表中。
  •       sqoop導入的Parquet格式,與 Debezium 處理某些數據類型時會存在不相同的問題,例如datetime類型,sqoop會導出string,Debezium 會轉化為bigint。


  離線數據去重條件

    數據落地后,通過json表映射出binlog原始數據,那么問題也就來了,我們如何找到最新的一條數據呢?
    也許我們可以簡單的認為,用我們剛剛的抽取的那個ts_ms,然后做倒排不就好了嗎?
    大部分情況下這樣做確實是可以的。
    但是筆者在實際開發中,發現這樣的情況是不能滿足所有情況的,因為在binlog中,可能真的會存在 ts_ms 與 PK 相同,但是確實不同的兩條數據。
    那我們怎么去解決時間都相同的兩條數據呢?
    答案就在上文,我們剛剛建議的把binlog 的元數據都抽取出來。

SELECT *
FROM
(
SELECT *,
row_number() over(partition BY t.id ORDER BY t.`__ts_ms` DESC,t.`__file` DESC,cast(t.`__pos` AS int) DESC) AS order_by
FROM test t
WHERE dt='{pt}'
AND hour='{now_hour}'
) t1
WHERE t1.order_by = 1

 


    解釋一下這個sql 中row_number的的條件
    __ts_ms : 為binlog中的ts_ms,也就是事件時間。
    __file : 為binlog此條數據所在file name。
    __pos : 為binlog中此數據所在文件中的位置,為數據類型。

    這樣的條件組合取出的數據,就是最新的一條。

    也許有讀者會問,如果這條數據被刪除了怎么辦,你這樣取出來的數據不就是錯的了嗎?
    這個Debezium也有相應的操作,有相應的配置選項讓你如何選擇處理刪除行為的binlog數據。
    作為給大家的參考,筆者選擇 rewrite 的參數配置,這樣在上面的sql最外層只需要判斷 “delete = ’false‘“ 就是正確的數據啦。

    https://debezium.io/documentation/reference/0.10/configuration/event-flattening.html


架構上的總結


  在技術選型以及整體與細節的架構中,筆者始終在堅持一個原則——
  流程盡量簡約而不簡單,數據環節越長,出問題的環節就可能越多。對於后期鎖定問題與運維難度也會很高。

  所以筆者在技術選型也曾考慮過Flink + Kafka 的這種方式,但是基於當時的現狀,筆者並沒有選擇這樣的技術選型,筆者也闡述一下原因。
  • 筆者的flink環境沒有做開發平台化與運維平台化。
  • 場景偏向於數據采集和傳輸,而不是計算,Flink的優勢特性並沒有使用到很多。
  • 如果基於一個Mysql 實例開發一個Flink程序,使用原生的Flink steaming,做api式的程序開發,如果因為某些表的數據導致程序掛掉,這個實例的數據都無法采集了,這樣的影響范圍太大。
  • 如果基於一個一個表或者通過正則的方式匹配一些表,做一個flink程序,這樣雖然是保證了靈活度,但是90%的代碼都是冗余的,而且會有很多任務,浪費資源。
  • 最后就是開發和維護效率的問題,如果只是寫原生的 flink 程序的話,后續的累加開發,會把程序變得越來越重,可能邏輯也會越來越繁瑣。


  總結起來,我當時對於Flink的思考,如果Flink沒有做開發和運維監控的平台化的情況下,可以作為一個臨時方案,但是后期如果一直在這樣一個開發流程下縫縫補補,多人開發下很容易出現問題,或者就是大家都這樣一個程序框架下造輪子,而且越造越慢。而且后期的主要項目方向並沒有把Flink平台化提上日程,所以也是考慮了一部分未來的情況進行的選擇。
  所以個人最后確定技術選型的時候,並沒有選用Flink。

 

結束語

  此篇文章筆者寫的較為理論化,也是對此場景的一個技術理論總結。如果文中有其他不明確的操作的話,可以參考筆者之前的文章,有詳細的代碼級操作。

  技術架構上的方案多種多樣,筆者只是選擇了其中一種進行實現,也希望大家有其他的技術方案或者理論進行交流,煩請指正。

  微信:Franki__5

  

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM