一、開啟目標MySQL的bin-log日志
參考:
log-bin=mysql-bin
binlog-format=ROW
server_id=1
二、目標數據庫-授權
參考:
create user canal identified by 'canal';
grant all privileges on *.* to 'canal'@'%';
flush privileges;
三、安裝CanalAdmin
官方文檔:https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart
配置開放服務器端口:11110、11111、11112
1、下載
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
2、解壓
tar zxvf canal.admin-1.1.4.tar.gz
3、修改配置
vi conf/application.yml
方框內容修改:端口號,數據庫地址和用戶名、密碼
4、初始化canal admin 數據庫
執行安裝包內的初始化腳本 conf/canal_manager.sql(SQL內容亦可從上方官方文檔取得),例如:mysql -uroot -p < conf/canal_manager.sql
5、啟動canalAdmin
sh bin/startup.sh
6、訪問
可以通過 http://127.0.0.1:8089/ 訪問,默認密碼:admin/123456
四、安裝canal-service
1、下載
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
2、解壓
tar zxvf canal.deployer-1.1.4.tar.gz
3、修改配置文檔
vi conf/canal_local.properties
修改方框內:就是canalAdmin的地址+端口
4、啟動canal-service
sh bin/startup.sh local
五、驗證服務
1、登錄canalAdmin服務,默認密碼 admin/123456
會發現第四步安裝的canal-service 已經在管理列表中
六、配置instance
1、打開instance管理
2、新建instance按鈕
(1)輸入Instance名稱(例如:example242)
就是后端部署java代碼中用的名稱
canal:
host: 10.100.20.242
port: 11111
username: canal
password: canal
instance: example242
3、所屬主機
選擇在(第四步中安裝的主機)
4、載入模板
修改
canal.instance.master.address=127.0.0.1:3306 就是監控的數據庫地址
canal.instance.dbUsername=canal 就是監控數據庫的用戶名
canal.instance.dbPassword=canal 就是監控數據庫的密碼
5、保存
返回Instance管理列表,狀態已啟動
七、啟動java代碼 canalClient服務
注意:有幾個instance服務,對應啟動幾個java canalClient 否則會造成kafka發送同樣的消息處理數據,增加服務器資源消耗。
1、此時java中yml文件就是對應第六步中新建的instance名稱
例如:example242
canal:
host: 10.100.20.242
port: 11111
username: canal
password: canal
instance: example242
2、啟動完畢后:查看canal日志
成功后,會出現:沒有數據,休息一會
3、對應監控代碼
@Component @Slf4j public class ReadBinLog implements ApplicationRunner { @Value("${canal.host}") private String host; @Value("${canal.port}") private int port; @Value("${canal.username}") private String username; @Value("${canal.password}") private String password; @Value("${canal.instance}") private String instance; @Value("${ETL.CANAL.NUMBER}") private int number; // @Value("${schemaName}") // private String schemaName; @Autowired CanalHandler canalHandler; /** * 獲取連接 */ public CanalConnector getConn() { log.error("host:{},port:{port},instance:{},username:{},password:{}", host, port, instance, username, password); return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password); } @Override public void run(ApplicationArguments args) throws Exception { CanalConnector conn = this.getConn(); while (true) { try { conn.connect(); //訂閱實例中所有的數據庫和表 conn.subscribe(".*\\..*"); //conn.subscribe(schemaName); Message message = conn.getWithoutAck(number); long id = message.getId(); int size = message.getEntries().size(); if (id != -1 && size > 0) { log.info("讀取條數:" + message.getEntries().size()); canalHandler.analysis(message.getEntries()); } else { try { String hostAddress = InetAddress.getLocalHost().getHostAddress(); log.info("監控源實例名稱:{},沒有數據,休息一會啦,canal所在服務器地址:{}", instance, hostAddress); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } // 確認消息 if (id != -1) { conn.ack(id); } } catch (CanalClientException e) { log.error(e.getMessage(), e); // 處理失敗, 回滾數據 conn.rollback(); } finally { conn.disconnect(); } } } }
4、對應的DML(insert/update/delete)操作代碼,本文使用 kafka 處理數據
@Component @Slf4j public class CanalHandler { @Autowired IKafkaProducerService producerService; @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_INSTALL}") String ETL_CANAL_INSTALL; @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_DELETE}") String ETL_CANAL_DELETE; @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_UPDATE}") String ETL_CANAL_UPDATE; // @Value("${brandInfo}") // String brandInfo; @Autowired IDataManagerService iDataManagerService; /** * 分析數據 * * @param entries * @throws InvalidProtocolBufferException */ public void analysis(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { //如果是事務跳過 continue; } if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); //封裝 CanalDB canalDB = new CanalDB(); canalDB.setTableName(entry.getHeader().getTableName()); canalDB.setTableSchema(entry.getHeader().getSchemaName()); log.info("查詢SchemaName:" + canalDB.getTableSchema() + " 表名:" + canalDB.getTableName()); //查詢是否在模型列表中 ResponseVO<SysSynchronizationConfigDO> responseVO = iDataManagerService.synchronizationConfig(canalDB); if (responseVO.getCode() != 0 || responseVO.getData() == null) { continue; } canalDB.setSysSynchronizationConfig(responseVO.getData()); //類型判斷 if (eventType == CanalEntry.EventType.DELETE) { canalDB.setType("delete"); saveDeleteSql(entry, canalDB); } else if (eventType == CanalEntry.EventType.UPDATE) { canalDB.setType("update"); saveUpdateSql(entry, canalDB); } else if (eventType == CanalEntry.EventType.INSERT) { canalDB.setType("install"); saveInsertSql(entry, canalDB); } } } } /** * 更新語句 * * @param entry */ private void saveUpdateSql(CanalEntry.Entry entry, CanalDB canalDB) { try { CanalEntry.RowChange rowChange = 
CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> dataList = rowChange.getRowDatasList(); int count = 0; for (CanalEntry.RowData rowData : dataList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); TreeMap<String, Object> hashMapHashMap = new TreeMap<>(); for (CanalEntry.Column column : afterColumnsList) { hashMapHashMap.put(column.getName(), column.getValue()); canalDB.setColumnAndData(hashMapHashMap); if (column.getIsKey()) { canalDB.setWhereCase(column.getName() + "=" + column.getValue()); } } String canal = JSONObject.toJSONString(canalDB); log.info("kafka canal send update---->>>" + canal); producerService.sendMessage(ETL_CANAL_UPDATE, canal, count); count++; } } catch (InvalidProtocolBufferException e) { log.error(e.getMessage(), e); } } /** * 刪除語句 * * @param entry */ private void saveDeleteSql(CanalEntry.Entry entry, CanalDB canalDB) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); int count = 0; for (CanalEntry.RowData rowData : rowDataList) { List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : columnList) { if (column.getIsKey()) { canalDB.setWhereCase(column.getName() + "=" + column.getValue()); break; } } String canal = JSONObject.toJSONString(canalDB); log.info("kafka canal send delete--->>>>" + canal); producerService.sendMessage(ETL_CANAL_DELETE, canal, count); count++; } } catch (InvalidProtocolBufferException e) { log.error(e.getMessage(), e); } } /** * 插入語句 * * @param entry */ private void saveInsertSql(CanalEntry.Entry entry, CanalDB canalDB) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> dataList = rowChange.getRowDatasList(); int count = 0; for (CanalEntry.RowData rowData : dataList) { List<CanalEntry.Column> columnList = rowData.getAfterColumnsList(); 
TreeMap<String, Object> hashMapHashMap = new TreeMap<>(); for (int i = 0; i < columnList.size(); i++) { hashMapHashMap.put(columnList.get(i).getName(), columnList.get(i).getValue()); canalDB.setColumnAndData(hashMapHashMap); } String canal = JSONObject.toJSONString(canalDB); log.info("kafka canal send install----->>" + canal); producerService.sendMessage(ETL_CANAL_INSTALL, canal, count); count++; } } catch (InvalidProtocolBufferException e) { log.error(e.getMessage(), e); } } }