Download
Reference: https://www.cnblogs.com/LQBlog/p/12177295.html#autoid-3-0-0
Source modification reference: https://www.cnblogs.com/LQBlog/p/14661238.html
ZooKeeper distributed lock
get /canal-adapter/sync-switch/{canalDestination} — on means the lock can be acquired (sync runs), off means syncing is blocked; see the client sketch below.
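The switch can be inspected or flipped by operating on the node directly. Below is a minimal sketch using the plain ZooKeeper client; the connect string and the kuaihe_db_test destination are taken from the configuration further down, and the on/off string payload is an assumption based on the note above, so verify it against your deployment.

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;

public class SyncSwitchDemo {
    public static void main(String[] args) throws Exception {
        // assumed zookeeper address and canal destination; adjust to your environment
        ZooKeeper zk = new ZooKeeper("192.168.20.4:2181", 30000, event -> {});
        String path = "/canal-adapter/sync-switch/kuaihe_db_test";

        Stat stat = zk.exists(path, false);
        if (stat != null) {
            // read the current switch value; "on" = lock acquirable, "off" = sync blocked
            // (the on/off payload is an assumption based on the note above)
            byte[] data = zk.getData(path, false, stat);
            System.out.println("current switch: " + new String(data, StandardCharsets.UTF_8));

            // flip it to off to pause syncing; the version from the read guards against races
            zk.setData(path, "off".getBytes(StandardCharsets.UTF_8), stat.getVersion());
        }
        zk.close();
    }
}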
Configuration file changes
server:
  port: 8085
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # tcp / kafka / rocketMQ; with tcp the incremental data comes from the binlog, otherwise it comes from kafka or rocketMQ
  #canalServerHost: 192.168.20.5:11111 # address and port of a standalone canal
  zookeeperHosts: 192.168.20.4:2181 # zookeeper-based cluster
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item # option I added in my source changes to avoid subscribing to messages we do not care about
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/merge_test?useUnicode=true # source database
      username: kuaihe
      password: Kuaihe0910Mysql
  canalAdapters:
  - instance: kuaihe_db_test # canal instance name or mq topic name; matches the folder created on the canal side; in canal mode it subscribes to the binlog of that canal-server instance
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7.4.0 # adapter name, an SPI extension point; the corresponding key is com.alibaba.otter.canal.client.adapter.OuterAdapter
        hosts: 127.0.0.1:9200 # es cluster addresses, comma separated
        properties:
          mode: rest # transport # or rest
          security:
            auth: elastic:T3StdABAk # es username:password
Add the es folder
I build from the source myself, so the changes are made directly in the source and then packaged. On startup, every yml under the es directory is scanned automatically.
Example pro_brand.yml (mine is a plain single-table sync; for more complex mappings see the official docs):
dataSourceKey: defaultDS # key of the source data source, matches an entry under srcDataSources above
#outerAdapterKey: exampleKey # matches the key of the es config in application.yml
destination: kuaihe_db_test # the lookup key is destination + the database of defaultDS + the table in the sql below; when an incremental binlog message arrives from canal, its destination + database + table are used to locate this config, see com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(int, java.lang.Long, java.util.concurrent.TimeUnit)
groupId: g1 # groupId for MQ mode; only data for the matching groupId is synced
esMapping:
  _index: pro_brand
  _type: _doc
  _id: id
  pk: id
  sql: "select id,create_timestamp,update_timestamp,`code`,brand_code,brand_mall_idx,delete_flag,depot_id,description,idx,kuaihe_idx,logo,`name`,name_en,prefix,sale_channel,seo_description,seo_keywords,seo_title,`status`,vendor_id from pro_brand"
  commitBatch: 3000
Full sync
While importing, watch the logs for errors.
POST http://192.168.20.5:8085/etl/es7.4.0/soa_ord_order_item.yml
{ "succeeded": true, "resultMessage": "導入ES 數據:61513 條" }
Incremental sync
Incremental sync is triggered automatically on startup by subscribing to the binlog; see the source:
com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader
-> starts the worker
com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker
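In TCP mode the worker boils down to the standard canal client loop (connect, subscribe, getWithoutAck, ack). A standalone, hedged sketch of that loop, with the canal-server address and destination taken from the configuration above, looks roughly like this:

import java.net.InetSocketAddress;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class BinlogConsumeLoop {
    public static void main(String[] args) {
        // assumed canal-server address; the adapter itself builds this from canal.conf
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.20.5", 11111), "kuaihe_db_test", "", "");
        connector.connect();
        // subscribe only to the tables we care about, same idea as the subscribe option above
        connector.subscribe("merge_test\\..*");
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // pull a batch without auto-ack, mirroring SimpleCanalConnector.getWithoutAck
                Message message = connector.getWithoutAck(500);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    continue; // nothing new
                }
                try {
                    // the real worker hands the entries to the configured outer adapters (es7.4.0 here)
                    message.getEntries().forEach(entry -> System.out.println(entry.getHeader().getTableName()));
                    connector.ack(batchId);      // confirm the batch once it is synced
                } catch (Exception e) {
                    connector.rollback(batchId); // redeliver on failure
                }
            }
        } finally {
            connector.disconnect();
        }
    }
}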
Conditional incremental sync
I modified the source myself to make this more flexible.
POST http://127.0.0.1:8087/etl/es7.4.0/soa_ord_order.yml?params=683071930737168384;2021-04-29 00:00:12;2021-04-29 23:55:12
{ "condition":"where user_id={} and create_timestamp>={} and create_timestamp<={}" }
Change 1: com.alibaba.otter.canal.adapter.launcher.rest.CommonRest#etl
@PostMapping("/etl/{type}/{key}/{task}") public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task, @RequestParam(name = "params", required = false) String params,@RequestBody Map<String,String> mapParameters)
try { List<String> paramArray = null; if (params != null) { paramArray = Arrays.asList(params.trim().split(";")); } String condition=mapParameters==null?null:mapParameters.get("condition"); return adapter.etl(task,condition, paramArray); } finally { if (destination != null && oriSwitchStatus) { syncSwitch.on(destination); } else if (destination == null && oriSwitchStatus) { syncSwitch.on(task); } }
Change 2
com.alibaba.otter.canal.client.adapter.es.ESAdapter#etl
@Override
public EtlResult etl(String task, String condition, List<String> params) {
    EtlResult etlResult = new EtlResult();
    ESSyncConfig config = esSyncConfig.get(task);
    if (config != null) {
        .........
        if (dataSource != null) {
            return esEtlService.importData(params, condition);
        }
        .........
    } else {
        .........
        EtlResult etlRes = esEtlService.importData(params, condition);
        ..........
    }
    etlResult.setSucceeded(false);
    etlResult.setErrorMessage("Task not found");
    return etlResult;
}
Change 3: com.alibaba.otter.canal.client.adapter.support.AbstractEtlService#importData
DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

List<Object> values = new ArrayList<>();
// liqiangtodo: change so that etl supports a condition passed in dynamically
condition = condition == null ? config.getMapping().getEtlCondition() : condition;
// append the condition
if (condition != null && params != null) {
    String etlCondition = condition;
    for (String param : params) {
        etlCondition = etlCondition.replace("{}", "?");
        values.add(param);
    }
    sql += " " + etlCondition;
}

if (logger.isDebugEnabled()) {
    logger.debug("etl sql : {}", sql);
}
// get the total row count
String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
Dynamically updating the configuration files
This relies on two tables: canal_config (stores the application config) and canal_adapter_config (stores the per-file configs under the es directory). For the SQL script that creates them, see:
Reference: https://www.cnblogs.com/LQBlog/p/14598582.html
Configure application.yml (stored in canal_config)
If you change the stored config, remember to also update modified_time.
INSERT INTO `merge_test`.`canal_config`(`id`, `cluster_id`, `server_id`, `name`, `status`, `content`, `content_md5`, `modified_time`) VALUES (2, NULL, NULL, 'application.yml', NULL, 'server:\n port: 8085\nspring:\n jackson:\n date-format: yyyy-MM-dd HH:mm:ss\n time-zone: GMT+8\n default-property-inclusion: non_null\ncanal.conf:\n mode: tcp # kafka rocketMQ\n #canalServerHost: 192.168.20.5:11111 #canal 地址和端口\n zookeeperHosts: 192.168.20.4:2181 #基於zookeeper集群\n batchSize: 500\n syncBatchSize: 1000\n retries: 0\n timeout:\n accessKey:\n secretKey:\n subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item #我自己改源碼加的防止訂閱不關心的消息\n srcDataSources:\n defaultDS:\n url: jdbc:mysql://rm-2zeqc826391dt0lk2.mysql.rds.aliyuncs.com:3306/merge_test?useUnicode=true #數據庫\n username: kuaihe\n password: Kuaihe0910Mysql\n canalAdapters:\n - instance: kuaihe_db_test # canal instance Name or mq topic name 對應canal創建的文件夾\n groups:\n - groupId: g1\n outerAdapters:\n - name: es7.4.0 #適配器名稱\n hosts: es-cn-st21taqhf0003bw9m.elasticsearch.aliyuncs.com:9200 # es 集群地址, 逗號分隔\n properties:\n mode: rest # transport # or rest\n security:\n auth: elastic:T3StdABAk #es用戶名密碼\n', '', '2021-04-14 15:09:07');
The id must be 2; see the source: com.alibaba.otter.canal.adapter.launcher.monitor.remote.DbRemoteConfigLoader#getRemoteAdapterConfig
/**
 * Fetch the remote application.yml config
 *
 * @return the config item
 */
private ConfigItem getRemoteAdapterConfig() {
    String sql = "select name, content, modified_time from canal_config where id=2";
    try (Connection conn = dataSource.getConnection();
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        if (rs.next()) {
            ConfigItem configItem = new ConfigItem();
            configItem.setId(2L);
            configItem.setName(rs.getString("name"));
            configItem.setContent(rs.getString("content"));
            configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
            return configItem;
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return null;
}
Configure canal_adapter_config
category is the directory path and name is the file name; after changing a row, remember to also update modified_time (see the sketch below).
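As an illustration, a hedged JDBC sketch of refreshing one row in canal_adapter_config. The column names (category, name, content, modified_time) are assumed from the description above and by analogy with the canal_config insert, and the category value "es" mirrors the es folder mentioned earlier; verify both against your actual table.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class AdapterConfigRefresh {
    public static void main(String[] args) throws Exception {
        // assumed columns: category = directory path ("es" here), name = file name
        String sql = "UPDATE canal_adapter_config "
                   + "SET content = ?, modified_time = NOW() " // bumping modified_time is what triggers the reload
                   + "WHERE category = ? AND name = ?";
        // connection values taken from the bootstrap.yml below
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://127.0.0.1:3306/merge_test?useUnicode=true", "root", "123456");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, newYamlContent());
            ps.setString(2, "es");            // directory path (assumption)
            ps.setString(3, "pro_brand.yml"); // file name
            ps.executeUpdate();
        }
    }

    private static String newYamlContent() {
        // the full mapping yml, see the pro_brand.yml example above
        return "dataSourceKey: defaultDS\n" + "...";
    }
}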
Modify bootstrap.yml to add the database the config is read from:
canal: # config is read from the database
  manager:
    jdbc:
      url: jdbc:mysql://127.0.0.1:3306/merge_test?useUnicode=true
      username: root
      password: 123456
How it works
On startup, the adapter reads the config from these two tables and flushes it to the project's local config files. The project uses the @RefreshScope annotation, and a thread is started to watch for changes.
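The watcher itself is not reproduced here; as a rough sketch (my own simplification, not the actual canal code), it amounts to polling modified_time and rewriting the local file when it changes, after which the @RefreshScope beans pick up the new values:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RemoteConfigWatcher {
    private volatile long lastModified = 0;

    public void start() {
        // one background thread, polling every 5 seconds (interval is an assumption)
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.scheduleWithFixedDelay(this::pollOnce, 0, 5, TimeUnit.SECONDS);
    }

    private void pollOnce() {
        String sql = "select name, content, modified_time from canal_config where id=2";
        // connection values taken from the bootstrap.yml above
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://127.0.0.1:3306/merge_test?useUnicode=true", "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            if (rs.next() && rs.getTimestamp("modified_time").getTime() > lastModified) {
                lastModified = rs.getTimestamp("modified_time").getTime();
                // overwrite the local file; in canal this is followed by a Spring context refresh
                Files.write(Path.of("conf", rs.getString("name")),
                        rs.getString("content").getBytes(StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}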