Canal-從零開始:Canal連接kafka同步ElasticSearch


  省略kafka和zookeeper的安裝,直接開始server的配置

修改conf/example下的instance.properties文件

  

# mysql的master地址,賬號及密碼
canal.instance.master.address=127.0.0.1:13306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
#解析白名單,正則;
canal.instance.filter.regex=mytest\\..*
# mq config
canal.mq.topic=example

  我這一段的配置表示:example這個實例(instance)監聽的是mytest這個database下面所有的表,並會把監聽到的變更結果發送到kafka的example這個topic上

 

修改conf/canal.properties文件

  

# tcp, kafka, RocketMQ,serverMode改成kafka
canal.serverMode = kafka

  

##################################################
#########                    MQ                      #############
##################################################
# 這里改成連接的kafka地址(properties文件不支持行尾注釋,注釋必須單獨成行,否則會被當作配置值的一部分)
canal.mq.servers = 127.0.0.1:9092

  

修改完之后重啟canal.server;

如果kafka在canal啟動后有更改,canal需要重啟

kafka消費端代碼

 

package com.canal.demo.handler;

import com.alibaba.fastjson.JSON;
import com.canal.demo.constants.BinlogConstants;
import com.canal.demo.es.EsRestClient;
import com.canal.demo.transfer.DataConvertFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * <p>
 * Message handler: consumes Canal binlog change messages from Kafka and writes
 * inserted rows to Elasticsearch through {@link EsRestClient}.
 * </p>
 *
 * <p>
 * Each message is parsed as a JSON object. Messages without a row-data array
 * (the value under {@code BinlogConstants.BINLOG_DATA_KEY}) are ignored.
 * For INSERT events every row in the array is converted and indexed; UPDATE
 * and DELETE events are currently only logged.
 * </p>
 *
 * @package: com.canal.demo.handler
 * @description: message handler
 * @author: yangkai.shen
 * @date: Created in 2019-01-07 14:58
 * @copyright: Copyright (c) 2019
 * @version: V1.0
 * @modified: yangkai.shen
 */
@Component
@Slf4j
public class MessageHandler {
    /** Converts a raw binlog row into the parameter map that is indexed. */
    @Autowired
    DataConvertFactory dataConvertFactory;
    /** Client used to write documents to Elasticsearch. */
    @Autowired
    EsRestClient esRestClient;
    /** Value of the {@code hostAndPort} property, passed to {@code EsRestClient.buildClient}. */
    @Value("${hostAndPort}")
    private String hostAndPort;

    /**
     * Handles one Kafka record from the {@code example} / {@code example_2} topics.
     *
     * <p>The offset is acknowledged in {@code finally}, i.e. also when processing
     * failed; failures are logged and not rethrown.
     * NOTE(review): this means a failed message is not retried by this handler —
     * confirm that dropping on error is the intended delivery semantics.</p>
     *
     * @param record         the consumed record; its value is expected to be the JSON text
     *                       of a binlog change message
     * @param acknowledgment handle used to manually commit the offset
     */
    @KafkaListener(topics = {"example","example_2"}, containerFactory = "ackContainerFactory")
    public void handleMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        try {
            String message = (String) record.value();
            log.info("收到消息Str: {}", message);
            Map<String, Object> map = JSON.parseObject(message);
            if (map == null) {
                // Nothing to parse (null/blank payload); the offset is still committed below.
                log.warn("Skipping record with empty payload");
                return;
            }
            // The parsed JSON array is an untyped List at runtime; the element type is
            // taken on trust from the message format.
            @SuppressWarnings("unchecked")
            List<Map<String, String>> dataList = (List<Map<String, String>>) map.get(BinlogConstants.BINLOG_DATA_KEY);
            if (dataList == null) {
                // No row data in this message: nothing to index.
                return;
            }
            log.info("接受行數據:{}", JSON.toJSONString(dataList));
            String table = (String) map.get(BinlogConstants.BINLOG_TBL_KEY);
            String type = (String) map.get(BinlogConstants.BINLOG_TYPE_KEY);
            if (type == null) {
                // switch on a null String would throw a NullPointerException.
                log.warn("Skipping message for table {}: no event type present", table);
                return;
            }
            esRestClient.buildClient(hostAndPort);
            switch (type) {
                case BinlogConstants.INSERT_EVENT:
                    // A single message may carry several rows; index every one of them,
                    // not just the first. The table name doubles as the index name.
                    // A failure on one row aborts the remaining rows of this message.
                    for (Map<String, String> row : dataList) {
                        // Convert the raw row into the parameters that get indexed.
                        Map<String, String> params = dataConvertFactory.transferData(row, table);
                        String doc = esRestClient.insertDocument(table, "_doc", params);
                        log.info("doc_id:{}", doc);
                    }
                    break;
                case BinlogConstants.UPDATE_EVENT:
                    // Not implemented yet: updates are only logged.
                    log.info("update---");
                    break;
                case BinlogConstants.DELETE_EVENT:
                    // Not implemented yet: deletes are only logged.
                    log.info("delete");
                    break;
                default:
                    log.debug("Ignoring unsupported event type: {}", type);
                    break;
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            // Manually commit the offset, regardless of success or failure.
            acknowledgment.acknowledge();
        }
    }
}

 

在數據庫插入數據

insert into mytest.test_tab_3 (c_id,c_name) values(225,'abda');

  

控制台輸出結果

2020-11-13 17:22:25.606  INFO 9872 --- [ntainer#0-0-C-1] com.canal.demo.handler.MessageHandler    : 收到消息Str: {"data":[{"c_id":"225","c_name":"abda","c_age":null}],"database":"mytest","es":1605259339000,"id":3,"isDdl":false,"mysqlType":{"c_id":"int","c_name":"varchar(20)","c_age":"int"},"old":null,"pkNames":["c_id"],"sql":"","sqlType":{"c_id":4,"c_name":12,"c_age":4},"table":"test_tab_3","ts":1605259339448,"type":"INSERT"}
2020-11-13 17:22:25.672  INFO 9872 --- [ntainer#0-0-C-1] com.canal.demo.handler.MessageHandler    : 接受行數據:[{"c_id":"225","c_name":"abda"}]

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM