一、开启目标mysql的bin-log日志
参考:
log-bin=mysql-bin
binlog-format=ROW
server_id=1
二、目标数据库-授权
参考:
create user canal identified by 'canal';
grant all privileges on *.* to 'canal'@'%';
flush privileges;
三、安装CanalAdmin
官方文档:https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart
配置开放服务器端口:11110、11111、11112
1、下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
2、解压
tar zxvf canal.admin-1.1.4.tar.gz
3、修改配置
vi conf/application.yml
方框内容修改:端口号,数据库地址和用户名、密码
4、初始化canal admin 数据库
初始化SQL脚本可从官网(上面的官方文档)获取,也可直接使用解压目录下的 conf/canal_manager.sql,在 application.yml 中配置的数据库里执行即可
5、启动canalAdmin
sh bin/startup.sh
6、访问
可以通过 http://127.0.0.1:8089/ 访问,默认账号/密码:admin/123456
四、安装canal-service
1、下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
2、解压
tar zxvf canal.deployer-1.1.4.tar.gz
3、修改配置文档
vi conf/canal_local.properties
修改方框内:就是canalAdmin的地址+端口
4、启动canal-service
sh bin/startup.sh local
五、验证服务
1、登录canalAdmin服务,默认账号/密码:admin/123456
会发现第四步安装的canal-service 已经在管理列表中
六、配置instance
1、打开instance管理
2、新建instance按钮
(1)输入Instance名称(例如:example242)
就是后端部署java代码中用的名称
canal:
  host: 10.100.20.242
  port: 11111
  username: canal
  password: canal
  instance: example242
3、所属主机
选择在(第四步中安装的主机)
4、载入模板
修改
canal.instance.master.address=127.0.0.1:3306 就是监控的数据库地址
canal.instance.dbUsername=canal 就是监控数据库的用户名
canal.instance.dbPassword=canal 就是监控数据库的密码
5、保存
返回Instance管理列表,状态已启动
七、启动java代码 canalClient服务
注意:有几个instance服务,就对应启动几个java canalClient(每个instance只对应一个canalClient),否则同一条变更会被重复发送到kafka并被重复处理,增加服务器资源消耗。
1、此时java中yml文件就是对应第六步中新建的instance名称
例如:example242
canal:
  host: 10.100.20.242
  port: 11111
  username: canal
  password: canal
  instance: example242
2、启动完毕后:查看canal日志
成功后,在没有数据变更时日志中会循环出现:监控源实例名称:example242,没有数据,休息一会啦,canal所在服务器地址:xxx
3、对应监控代码
@Component @Slf4j public class ReadBinLog implements ApplicationRunner { @Value("${canal.host}") private String host; @Value("${canal.port}") private int port; @Value("${canal.username}") private String username; @Value("${canal.password}") private String password; @Value("${canal.instance}") private String instance; @Value("${ETL.CANAL.NUMBER}") private int number; // @Value("${schemaName}") // private String schemaName; @Autowired CanalHandler canalHandler; /** * 获取连接 */ public CanalConnector getConn() { log.error("host:{},port:{port},instance:{},username:{},password:{}", host, port, instance, username, password); return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password); } @Override public void run(ApplicationArguments args) throws Exception { CanalConnector conn = this.getConn(); while (true) { try { conn.connect(); //订阅实例中所有的数据库和表 conn.subscribe(".*\\..*"); //conn.subscribe(schemaName); Message message = conn.getWithoutAck(number); long id = message.getId(); int size = message.getEntries().size(); if (id != -1 && size > 0) { log.info("读取条数:" + message.getEntries().size()); canalHandler.analysis(message.getEntries()); } else { try { String hostAddress = InetAddress.getLocalHost().getHostAddress(); log.info("监控源实例名称:{},没有数据,休息一会啦,canal所在服务器地址:{}", instance, hostAddress); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } // 确认消息 if (id != -1) { conn.ack(id); } } catch (CanalClientException e) { log.error(e.getMessage(), e); // 处理失败, 回滚数据 conn.rollback(); } finally { conn.disconnect(); } } } }
4、对应的DML(insert/update/delete)操作处理代码,本文使用 kafka 处理数据
@Component @Slf4j public class CanalHandler { @Autowired IKafkaProducerService producerService; @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_INSTALL}") String ETL_CANAL_INSTALL; @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_DELETE}") String ETL_CANAL_DELETE; @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_UPDATE}") String ETL_CANAL_UPDATE; // @Value("${brandInfo}") // String brandInfo; @Autowired IDataManagerService iDataManagerService; /** * 分析数据 * * @param entries * @throws InvalidProtocolBufferException */ public void analysis(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { //如果是事务跳过 continue; } if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); //封装 CanalDB canalDB = new CanalDB(); canalDB.setTableName(entry.getHeader().getTableName()); canalDB.setTableSchema(entry.getHeader().getSchemaName()); log.info("查询SchemaName:" + canalDB.getTableSchema() + " 表名:" + canalDB.getTableName()); //查询是否在模型列表中 ResponseVO<SysSynchronizationConfigDO> responseVO = iDataManagerService.synchronizationConfig(canalDB); if (responseVO.getCode() != 0 || responseVO.getData() == null) { continue; } canalDB.setSysSynchronizationConfig(responseVO.getData()); //类型判断 if (eventType == CanalEntry.EventType.DELETE) { canalDB.setType("delete"); saveDeleteSql(entry, canalDB); } else if (eventType == CanalEntry.EventType.UPDATE) { canalDB.setType("update"); saveUpdateSql(entry, canalDB); } else if (eventType == CanalEntry.EventType.INSERT) { canalDB.setType("install"); saveInsertSql(entry, canalDB); } } } } /** * 更新语句 * * @param entry */ private void saveUpdateSql(CanalEntry.Entry entry, CanalDB canalDB) { try { CanalEntry.RowChange rowChange = 
CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> dataList = rowChange.getRowDatasList(); int count = 0; for (CanalEntry.RowData rowData : dataList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); TreeMap<String, Object> hashMapHashMap = new TreeMap<>(); for (CanalEntry.Column column : afterColumnsList) { hashMapHashMap.put(column.getName(), column.getValue()); canalDB.setColumnAndData(hashMapHashMap); if (column.getIsKey()) { canalDB.setWhereCase(column.getName() + "=" + column.getValue()); } } String canal = JSONObject.toJSONString(canalDB); log.info("kafka canal send update---->>>" + canal); producerService.sendMessage(ETL_CANAL_UPDATE, canal, count); count++; } } catch (InvalidProtocolBufferException e) { log.error(e.getMessage(), e); } } /** * 删除语句 * * @param entry */ private void saveDeleteSql(CanalEntry.Entry entry, CanalDB canalDB) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); int count = 0; for (CanalEntry.RowData rowData : rowDataList) { List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : columnList) { if (column.getIsKey()) { canalDB.setWhereCase(column.getName() + "=" + column.getValue()); break; } } String canal = JSONObject.toJSONString(canalDB); log.info("kafka canal send delete--->>>>" + canal); producerService.sendMessage(ETL_CANAL_DELETE, canal, count); count++; } } catch (InvalidProtocolBufferException e) { log.error(e.getMessage(), e); } } /** * 插入语句 * * @param entry */ private void saveInsertSql(CanalEntry.Entry entry, CanalDB canalDB) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> dataList = rowChange.getRowDatasList(); int count = 0; for (CanalEntry.RowData rowData : dataList) { List<CanalEntry.Column> columnList = rowData.getAfterColumnsList(); 
TreeMap<String, Object> hashMapHashMap = new TreeMap<>(); for (int i = 0; i < columnList.size(); i++) { hashMapHashMap.put(columnList.get(i).getName(), columnList.get(i).getValue()); canalDB.setColumnAndData(hashMapHashMap); } String canal = JSONObject.toJSONString(canalDB); log.info("kafka canal send install----->>" + canal); producerService.sendMessage(ETL_CANAL_INSTALL, canal, count); count++; } } catch (InvalidProtocolBufferException e) { log.error(e.getMessage(), e); } } }