接到個小需求,將mysql的部分數據增量同步到es,但是不僅僅是使用canal而已,整體的流程是mysql>>canal>>flume>>kafka>>es,說難倒也不難,只是做起來碰到的坑實在太多,特別是中間套了那么多中間件,出了故障找起來真的特別麻煩。
先來了解一下MySQL的主從備份:
從上層來看,復制分成三步:
master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
slave將master的binary log events拷貝到它的中繼日志(relay log);
slave重做中繼日志中的事件,將改變反映它自己的數據。
問題一:測試環境一切正常,但是正式環境中,這幾個字段全為0,不知道為什么
最后發現是溝通問題。。。
排查過程:
- 起初,懷疑是es的問題,會不會是string轉為long中出現了問題,PUT了個,無異常,這種情況排除。
- 再然后以為是代碼有問題,可是想了下,rowData.getAfterColumnsList().forEach(column -> data.put(column.getName(), column.getValue()))這句不可能有什么其他的問題啊,而且測試環境中一切都是好好的。
- canal安裝出錯,重新查看了一次canal.properties和instance.properties,並沒有發現配置錯了啥,如果錯了,那為什么只有那幾個字段出現異常,其他的都是好好的,郁悶。而且,用測試環境的canal配置生產中的數據庫,然后本地調試,結果依舊一樣。可能問題出在mysql。
最后發現,居然是溝通問題。。。。測試環境中是從正式環境導入的,用的insert,可是在正式環境里,用的確實insert后update字段,之后發現居然還用delete,,,,暈。。。。之前明確問過了只更新insert的,人與人之間的信任在哪里。。。。
問題二:canal.properties中四種模式的差別
簡單的說,canal維護一份增量訂閱和消費關系是依靠解析位點和消費位點的,目前提供了一下四種配置,一開始我也是懵的。
#canal.instance.global.spring.xml = classpath:spring/local-instance.xml
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
local-instance
我也不知道啥。。
memory-instance
所有的組件(parser , sink , store)都選擇了內存版模式,記錄位點的都選擇了memory模式,重啟后又會回到初始位點進行解析
特點:速度最快,依賴最少(不需要zookeeper)
場景:一般應用在quickstart,或者是出現問題后,進行數據分析的場景,不應該將其應用於生產環境。
個人建議是調試的時候使用該模式,即新增數據的時候,客戶端能馬上捕獲到改日志,但是由於位點一直都是canal啟動的時候最新的,不適用與生產環境。
file-instance
所有的組件(parser , sink , store)都選擇了基於file持久化模式,注意,不支持HA機制.
特點:支持單機持久化
場景:生產環境,無HA需求,簡單可用.
采用該模式的時候,如果關閉了canal,會在destination中生成一個meta.dat,用來記錄關鍵信息。如果想要啟動canal之后馬上訂閱最新的位點,需要把該文件刪掉。
{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":".\.."},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.6.71","port":3306}},"postion":{"included":false,"journalName":"binlog.008335","position":221691106,"serverId":88888,"timestamp":1524294834000}}}],"destination":"example"}
default-instance
所有的組件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集群共享。
特點:支持HA
場景:生產環境,集群化部署.
該模式會記錄集群中所有運行的節點,主要用與HA主備模式,節點中的數據如下,可以關閉某一個canal服務來查看running的變化信息。
問題三:如果要訂閱的是mysql的從庫改怎么做?
生產環境中的主庫是不能隨便重啟的,所以訂閱的話必須訂閱mysql主從的從庫,而從庫中是默認下只將主庫的操作寫進中繼日志,並寫到自己的二進制日志的,所以需要讓其成為canal的主庫,必須讓其將日志也寫到自己的二進制日志里面。處理方法:修改/etc/my.cnf,增加一行log_slave_updates=1,重啟數據庫后就可以了。
問題四:部分字段沒有更新
最終版本是以mysql的id為es的主鍵,用canal同步到flume,再由flume到kafka,然后再由一個中間件寫到es里面去,結果發現,一天之中,會有那么一段時間得出的結果少一丟丟,甚至是驟降,如圖。不得不從頭開始排查情況,canal到flume,加了canal的重試,以及發送到flume的重試機制,沒有報錯,所有數據正常發送。flume到kafka不敢懷疑,畢竟公司一直在用,怎么可能有問題。kafka到es的中間件?組長寫的,而且一直在用,不可能==最后確認的是flume到kafka,kafka的parition處理速度不同,
check一下flume的文檔,可以知道
Property Name | Description |
---|---|
defaultPartitionId | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class). |
partitionIdHeader | When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId. |
大概意思是flume如果不自定義partitionIdHeader,那么消息將會被分布式kafka的partion處理,kafka本身的設置就是高吞吐量的消息系統,同一partion的消息是可以按照順序發送的,但是多個partion就不確定了,如果需要將消息按照順序發送,那么就必須要指定一個parition,即在flume的配置文件中添加:a1.channels.channel1.partitionIdHeader=1,指定parition即可。全部修改完之后,在kibana查看一下曲線:
用sql在數據庫確認了下,終於一致了,不容易。。。