canal-adapter: full and incremental sync to ES 7.* (Part 7)


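Download

Reference: https://www.cnblogs.com/LQBlog/p/12177295.html#autoid-3-0-0

Source-code modification reference: https://www.cnblogs.com/LQBlog/p/14661238.html

ZooKeeper distributed lock

get /canal-adapter/sync-switch/{canalDestination} returns on when the lock can be acquired and off when sync is blocked.

Configuration file changes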

server:
  port: 8085
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp # or kafka / rocketMQ. With tcp the data source is the binlog; otherwise incremental data comes from kafka or rocketMQ
  #canalServerHost: 192.168.20.5:11111              # canal standalone address and port
  zookeeperHosts: 192.168.20.4:2181 # ZooKeeper-based cluster
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item # added by my own source change, to avoid subscribing to messages I don't care about
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/merge_test?useUnicode=true      # database
      username: kuaihe
      password: Kuaihe0910Mysql
  canalAdapters:
    - instance: kuaihe_db_test            # canal instance name or MQ topic name; matches the folder canal creates. In tcp mode this subscribes to the binlog configured for that canal-server instance
      groups:
        - groupId: g1
          outerAdapters:
            - name: es7.4.0    # adapter name; an SPI extension point, the key registered for com.alibaba.otter.canal.client.adapter.OuterAdapter
              hosts: 127.0.0.1:9200                     # ES cluster addresses, comma-separated
              properties:
                mode: rest # transport # or rest
                security:
                  auth: elastic:T3StdABAk # ES username:password

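Add the es folder

I build the package from source, so I made the changes directly in the source tree and repackaged. On startup, the adapter automatically scans every yml file under the es directory.

Example pro_brand.yml. Mine is a single-table sync; for more complex syncs, see the official documentation.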

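dataSourceKey: defaultDS        # key of the source data source, matching an entry under srcDataSources above
#outerAdapterKey: exampleKey     # matches the key of the es config in application.yml
destination: kuaihe_db_test            # lookup key = this value + _ + the database name of defaultDS + _ + the table in sql. When an incremental binlog message arrives from canal, this config is found via the message's destination + database + table; see com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(int, java.lang.Long, java.util.concurrent.TimeUnit)
groupId: g1                       # matches the groupId in MQ mode; only data for that groupId is synced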
esMapping:
  _index: pro_brand
  _type: _doc
  _id: id
  pk: id
  sql: "
select id,create_timestamp,update_timestamp,`code`,brand_code,brand_mall_idx,delete_flag,depot_id,description,idx,kuaihe_idx,logo,`name`,name_en,prefix,sale_channel,seo_description,seo_keywords,seo_title,`status`,vendor_id from pro_brand
  "
  commitBatch: 3000

 

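Full sync

While the import runs, remember to check the log for errors.

post: http://192.168.20.5:8085/etl/es7.4.0/soa_ord_order_item.yml

Response (the resultMessage reads "Imported into ES: 61513 rows"):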

{
    "succeeded": true,
    "resultMessage": "導入ES 數據:61513 條"
}

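Incremental sync

Incremental sync needs no manual trigger: the binlog subscription starts automatically at startup. See the source:

com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader

-> starts the worker

com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker

Incremental sync by condition

I modified the source myself to make this more flexible.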

post http://127.0.0.1:8087/etl/es7.4.0/soa_ord_order.yml?params=683071930737168384;2021-04-29 00:00:12;2021-04-29 23:55:12

{
    "condition":"where user_id={} and create_timestamp>={} and create_timestamp<={}"
}
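
The params value above contains spaces, colons and semicolons, which many HTTP clients will not send unencoded. As a minimal sketch (not part of the original change), the request could be built in Java 11+ as below. The class and method names are hypothetical; the host, port, adapter type and task file are the ones from the example above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds (but does not send) the conditional ETL request.
public class ConditionalEtlRequestSketch {

    // Joins the parameters with ';' (the separator the endpoint splits on)
    // and percent-encodes the result for use in the query string.
    public static URI etlUri(String baseUrl, String type, String task, String... params) {
        String joined = String.join(";", params);
        // URLEncoder emits '+' for spaces; switch to %20 so the value is unambiguous.
        String encoded = URLEncoder.encode(joined, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/etl/" + type + "/" + task + "?params=" + encoded);
    }

    // Builds the JSON body {"condition":"..."} with minimal escaping.
    public static String conditionBody(String condition) {
        String escaped = condition.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"condition\":\"" + escaped + "\"}";
    }

    public static HttpRequest etlRequest(URI uri, String condition) {
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(conditionBody(condition)))
                .build();
    }

    public static void main(String[] args) {
        URI uri = etlUri("http://127.0.0.1:8087", "es7.4.0", "soa_ord_order.yml",
                "683071930737168384", "2021-04-29 00:00:12", "2021-04-29 23:55:12");
        HttpRequest request = etlRequest(uri,
                "where user_id={} and create_timestamp>={} and create_timestamp<={}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The request can then be sent with java.net.http.HttpClient or any other client; the number of parameters should match the number of {} placeholders in the condition.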

Change 1: com.alibaba.otter.canal.adapter.launcher.rest.CommonRest#etl

    @PostMapping("/etl/{type}/{key}/{task}")
    public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
                         @RequestParam(name = "params", required = false) String params,
                         @RequestBody Map<String, String> mapParameters) {
        // ... (unchanged code omitted)
        try {
            List<String> paramArray = null;
            if (params != null) {
                paramArray = Arrays.asList(params.trim().split(";"));
            }
            // New: take an optional condition from the request body.
            // Note: @RequestBody is required by default, so the null check only
            // matters if the annotation is declared with required = false.
            String condition = mapParameters == null ? null : mapParameters.get("condition");
            return adapter.etl(task, condition, paramArray);
        } finally {
            if (destination != null && oriSwitchStatus) {
                syncSwitch.on(destination);
            } else if (destination == null && oriSwitchStatus) {
                syncSwitch.on(task);
            }
        }
    }

Change 2:

com.alibaba.otter.canal.client.adapter.es.ESAdapter#etl

    @Override
    public EtlResult etl(String task, String condition, List<String> params) {
        EtlResult etlResult = new EtlResult();
        ESSyncConfig config = esSyncConfig.get(task);
        if (config != null) {
            // ... (unchanged code omitted)
            if (dataSource != null) {
                return esEtlService.importData(params, condition);
            }
            // ... (unchanged code omitted)
        } else {
            // ... (unchanged code omitted)
            EtlResult etlRes = esEtlService.importData(params, condition);
            // ... (unchanged code omitted)
        }

        etlResult.setSucceeded(false);
        etlResult.setErrorMessage("Task not found");
        return etlResult;
    }

Change 3: com.alibaba.otter.canal.client.adapter.support.AbstractEtlService#importData

            DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

            List<Object> values = new ArrayList<>();
            // liqiangtodo: changed so that etl accepts a condition passed in at request time;
            // fall back to the etlCondition from the mapping file when none is given
            condition = condition == null ? config.getMapping().getEtlCondition() : condition;
            // Append the condition to the SQL
            if (condition != null && params != null) {
                String etlCondition = condition;
                for (String param : params) {
                    etlCondition = etlCondition.replace("{}", "?");
                    values.add(param);
                }
                sql += " " + etlCondition;
            }

            if (logger.isDebugEnabled()) {
                logger.debug("etl sql : {}", sql);
            }

            // Get the total row count
            String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
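
To make the substitution concrete, here is a self-contained sketch of the same idea outside canal: the request-time condition wins over the configured one, every {} becomes a JDBC ? placeholder, and the parameters are collected as bind values. The class EtlConditionSketch and its BoundSql holder are hypothetical names for illustration only, and the base SQL is shortened.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone illustration of the condition handling above.
public class EtlConditionSketch {

    /** Final SQL text plus the values to bind to its "?" placeholders, in order. */
    public static final class BoundSql {
        public final String sql;
        public final List<Object> values;

        BoundSql(String sql, List<Object> values) {
            this.sql = sql;
            this.values = values;
        }
    }

    public static BoundSql bind(String baseSql, String requestCondition,
                                String configuredCondition, List<String> params) {
        // A condition supplied with the request takes precedence over the configured one.
        String condition = requestCondition != null ? requestCondition : configuredCondition;
        List<Object> values = new ArrayList<>();
        String sql = baseSql;
        if (condition != null && params != null) {
            // String.replace swaps every "{}" in a single call.
            sql += " " + condition.replace("{}", "?");
            values.addAll(params);
        }
        return new BoundSql(sql, values);
    }

    public static void main(String[] args) {
        BoundSql bound = bind(
                "select id from soa_ord_order",
                "where user_id={} and create_timestamp>={} and create_timestamp<={}",
                null,
                Arrays.asList("683071930737168384", "2021-04-29 00:00:12", "2021-04-29 23:55:12"));
        System.out.println(bound.sql);
        System.out.println(bound.values);
    }
}
```

Because the values are bound as parameters rather than concatenated, only the condition template itself is trusted text; it should still come from a trusted caller, since it is appended to the SQL verbatim.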

 

 

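Dynamically updating the configuration files

This depends on two tables: canal_config (stores the application configuration) and canal_adapter_config (stores the configuration files under the es directory). For the SQL scripts, see:

Reference: https://www.cnblogs.com/LQBlog/p/14598582.html

Configuring application.yml

Whenever you change the content, remember to update modified_time as well.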

INSERT INTO `merge_test`.`canal_config`(`id`, `cluster_id`, `server_id`, `name`, `status`, `content`, `content_md5`, `modified_time`) VALUES (2, NULL, NULL, 'application.yml', NULL, 'server:\n  port: 8085\nspring:\n  jackson:\n    date-format: yyyy-MM-dd HH:mm:ss\n    time-zone: GMT+8\n    default-property-inclusion: non_null\ncanal.conf:\n  mode: tcp # kafka rocketMQ\n  #canalServerHost: 192.168.20.5:11111              # canal address and port\n  zookeeperHosts: 192.168.20.4:2181 # ZooKeeper-based cluster\n  batchSize: 500\n  syncBatchSize: 1000\n  retries: 0\n  timeout:\n  accessKey:\n  secretKey:\n  subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item # added by my own source change, to avoid subscribing to messages I don''t care about\n  srcDataSources:\n    defaultDS:\n      url: jdbc:mysql://rm-2zeqc826391dt0lk2.mysql.rds.aliyuncs.com:3306/merge_test?useUnicode=true      # database\n      username: kuaihe\n      password: Kuaihe0910Mysql\n  canalAdapters:\n    - instance: kuaihe_db_test            # canal instance Name or mq topic name; matches the folder canal creates\n      groups:\n        - groupId: g1\n          outerAdapters:\n            - name: es7.4.0    # adapter name\n              hosts: es-cn-st21taqhf0003bw9m.elasticsearch.aliyuncs.com:9200                     # ES cluster addresses, comma-separated\n              properties:\n                mode: rest # transport # or rest\n                security:\n                  auth: elastic:T3StdABAk # ES username:password\n', '', '2021-04-14 15:09:07');

 

 

The id must be 2. For details, see the source: com.alibaba.otter.canal.adapter.launcher.monitor.remote.DbRemoteConfigLoader#getRemoteAdapterConfig

    /**
     * Fetch the remote application.yml configuration
     *
     * @return the configuration object
     */
    private ConfigItem getRemoteAdapterConfig() {
        String sql = "select name, content, modified_time from canal_config where id=2";
        try (Connection conn = dataSource.getConnection();
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery(sql)) {
            if (rs.next()) {
                ConfigItem configItem = new ConfigItem();
                configItem.setId(2L);
                configItem.setName(rs.getString("name"));
                configItem.setContent(rs.getString("content"));
                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
                return configItem;
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return null;
    }

Configuring canal_adapter_config


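category is the directory path and name is the file name. After making a change, remember to update the modified time.

Modify bootstrap.yml to add the database that the configuration is read from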

canal: # database-backed configuration
  manager:
    jdbc:
      url: jdbc:mysql://127.0.0.1:3306/merge_test?useUnicode=true
      username: root
      password: 123456

 

 

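How it works

On startup, the adapter reads the configuration from these two tables and writes it out to the project's configuration files. The project's beans are annotated with @RefreshScope, and a thread is started at the same time to watch the tables for changes.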
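
The watching thread is the reason modified_time has to be bumped on every edit. The sketch below is not canal's implementation; it is a simplified, hypothetical poller (ConfigWatcherSketch, ConfigRow and the callback are invented names) that reloads only when the row's modified time advances, with the database read replaced by a Supplier so the example is self-contained.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration of "poll a config row, reload when modified_time changes".
public class ConfigWatcherSketch {

    /** Minimal stand-in for one config row: its content and modified_time in millis. */
    public static final class ConfigRow {
        public final String content;
        public final long modifiedTime;

        public ConfigRow(String content, long modifiedTime) {
            this.content = content;
            this.modifiedTime = modifiedTime;
        }
    }

    private final Supplier<ConfigRow> loader;   // in a real setup: a JDBC query
    private final Consumer<String> onChange;    // in a real setup: rewrite file + refresh
    private long lastModified = -1L;

    public ConfigWatcherSketch(Supplier<ConfigRow> loader, Consumer<String> onChange) {
        this.loader = loader;
        this.onChange = onChange;
    }

    /** Runs one check; returns true if a newer row was found and applied. */
    public synchronized boolean pollOnce() {
        ConfigRow row = loader.get();
        if (row == null || row.modifiedTime <= lastModified) {
            return false; // unchanged timestamp: the content is not even compared
        }
        lastModified = row.modifiedTime;
        onChange.accept(row.content);
        return true;
    }

    /** Starts a background thread that polls at a fixed delay. */
    public ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        ConfigRow row = new ConfigRow("server:\n  port: 8085\n", System.currentTimeMillis());
        ConfigWatcherSketch watcher = new ConfigWatcherSketch(
                () -> row, content -> System.out.println("reloading:\n" + content));
        watcher.pollOnce(); // first poll applies the row
        watcher.pollOnce(); // second poll is a no-op: modified time has not advanced
    }
}
```

In this sketch, editing the content without touching the timestamp is silently ignored, which is the failure mode the reminders above guard against.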

