通過canal實現把MySQL數據實時增量到kafka


說明:我們有一個業務需要把mysql中一些表實時同步到大數據集群hbase上面,我們先通過sqoop把表中數據全量導入到hbase中,然后再通過canal定位的某個binlog的position,來實現增量同步,canal官網提供了java/go接口,直接寫入到Kafka,然后通過sparkstreaming實時寫入到hbase中

一. 通過sqoop把mysql表中的數據全量導入到hbase中(需要安裝sqoop)

sqoop import \
--connect jdbc:mysql://ip:port/database \
--username username \
--password password \
--table user_info \
--hbase-create-table \
--hbase-table user_info \
--hbase-row-key id \
--column-family order_info

 

二. 精確定位到binlog位點,進行啟動

1. 查看當前數據庫的binlog日志,在數據庫中通過show binary logs 查看

mysql> show binary logs;
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.001112 |    375374 |
| mysql-bin.001113 |    366569 |
| mysql-bin.001114 |    360112 |
| mysql-bin.001115 |    101198 

 

2. 查看當前binlog日志的position(一般最大的binlog以及最大的position)

show binlog events in 'mysql-bin.001115';

 

3. 需要先重啟canal服務(如果之前正運行canal)

cd /usr/local/canal/bin && ./stop.sh && ./startup.sh

 

4. 修改zookeeper對應destination里面的配置

說明:我這邊是在配置文件里面配置了zookeeper,如果沒有配置,則會在你相應的destination目錄下面生成meta.dat文件,也只需修改到對應的binlog和position即可

1)連接zookeeper

./zkCli.sh -server zk-address:2181

2)查看對應destination的配置(其中test為destination的名稱)

(CONNECTED) 2] get /otter/canal/destinations/test/1001/cursor
"journalName":"mysqlbin.002908"
"position":198601951

3)把上面配置中journalName和position修改為自己需要的binlog日志和偏移量

 (CONNECTED) 3]set /otter/canal/destinations/d_aura_jike/1001/cursor {xxx}

 

5. 修改一下對應destination的配置文件(主要是觸發使其生效)

vim /usr/local/canal/conf/test/instance.properties

 

6. 通過canal提供的java/go接口,來測試數據的同步(官網有例子https://github.com/alibaba/canal)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM