開發手冊
https://github.com/alibaba/canal
mysql配置
1.開啟binlog
找到mysql的配置文件位置: mysql --help|grep my.cnf ,然後編輯my.cnf
[mysqld]
# log_bin
log-bin = mysql-bin    #開啟binlog
binlog-format = ROW    #選擇row模式
server_id = 1          #配置mysql replication需要定義,不能和canal的slaveId重複
2.查看是否開啟相關命令
-- 是否開啟了binlog,ON為開啟
show variables like 'log_bin';
-- 查看當前日志文件信息
show master status;
-- 關閉當前日志文件,創建一個新的日志文件,編號為原有日志文件編號+1
flush logs;
-- 查看binlog events(未指定文件時默認讀取第一個日志文件)
show binlog events limit 0,1;
-- 查看指定文件events
show binlog events in 'mysql-bin.003243';
-- 查看文件列表
show binary logs;
-- 查看binlog格式,應為ROW
show variables like 'binlog_format';
canal目錄介紹
/bin 為啟動腳本
/conf/canal_local.properties 為canal admin集群時的配置,通過./start.sh 使用
/conf/example 為canal監控的數據庫實例的配置。客戶端通過指定Destination可以消費具體某一個實例,比如order服務就消費order數據庫實例,product則消費product數據庫實例,但是canal-server只有一個。當然對應的目錄就不是example,而是需要創建order和product目錄,同時在canal.properties 指定canal.destinations = order,product
/h2.mv.db 為使用h2數據庫時使用 /meta.dat 存儲了當前讀取的binlog文件 和指針位置
/instance.properties 則配置了所監控的數據庫實例的binlog日志
安裝CanalServer
下載
https://github.com/alibaba/canal/releases/tag/canal-1.1.4
或者下載源碼後執行maven安裝
這種方式推薦,因為有時需要修改源碼完成定制化需求
mvn clean install -Dmaven.test.skip -Denv=release
1.解壓
2.修改配置文件
canal.deployer-1.1.4/conf/example/instance.properties 配置文件
3.啟動
4.查看是否啟動成功
canal.deployer-1.1.4/logs/canal
Canal客戶端
1.創建一個demo項目
2.引入pom依賴
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client --> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
3.demo代碼
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.List; /** * @author liqiang * @date 2020/1/13 16:21 * @Description: */ @Component public class CanalStart implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { System.out.println("開始消費...."); // 創建鏈接 canal的鏈接和地址 port默認是11111 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.3.17.72", 11111), "example", "", "");//或者example2 int batchSize = 1000;//每個批次處理1000條 int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*");//訂閱所有庫下面的所有表 //connector.subscribe("canal.t_canal");//訂閱庫canal庫下的表t_canal connector.rollback(); int totalEmtryCount = 1200; while (emptyCount < totalEmtryCount) {//實際生產中需要設置為true,死循環 Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; // System.out.println("empty count : " + emptyCount);//代表沒有需要消費的數據 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { emptyCount = 0; System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾數據 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == 
EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } System.out.println("rowChare ======>"+rowChage.toString()); EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱 entry.getHeader().getLogfileOffset(), //偏移量 entry.getHeader().getSchemaName(),//庫名 entry.getHeader().getTableName(), //表名 eventType));//事件名 for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
測試場景
簡單查詢
select * from `user`;
並不會被消費到
執行刪除
整表刪除
message[batchId=7,size=3] rowChare ======>tableId: 33 eventType: DELETE isDdl: false rowDatas { beforeColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: false isNull: false value: "2" mysqlType: "bigint(20)" } beforeColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: false isNull: false value: "liqiang" mysqlType: "varchar(30)" } beforeColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: false isNull: false value: "20" mysqlType: "int(11)" } beforeColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: false isNull: false value: "test2@baomidou.com" mysqlType: "varchar(50)" } } rowDatas { beforeColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: false isNull: false value: "3" mysqlType: "bigint(20)" } beforeColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: false isNull: false value: "Tom" mysqlType: "varchar(30)" } beforeColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: false isNull: false value: "28" mysqlType: "int(11)" } beforeColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: false isNull: false value: "test3@baomidou.com" mysqlType: "varchar(50)" } } rowDatas { beforeColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: false isNull: false value: "4" mysqlType: "bigint(20)" } beforeColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: false isNull: false value: "Sandy" mysqlType: "varchar(30)" } beforeColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: false isNull: false value: "21" mysqlType: "int(11)" } beforeColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: false isNull: false value: "test4@baomidou.com" mysqlType: "varchar(50)" } } rowDatas { beforeColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: false isNull: false value: "5" mysqlType: "bigint(20)" } beforeColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: false isNull: false value: "Billie" mysqlType: "varchar(30)" } 
beforeColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: false isNull: false value: "24" mysqlType: "int(11)" } beforeColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: false isNull: false value: "test5@baomidou.com" mysqlType: "varchar(50)" } } ================> binlog[mysql-bin.000001:3000] , name[haoke,user] , eventType : DELETE #表示是刪除事件 id : 2 update=false name : liqiang update=false age : 20 update=false email : test2@baomidou.com update=false id : 3 update=false name : Tom update=false age : 28 update=false email : test3@baomidou.com update=false id : 4 update=false name : Sandy update=false age : 21 update=false email : test4@baomidou.com update=false id : 5 update=false name : Billie update=false age : 24 update=false email : test5@baomidou.com update=false
可以發現整表刪除只消費到一個DELETE事件,其中包含被刪除的4行數據(id為2~5)
根據條件刪除
delete from`user` where id=2;
打印
message[batchId=10,size=3] rowChare ======>tableId: 116 eventType: DELETE isDdl: false rowDatas { beforeColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: false isNull: false value: "2" mysqlType: "bigint(20)" } beforeColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: false isNull: false value: "liqiang" mysqlType: "varchar(30)" } beforeColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: false isNull: false value: "20" mysqlType: "int(11)" } beforeColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: false isNull: false value: "test2@baomidou.com" mysqlType: "varchar(50)" } } ================> binlog[mysql-bin.000001:3551] , name[haoke,user] , eventType : DELETE id : 2 update=false name : liqiang update=false age : 20 update=false email : test2@baomidou.com update=false
修改
update `user` u set u.`name`='小明' where id=2
message[batchId=13,size=3] rowChare ======>tableId: 117 eventType: UPDATE isDdl: false rowDatas { beforeColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: false isNull: false value: "2" mysqlType: "bigint(20)" } beforeColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: false isNull: false value: "liqiang" mysqlType: "varchar(30)" } beforeColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: false isNull: false value: "20" mysqlType: "int(11)" } beforeColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: false isNull: false value: "test2@baomidou.com" mysqlType: "varchar(50)" } afterColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: false isNull: false value: "2" mysqlType: "bigint(20)" } afterColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: true isNull: false value: "\345\260\217\346\230\216" mysqlType: "varchar(30)" } afterColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: false isNull: false value: "20" mysqlType: "int(11)" } afterColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: false isNull: false value: "test2@baomidou.com" mysqlType: "varchar(50)" } } ================> binlog[mysql-bin.000001:3989] , name[haoke,user] , eventType : UPDATE -------> before id : 2 update=false name : liqiang update=false age : 20 update=false email : test2@baomidou.com update=false -------> after id : 2 update=false name : 小明 update=true age : 20 update=false email : test2@baomidou.com update=false
可以發現打印了修改前和修改後的數據,以及哪個字段被修改
新增
INSERT INTO `haoke`.`user`(`id`, `name`, `age`, `email`) VALUES (6, '小明2', 20, 'test6@baomidou.com');
message[batchId=14,size=3] rowChare ======>tableId: 117 eventType: INSERT isDdl: false rowDatas { afterColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: true isNull: false value: "6" mysqlType: "bigint(20)" } afterColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: true isNull: false value: "\345\260\217\346\230\2162" mysqlType: "varchar(30)" } afterColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: true isNull: false value: "20" mysqlType: "int(11)" } afterColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: true isNull: false value: "test6@baomidou.com" mysqlType: "varchar(50)" } } ================> binlog[mysql-bin.000001:4245] , name[haoke,user] , eventType : INSERT id : 6 update=true name : 小明2 update=true age : 20 update=true email : test6@baomidou.com update=true
事件是新增
測試事務
未提交
mysql> START TRANSACTION; Query OK, 0 rows affected (0.00 sec) mysql> update `user` s set s.name='小張' where id=2; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 mysql>
並沒有消費
提交
mysql> START TRANSACTION; Query OK, 0 rows affected (0.00 sec) mysql> update `user` s set s.name='小張' where id=2; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 mysql> commit; Query OK, 0 rows affected (0.01 sec)
打印:
message[batchId=15,size=3] rowChare ======>tableId: 117 eventType: UPDATE isDdl: false rowDatas { beforeColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: false isNull: false value: "2" mysqlType: "bigint(20)" } beforeColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: false isNull: false value: "\345\260\217\346\230\216" mysqlType: "varchar(30)" } beforeColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: false isNull: false value: "20" mysqlType: "int(11)" } beforeColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: false isNull: false value: "test2@baomidou.com" mysqlType: "varchar(50)" } afterColumns { index: 0 sqlType: -5 name: "id" isKey: true updated: false isNull: false value: "2" mysqlType: "bigint(20)" } afterColumns { index: 1 sqlType: 12 name: "name" isKey: false updated: true isNull: false value: "\345\260\217\345\274\240" mysqlType: "varchar(30)" } afterColumns { index: 2 sqlType: 4 name: "age" isKey: false updated: false isNull: false value: "20" mysqlType: "int(11)" } afterColumns { index: 3 sqlType: 12 name: "email" isKey: false updated: false isNull: false value: "test2@baomidou.com" mysqlType: "varchar(50)" } } ================> binlog[mysql-bin.000001:4461] , name[haoke,user] , eventType : UPDATE -------> before id : 2 update=false name : 小明 update=false age : 20 update=false email : test2@baomidou.com update=false -------> after id : 2 update=false name : 小張 update=true age : 20 update=false email : test2@baomidou.com update=false
測試回滾
mysql> START TRANSACTION; Query OK, 0 rows affected (0.00 sec) mysql> update `user` s set s.name='小張2' where id=2; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 mysql> rollback; Query OK, 0 rows affected (0.00 sec)
並沒有消費
Adapter
可以參考一下源碼實現client
http://www.mamicode.com/info-detail-2851627.html