Installing Canal Admin for Data Synchronization


I. Enable the binlog on the target MySQL

Reference configuration (add to the [mysqld] section of my.cnf):

log-bin=mysql-bin

binlog-format=ROW

server_id=1
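After restarting MySQL you can confirm the settings took effect; a minimal check from any MySQL client session:

SHOW VARIABLES LIKE 'log_bin';        -- expect ON

SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW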

 

II. Grant privileges on the target database

Reference:

create user canal identified by 'canal';

grant all privileges on *.* to 'canal'@'%';

flush privileges;
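Granting all privileges works, but Canal only needs replication-related access; a narrower grant, if you prefer least privilege (this is the grant used in the Canal QuickStart):

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;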

 

III. Install Canal Admin

Official documentation: https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart

Open the following server ports: 11110, 11111, 11112 (a firewall example follows below).
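How the ports get opened depends on your environment; a sketch assuming firewalld (CentOS 7 style), with 8089 added for the Admin web console:

firewall-cmd --permanent --add-port=11110-11112/tcp

firewall-cmd --permanent --add-port=8089/tcp

firewall-cmd --reload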

 

1. Download

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

2. Extract

   tar zxvf canal.admin-1.1.4.tar.gz

3. Edit the configuration

   vi conf/application.yml

Update the server port and the database address, username and password in this file; a reference copy of the file follows below.
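For reference, the stock conf/application.yml shipped with canal.admin-1.1.4 looks roughly like this (values may differ slightly between releases); the parts to adjust are server.port and the spring.datasource block:

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin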

 

4. Initialize the Canal Admin database

Get the initialization SQL from the official documentation (see the note below).
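In canal.admin-1.1.4 the initialization script also ships inside the package as conf/canal_manager.sql, so instead of copying the SQL from the wiki you can load it roughly like this (host and credentials are placeholders for your environment):

mysql -h127.0.0.1 -uroot -p

mysql> source conf/canal_manager.sql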

5. Start Canal Admin

sh bin/startup.sh

6. Access the console

   The console is available at http://127.0.0.1:8089/; the default credentials are admin/123456.

IV. Install canal-service (the canal-deployer)

1. Download

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2. Extract

   tar zxvf canal.deployer-1.1.4.tar.gz

3. Edit the configuration file

   vi conf/canal_local.properties

Set canal.admin.manager to the Canal Admin host and port; a reference copy of the file follows below.
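For reference, conf/canal_local.properties in canal.deployer-1.1.4 looks roughly like this (defaults may vary slightly between releases); point canal.admin.manager at the machine where Canal Admin runs:

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =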

4. Start canal-service

   sh bin/startup.sh local
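Whether the server started and registered with Canal Admin can be checked in the deployer's logs; a quick look, assuming the default log layout of canal.deployer-1.1.4:

tail -f logs/canal/canal.log

If registration succeeded, the server also shows up in the Admin console in the next part.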

V. Verify the service

1. Log in to the Canal Admin console (default credentials: admin/123456).

The canal-service installed in Part IV should now appear in the Server management list.

 

VI. Configure an instance

1. Open Instance management.

2. Click the New Instance button.

(1) Enter the instance name (e.g., example242). This is the name the backend Java client refers to in its configuration:

canal:
  host: 10.100.20.242
  port: 11111
  username: canal
  password: canal
  instance: example242

3. Owning server

Select the host on which the canal-service from Part IV was installed.

4. Load the template

Modify the following entries (a fuller excerpt of the template follows below):

canal.instance.master.address=127.0.0.1:3306 (address of the database to monitor)

canal.instance.dbUsername=canal (username for the monitored database)

canal.instance.dbPassword=canal (password for the monitored database)
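For context, the loaded template contains more than the three entries above; a trimmed excerpt, roughly as shipped with 1.1.4 (defaults may differ slightly between releases):

canal.instance.master.address=127.0.0.1:3306

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

canal.instance.connectionCharset=UTF-8

canal.instance.filter.regex=.*\\..*

The filter regex .*\\..* matches every schema and table, which is also what the Java client subscribes to further below.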

 

5. Save

Back in the Instance management list, the new instance shows the status Started.

VII. Start the Java canalClient service

    Note: start one Java canalClient per instance; otherwise Kafka may end up carrying duplicate messages for the same data, which wastes server resources.

 

1. The Java application's yml must use the instance name created in Part VI, for example example242:

canal:
  host: 10.100.20.242
  port: 11111
  username: canal
  password: canal
  instance: example242

 

2. After startup, check the canal client log.

On success it will print lines like "no data, taking a short rest".

 

3. The corresponding listener code:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.InetSocketAddress;

@Component
@Slf4j
public class ReadBinLog implements ApplicationRunner {

    @Value("${canal.host}")
    private String host;

    @Value("${canal.port}")
    private int port;

    @Value("${canal.username}")
    private String username;

    @Value("${canal.password}")
    private String password;

    @Value("${canal.instance}")
    private String instance;

    @Value("${ETL.CANAL.NUMBER}")
    private int number;

//    @Value("${schemaName}")
//    private String schemaName;

    @Autowired
    CanalHandler canalHandler;

    /**
     * Obtain a canal connection
     */
    public CanalConnector getConn() {
        log.info("host:{}, port:{}, instance:{}, username:{}, password:{}", host, port, instance, username, password);
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        CanalConnector conn = this.getConn();
        while (true) {
            try {
                conn.connect();
                // subscribe to every database and table in the instance
                conn.subscribe(".*\\..*");
                //conn.subscribe(schemaName);
                Message message = conn.getWithoutAck(number);

                long id = message.getId();
                int size = message.getEntries().size();
                if (id != -1 && size > 0) {
                    log.info("讀取條數:" + message.getEntries().size());
                    canalHandler.analysis(message.getEntries());
                } else {
                    try {
                        String hostAddress = InetAddress.getLocalHost().getHostAddress();
                        log.info("監控源實例名稱:{},沒有數據,休息一會啦,canal所在服務器地址:{}", instance, hostAddress);
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // acknowledge the batch
                if (id != -1) {
                    conn.ack(id);
                }
            } catch (CanalClientException e) {
                log.error(e.getMessage(), e);
                // processing failed, roll back the batch
                conn.rollback();
            } finally {
                conn.disconnect();
            }
        }
    }
}

4. The corresponding row-change handling code (insert/update/delete); this article forwards the data with Kafka:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.TreeMap;

// CanalDB, IKafkaProducerService, IDataManagerService, SysSynchronizationConfigDO and
// ResponseVO are application-specific classes; import them from your own packages.

@Component
@Slf4j
public class CanalHandler {

    @Autowired
    IKafkaProducerService producerService;

    @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_INSTALL}")
    String ETL_CANAL_INSTALL;

    @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_DELETE}")
    String ETL_CANAL_DELETE;

    @Value("${CUSTOM.KAFKA.TOPIC.ETL_CANAL_UPDATE}")
    String ETL_CANAL_UPDATE;

//    @Value("${brandInfo}")
//    String brandInfo;

    @Autowired
    IDataManagerService iDataManagerService;

    /**
     * Analyze the binlog entries and dispatch them by event type
     *
     * @param entries
     * @throws InvalidProtocolBufferException
     */
    public void analysis(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                // skip transaction begin/end entries
                continue;
            }
            if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                // wrap the change into a transport object
                CanalDB canalDB = new CanalDB();
                canalDB.setTableName(entry.getHeader().getTableName());
                canalDB.setTableSchema(entry.getHeader().getSchemaName());
                log.info("查詢SchemaName:" + canalDB.getTableSchema() + " 表名:" + canalDB.getTableName());
                // check whether this table is configured for synchronization
                ResponseVO<SysSynchronizationConfigDO> responseVO = iDataManagerService.synchronizationConfig(canalDB);
                if (responseVO.getCode() != 0 || responseVO.getData() == null) {
                    continue;
                }
                canalDB.setSysSynchronizationConfig(responseVO.getData());
                // dispatch by event type
                if (eventType == CanalEntry.EventType.DELETE) {
                    canalDB.setType("delete");
                    saveDeleteSql(entry, canalDB);
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    canalDB.setType("update");
                    saveUpdateSql(entry, canalDB);
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    canalDB.setType("install");
                    saveInsertSql(entry, canalDB);
                }
            }
        }
    }

    /**
     * Handle UPDATE rows
     *
     * @param entry
     */
    private void saveUpdateSql(CanalEntry.Entry entry, CanalDB canalDB) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> dataList = rowChange.getRowDatasList();
            int count = 0;
            for (CanalEntry.RowData rowData : dataList) {
                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                TreeMap<String, Object> hashMapHashMap = new TreeMap<>();
                for (CanalEntry.Column column : afterColumnsList) {
                    hashMapHashMap.put(column.getName(), column.getValue());
                    canalDB.setColumnAndData(hashMapHashMap);
                    if (column.getIsKey()) {
                        canalDB.setWhereCase(column.getName() + "=" + column.getValue());
                    }
                }
                String canal = JSONObject.toJSONString(canalDB);
                log.info("kafka canal send update---->>>" + canal);
                producerService.sendMessage(ETL_CANAL_UPDATE, canal, count);
                count++;
            }
        } catch (InvalidProtocolBufferException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Handle DELETE rows
     *
     * @param entry
     */
    private void saveDeleteSql(CanalEntry.Entry entry, CanalDB canalDB) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
            int count = 0;
            for (CanalEntry.RowData rowData : rowDataList) {
                List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
                for (CanalEntry.Column column : columnList) {
                    if (column.getIsKey()) {
                        canalDB.setWhereCase(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                String canal = JSONObject.toJSONString(canalDB);
                log.info("kafka canal send delete--->>>>" + canal);
                producerService.sendMessage(ETL_CANAL_DELETE, canal, count);
                count++;
            }
        } catch (InvalidProtocolBufferException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Handle INSERT rows
     *
     * @param entry
     */
    private void saveInsertSql(CanalEntry.Entry entry, CanalDB canalDB) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> dataList = rowChange.getRowDatasList();
            int count = 0;
            for (CanalEntry.RowData rowData : dataList) {
                List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
                TreeMap<String, Object> hashMapHashMap = new TreeMap<>();
                for (int i = 0; i < columnList.size(); i++) {
                    hashMapHashMap.put(columnList.get(i).getName(), columnList.get(i).getValue());
                    canalDB.setColumnAndData(hashMapHashMap);
                }
                String canal = JSONObject.toJSONString(canalDB);
                log.info("kafka canal send install----->>" + canal);
                producerService.sendMessage(ETL_CANAL_INSTALL, canal, count);
                count++;
            }
        } catch (InvalidProtocolBufferException e) {
            log.error(e.getMessage(), e);
        }
    }
}
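The CanalDB transport object used above is project-specific and was not included in the original code; below is a minimal Lombok-based sketch, with field names inferred from the getters and setters the handler calls (the names and types are assumptions, not the original class):

import java.util.TreeMap;
import lombok.Data;

// Carries one row change from the canal client to the Kafka consumers.
// SysSynchronizationConfigDO comes from the application's own packages.
@Data
public class CanalDB {
    /** schema of the changed table */
    private String tableSchema;
    /** name of the changed table */
    private String tableName;
    /** event type: install (insert), update or delete */
    private String type;
    /** primary-key condition, e.g. "id=42" */
    private String whereCase;
    /** column name -> value from the row's after image */
    private TreeMap<String, Object> columnAndData;
    /** synchronization configuration resolved for this table */
    private SysSynchronizationConfigDO sysSynchronizationConfig;
}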

 

 

 

 

Done!

