Installing Canal Admin to Synchronize Data


I. Enable binlog on the target MySQL

Add the following to the target MySQL configuration (example):

log-bin=mysql-bin

binlog-format=ROW

server_id=1
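After restarting MySQL, you can verify that binlog is enabled and in ROW format:

```sql
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
```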

 

II. Grant privileges on the target database

Example:

create user canal identified by 'canal';

grant all privileges on *.* to 'canal'@'%';

flush privileges;
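The grant above gives the canal user all privileges for simplicity. The official Canal quickstart only requires the replication-related privileges, so a narrower grant also works:

```sql
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```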

 

III. Install Canal Admin

Official docs: https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart

Open these ports on the server: 11110, 11111, and 11112.

 

1. Download

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

2. Extract

   tar zxvf canal.admin-1.1.4.tar.gz

3. Edit the configuration

   vi conf/application.yml

 

 

 

 

 

In the highlighted section (screenshots omitted), change the server port and the database address, username, and password.
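For reference, the relevant part of conf/application.yml looks roughly like this (a sketch based on the 1.1.4 defaults; adjust the address, username, and password to your environment):

```yaml
server:
  port: 8089

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false

canal:
  adminUser: admin
  adminPasswd: admin
```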

 

4. Initialize the Canal Admin database

The canal_manager.sql initialization script is available from the official repository (it also ships in the conf/ directory of the canal.admin package). Import it into the MySQL instance backing Canal Admin, for example:

mysql -h127.0.0.1 -uroot -p < conf/canal_manager.sql

5. Start Canal Admin

sh bin/startup.sh

6. Access the UI

   Open http://127.0.0.1:8089/ in a browser. The default credentials are admin/123456.

IV. Install canal-server

1. Download

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2. Extract

   tar zxvf canal.deployer-1.1.4.tar.gz

3. Edit the configuration file

   vi conf/canal_local.properties

 

In the highlighted section (screenshot omitted), set the Canal Admin address and port.
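For reference, the keys to change in conf/canal_local.properties look roughly like this (a sketch based on the 1.1.4 defaults; the manager address is a placeholder, and the passwd value is the hashed form of the default admin password):

```properties
# Canal Admin address and port (the 8089 web/API port configured above)
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
```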

4. Start canal-server

   sh bin/startup.sh local

V. Verify the service

1. Log in to Canal Admin (default credentials: admin/123456).

 

 

The canal-server installed in step IV should now appear in the server management list.

 

VI. Configure an instance

1. Open Instance management.

2. Click the New Instance button.

(1) Enter the instance name (e.g. example242).

This is the name the Java client's configuration refers to:

canal:
  host: 10.100.20.242
  port: 11111
  username: canal
  password: canal
  instance: example242

3. Owning server

Select the server installed in step IV.

4. Load the template

Edit the following properties:

canal.instance.master.address=127.0.0.1:3306    # address of the database to monitor

canal.instance.dbUsername=canal                 # username for the monitored database

canal.instance.dbPassword=canal                 # password for the monitored database
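By default the template captures every schema and table. If you only need a subset, you can narrow it with canal.instance.filter.regex (the schema name below is an example):

```properties
# only capture tables in the db_order schema (example schema name)
canal.instance.filter.regex=db_order\\..*
```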

 

5. Save

Back in the Instance management list, the instance status should show as started.

VII. Start the Java canalClient service

    Note: start one Java canalClient per instance; otherwise Kafka will receive and process duplicate copies of the same messages, wasting server resources.

 

1. The canal settings in the Java application's yml must use the instance name created in step VI.

For example, with example242:

canal:
  host: 10.100.20.242
  port: 11111
  username: canal
  password: canal
  instance: example242

 

2. After startup, check the canal logs.

On success, the client logs a "no data, sleeping for a while" message.

3. The monitoring client code:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.InetSocketAddress;

@Component
@Slf4j
public class ReadBinLog implements ApplicationRunner {

    @Value("${canal.host}")
    private String host;

    @Value("${canal.port}")
    private int port;

    @Value("${canal.username}")
    private String username;

    @Value("${canal.password}")
    private String password;

    @Value("${canal.instance}")
    private String instance;

    @Value("${ETL.CANAL.NUMBER}")
    private int number;

//    @Value("${schemaName}")
//    private String schemaName;

    @Autowired
    CanalHandler canalHandler;

    /**
     * Obtain a canal connection
     */
    public CanalConnector getConn() {
        log.info("host:{}, port:{}, instance:{}, username:{}, password:{}", host, port, instance, username, password);
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        CanalConnector conn = this.getConn();
        while (true) {
            try {
                conn.connect();
                //subscribe to all databases and tables in the instance
                conn.subscribe(".*\\..*");
                //conn.subscribe(schemaName);
                Message message = conn.getWithoutAck(number);

                long id = message.getId();
                int size = message.getEntries().size();
                if (id != -1 && size > 0) {
                    log.info("entries read: " + message.getEntries().size());
                    canalHandler.analysis(message.getEntries());
                } else {
                    try {
                        String hostAddress = InetAddress.getLocalHost().getHostAddress();
                        log.info("instance {} has no data, sleeping for a while; canal client host: {}", instance, hostAddress);
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // acknowledge the batch
                if (id != -1) {
                    conn.ack(id);
                }
            } catch (CanalClientException e) {
                log.error(e.getMessage(), e);
                // processing failed, roll back the batch
                conn.rollback();
            } finally {
                conn.disconnect();
            }
        }
    }
}

4. The corresponding DML handling code (insert/update/delete); this write-up uses Kafka to dispatch the changes:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.TreeMap;

@Component
@Slf4j
public class CanalHandler {

    @Autowired
    IKafkaProducerService producerService;

    @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_INSTALL}")
    String ETL_CANAL_INSTALL;

    @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_DELETE}")
    String ETL_CANAL_DELETE;

    @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_UPDATE}")
    String ETL_CANAL_UPDATE;

//    @Value("${brandInfo}")
//    String brandInfo;

    @Autowired
    IDataManagerService iDataManagerService;

    /**
     * Analyze binlog entries and dispatch them by event type
     *
     * @param entries
     * @throws InvalidProtocolBufferException
     */
    public void analysis(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                //skip transaction begin/end events
                continue;
            }
            if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                //assemble the message payload
                CanalDB canalDB = new CanalDB();
                canalDB.setTableName(entry.getHeader().getTableName());
                canalDB.setTableSchema(entry.getHeader().getSchemaName());
                log.info("schema: " + canalDB.getTableSchema() + ", table: " + canalDB.getTableName());
                //check whether this table is in the sync-config list
                ResponseVO<SysSynchronizationConfigDO> responseVO = iDataManagerService.synchronizationConfig(canalDB);
                if (responseVO.getCode() != 0 || responseVO.getData() == null) {
                    continue;
                }
                canalDB.setSysSynchronizationConfig(responseVO.getData());
                //dispatch by event type
                if (eventType == CanalEntry.EventType.DELETE) {
                    canalDB.setType("delete");
                    saveDeleteSql(entry, canalDB);
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    canalDB.setType("update");
                    saveUpdateSql(entry, canalDB);
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    canalDB.setType("install");
                    saveInsertSql(entry, canalDB);
                }
            }
        }
    }

    /**
     * Handle update rows
     *
     * @param entry
     */
    private void saveUpdateSql(CanalEntry.Entry entry, CanalDB canalDB) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> dataList = rowChange.getRowDatasList();
            int count = 0;
            for (CanalEntry.RowData rowData : dataList) {
                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                TreeMap<String, Object> columnMap = new TreeMap<>();
                for (CanalEntry.Column column : afterColumnsList) {
                    columnMap.put(column.getName(), column.getValue());
                    if (column.getIsKey()) {
                        canalDB.setWhereCase(column.getName() + "=" + column.getValue());
                    }
                }
                canalDB.setColumnAndData(columnMap);
                String canal = JSONObject.toJSONString(canalDB);
                log.info("kafka canal send update---->>>" + canal);
                producerService.sendMessage(ETL_CANAL_UPDATE, canal, count);
                count++;
            }
        } catch (InvalidProtocolBufferException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Handle delete rows
     *
     * @param entry
     */
    private void saveDeleteSql(CanalEntry.Entry entry, CanalDB canalDB) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
            int count = 0;
            for (CanalEntry.RowData rowData : rowDataList) {
                List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
                for (CanalEntry.Column column : columnList) {
                    if (column.getIsKey()) {
                        canalDB.setWhereCase(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                String canal = JSONObject.toJSONString(canalDB);
                log.info("kafka canal send delete--->>>>" + canal);
                producerService.sendMessage(ETL_CANAL_DELETE, canal, count);
                count++;
            }
        } catch (InvalidProtocolBufferException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Handle insert rows
     *
     * @param entry
     */
    private void saveInsertSql(CanalEntry.Entry entry, CanalDB canalDB) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> dataList = rowChange.getRowDatasList();
            int count = 0;
            for (CanalEntry.RowData rowData : dataList) {
                List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
                TreeMap<String, Object> columnMap = new TreeMap<>();
                for (CanalEntry.Column column : columnList) {
                    columnMap.put(column.getName(), column.getValue());
                }
                canalDB.setColumnAndData(columnMap);
                String canal = JSONObject.toJSONString(canalDB);
                log.info("kafka canal send install----->>" + canal);
                producerService.sendMessage(ETL_CANAL_INSTALL, canal, count);
                count++;
            }
        } catch (InvalidProtocolBufferException e) {
            log.error(e.getMessage(), e);
        }
    }
}
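Downstream, a Kafka consumer can turn these CanalDB messages back into SQL. As a rough sketch (CanalSqlBuilder is hypothetical, mirroring the columnAndData map and whereCase string produced above; real code should use a PreparedStatement instead of string concatenation), an update message maps to an UPDATE statement like this:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical consumer-side helper: rebuilds an UPDATE statement from the
// column map and key-based WHERE clause carried in a CanalDB "update" message.
public class CanalSqlBuilder {

    public static String buildUpdateSql(String table, Map<String, Object> columns, String whereCase) {
        // naive quoting for illustration only; use PreparedStatement in production
        String setClause = columns.entrySet().stream()
                .map(e -> e.getKey() + "='" + e.getValue() + "'")
                .collect(Collectors.joining(", "));
        return "UPDATE " + table + " SET " + setClause + " WHERE " + whereCase;
    }

    public static void main(String[] args) {
        Map<String, Object> columns = new TreeMap<>();
        columns.put("name", "alice");
        columns.put("age", 30);
        System.out.println(buildUpdateSql("t_user", columns, "id=7"));
        // prints: UPDATE t_user SET age='30', name='alice' WHERE id=7
    }
}
```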

 

 

 

 

Done!

