一.Flume收集各數據庫日志,准實時抽取到HDFS
安裝HDP,包含Flume
方案優點:
1.配置簡單,不用編程:只要在flume.conf文件中配置source、channel及sink的相關屬性
2.采用普通SQL輪詢的方式實現,具有通用性,適用於所有關系庫數據源
方案缺點:
1.在源庫上執行了查詢,具有入侵性
2.通過輪詢的方式實現增量,只能做到准實時,而且輪詢間隔越短,對源庫的影響越大
3.只能識別新增數據,檢測不到刪除與更新
4.要求源庫必須有用於表示增量的字段
二.canal
原理:
- canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
- mysql master收到dump請求,開始推送(slave拉取,不是master主動push給slaves)binary log給slave(也就是canal)
- canal解析binary log對象(原始為byte流)
mysql中需要配置一個用戶,專門提供給canal用
canal開源代碼中發送端僅僅支持mysql,不支持oracle,接收端由於采用jdbc,mysql、oracle等可以通吃。
三.maxwell
優點:
- 支持bootstrap啟動,同步歷史數據
- 集成kafka,直接將數據落地到kafka
- 已將binlog中的DML和DDL進行了模式匹配,將其解碼為有schema的json(有利於后期將其重組為nosql支持的語言)
{“database”:”test”,”table”:”e”,”type”:”update”,”ts”:1488857869,”xid”:8924,”commit”:true,”data”:{“id”:1,”m”:5.556666,”torvalds”:null},”old”:{“m”:5.55}}
缺點:
- 一個MySQL實例需要對應一個maxwell進程
- bootstrap的方案使用的是
select *
maxwell的配置文件只有一個config.properties,在home目錄。其中除了需要配置mysql master的地址、kafka地址還需要配置一個用於存放maxwell相關信息的mysql地址,maxwell會把讀取binlog關系的信息,如binlog name、position。
工具對比
方案對比
- 方案1使用阿里開源的Canal進行Mysql binlog數據的抽取,另需開發一個數據轉換工具將從binlog中解析出的數據轉換成自帶schema的json數據並寫入kafka中。而方案2使用maxwell可直接完成對mysql binlog數據的抽取和轉換成自帶schema的json數據寫入到kafka中。
- 方案1中不支持表中已存在的歷史數據進行同步,此功能需要開發(如果使用sqoop進行歷史數據同步,不夠靈活,會使結果表與原始表結構相同,有區別於數據交換平台所需的schema)。方案2提供同步歷史數據的解決方案。
- 方案1支持HA部署,而方案2不支持HA
方案1和方案2的區別只在於kafka之前,當數據緩存到kafka之后,需要一個定制的數據路由組件來將自帶schema的數據解析到目標存儲中。
數據路由組件主要負責將kafka中的數據實時讀出,寫入到目標存儲中。(如將所有日志數據保存到HDFS中,也可以將數據落地到所有支持jdbc的數據庫,落地到HBase,Elasticsearch等。)
maxwell:
MySQL->Maxwell->Kafka->Flume->HDFS
寫入HDFS的數據時json的,可能還需要提取只需要的數據,另外,對於update或delete操作目前還不知道要怎么處理。生產使用難度很大。
把增量的Log作為一切系統的基礎。后續的數據使用方,通過訂閱kafka來消費log。
比如:
- 大數據的使用方可以將數據保存到Hive表或者Parquet文件給Hive或Spark查詢;
- 提供搜索服務的使用方可以保存到Elasticsearch或HBase 中;
- 提供緩存服務的使用方可以將日志緩存到Redis或alluxio中;
- 數據同步的使用方可以將數據保存到自己的數據庫中;
- 由於kafka的日志是可以重復消費的,並且緩存一段時間,各個使用方可以通過消費kafka的日志來達到既能保持與數據庫的一致性,也能保證實時性;
為什么使用log和kafka作為基礎,而不使用Sqoop進行抽取呢? 因為:
DWS平台, DWS平台是有3個子項目組成:
- Dbus(數據總線):負責實時將數據從源端實時抽出,並轉換為約定的自帶schema的json格式數據(UMS 數據),放入kafka中;
- Wormhole(數據交換平台):負責從kafka讀出數據 將數據寫入到目標中;
- Swifts(實時計算平台):負責從kafka中讀出數據,實時計算,並將數據寫回kafka中。
圖中:
- Log extractor和dbus共同完成數據抽取和數據轉換,抽取包括全量和增量抽取。
- Wormhole可以將所有日志數據保存到HDFS中; 還可以將數據落地到所有支持jdbc的數據庫,落地到HBash,Elasticsearch,Cassandra等;
- Swifts支持以配置和SQL的方式實現對進行流式計算,包括支持流式join,look up,filter,window aggregation等功能;
- Dbus web是dbus的配置管理端,rider除了配置管理以外,還包括對Wormhole和Swifts運行時管理,數據質量校驗等。
對於增量的log,通過訂閱Canal Server的方式,我們得到了MySQL的增量日志:
- 按照Canal的輸出,日志是protobuf格式,開發增量Storm程序,將數據實時轉換為我們定義的UMS格式(json格式,稍后我會介紹),並保存到kafka中;
- 增量Storm程序還負責捕獲schema變化,以控制版本號;
- 增量Storm的配置信息保存在Zookeeper中,以滿足高可用需求。
- Kafka既作為輸出結果也作為處理過程中的緩沖器和消息解構區。
- 在考慮使用Storm作為解決方案的時候,我們主要是認為Storm有以下優點:
- 技術相對成熟,比較穩定,與kafka搭配也算標准組合;
- 實時性比較高,能夠滿足實時性需求;
- 滿足高可用需求;
- 通過配置Storm並發度,可以活動性能擴展的能力;
全量抽取
對於流水表,有增量部分就夠了,但是許多表需要知道最初(已存在)的信息。這時候我們需要initial load(第一次加載)。
對於initial load(第一次加載),同樣開發了全量抽取Storm程序通過jdbc連接的方式,從源端數據庫的備庫進行拉取。initial load是拉全部數據,所以我們推薦在業務低峰期進行。好在只做一次,不需要每天都做。
全量抽取,我們借鑒了Sqoop的思想。將全量抽取Storm分為了2 個部分:
- 數據分片
- 實際抽取
數據分片需要考慮分片列,按照配置和自動選擇列將數據按照范圍來分片,並將分片信息保存到kafka中。
下面是具體的分片策略:
全量抽取的Storm程序是讀取kafka的分片信息,采用多個並發度並行連接數據庫備庫進行拉取。因為抽取的時間可能很長。抽取過程中將實時狀態寫到Zookeeper中,便於心跳程序監控。