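Notes
I don't know whether this works for every 7.* version; I am on 7.4. If the steps below don't work for you, the best option is to download the source and debug it.
For how to get the source, see https://www.cnblogs.com/LQBlog/p/12177295.html#autoid-3-0-0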
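Supported sync
Input side
As I understand it, canal adapter is canal's own implementation for syncing incremental and full data to ES and to various databases. Only the following few targets are supported for now. The interface carries an SPI annotation, so it can be extended if we need to output to another target.
Output side
See the config below. From a quick read of the source: in tcp mode the incremental data comes from the binlog; it can also come from kafka or rocketMQ.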
canal.conf:
  mode: tcp # or kafka / rocketMQ. With tcp the source is the binlog; otherwise the incremental data comes from kafka or rocketMQ
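To make the three modes concrete, here is a minimal sketch of how the `mode` value could map to a data source. `SourceModeSketch` and its methods are hypothetical names for illustration only; they are not canal's actual classes.

```java
import java.util.Locale;

/** Hypothetical stand-in for the adapter's source selection; not canal's real code. */
final class SourceModeSketch {

    enum Mode { TCP, KAFKA, ROCKETMQ }

    /** Parses the value of canal.conf.mode, ignoring case ("rocketMQ" becomes ROCKETMQ). */
    static Mode parse(String raw) {
        return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    /** Describes where the incremental data comes from for the given mode. */
    static String describeSource(Mode mode) {
        switch (mode) {
            case TCP:
                return "binlog events pulled from the canal server over TCP";
            case KAFKA:
                return "messages consumed from Kafka";
            case ROCKETMQ:
                return "messages consumed from RocketMQ";
            default:
                throw new IllegalArgumentException("Unsupported mode: " + mode);
        }
    }
}
```

Calling `SourceModeSketch.describeSource(SourceModeSketch.parse("tcp"))` returns the binlog description; an unknown value makes `parse` throw an `IllegalArgumentException`.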
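Source changes for syncing to ES 7.*
Because the client API differs between ES versions, the adapter by default supports only 6.4 and below. For later versions you have to replace the ES client yourself and then fix the incompatible spots. Here I use ES 7.4 as the example.
1. Modify the POM file to raise the client version
/canal-canal-1.1.4/client-adapter/elasticsearch/pom.xml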
<!-- <dependency>-->
<!--     <groupId>org.elasticsearch</groupId>-->
<!--     <artifactId>elasticsearch</artifactId>-->
<!--     <version>6.4.3</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!--     <groupId>org.elasticsearch.client</groupId>-->
<!--     <artifactId>transport</artifactId>-->
<!--     <version>6.4.3</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!--     <groupId>org.elasticsearch.client</groupId>-->
<!--     <artifactId>elasticsearch-rest-client</artifactId>-->
<!--     <version>6.4.3</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!--     <groupId>org.elasticsearch.client</groupId>-->
<!--     <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
<!--     <version>6.4.3</version>-->
<!-- </dependency>-->
<!-- liqiangtodo: version bump change -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.4.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.4.0</version>
</dependency>
<!-- I found no matching 7.4.0 version of transport, so I still use 6.4.3 -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.4.3</version>
</dependency>
<dependency>
2. Relax the mapping check
com.alibaba.otter.canal.client.adapter.es.support.ESConnection#getMapping
public MappingMetaData getMapping(String index, String type) {
    MappingMetaData mappingMetaData = null;
    if (mode == ESClientMode.TRANSPORT) {
        ImmutableOpenMap<String, MappingMetaData> mappings;
        try {
            mappings = transportClient.admin()
                .cluster()
                .prepareState()
                .execute()
                .actionGet()
                .getState()
                .getMetaData()
                .getIndices()
                .get(index)
                .getMappings();
        } catch (NullPointerException e) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
        }
        mappingMetaData = mappings.get(type);
    } else {
        ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
        try {
            GetMappingsRequest request = new GetMappingsRequest();
            request.indices(index);
            GetMappingsResponse response;
            // try {
            //     response = restHighLevelClient
            //         .indices()
            //         .getMapping(request, RequestOptions.DEFAULT);
            //     // Calling this API directly fails on versions below 6.4
            // } catch (Exception e) {
            //     logger.warn("Low ElasticSearch version for getMapping");
            response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
            // }
            mappings = response.mappings();
        } catch (NullPointerException e) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        mappingMetaData = mappings.get(index).get(type);
    }
    return mappingMetaData;
}
Change the part highlighted in red to:
// liqiangtodo: version bump change
public MappingMetaData getMapping(String index, String type) {
    MappingMetaData mappingMetaData = null;
    if (mode == ESClientMode.TRANSPORT) {
        ImmutableOpenMap<String, MappingMetaData> mappings;
        try {
            mappings = transportClient.admin()
                .cluster()
                .prepareState()
                .execute()
                .actionGet()
                .getState()
                .getMetaData()
                .getIndices()
                .get(index)
                .getMappings();
        } catch (NullPointerException e) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
        }
        mappingMetaData = mappings.get(type);
    } else {
        ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
        try {
            GetMappingsRequest request = new GetMappingsRequest();
            request.indices(index);
            GetMappingsResponse response;
            response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
            mappings = response.mappings();
        } catch (NullPointerException e) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        // mappingMetaData = mappings.get(index).get(type);
        // liqiangtodo: version bump change
        mappingMetaData = mappings.get(index).get("properties");
    }
    return mappingMetaData;
}
3. bulk version compatibility change
com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESBulkRequest#bulk
The part in red is my change. I set the refresh policy because I need writes to be visible immediately: after a user places an order and it is synced to ES, it has to show up in the order list right away.
public BulkResponse bulk() {
    BulkResponse bulkResponse = null;
    if (mode == ESClientMode.TRANSPORT) {
        bulkResponse = bulkRequestBuilder.execute().actionGet();
    } else {
        try {
            // Refresh policy: visible immediately. See https://blog.csdn.net/hanchao5272/article/details/89151166
            bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            // liqiangtodo: version bump change
            bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            // return restHighLevelClient.bulk(bulkRequest);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    return bulkResponse;
}
4.getEsType
com.alibaba.otter.canal.client.adapter.es.support.ESTemplate#getEsType
/**
 * Get the type of a property from the es mapping
 *
 * @param mapping mapping config
 * @param fieldName property name
 * @return type
 */
@SuppressWarnings("unchecked")
private String getEsType(ESMapping mapping, String fieldName) {
    String key = mapping.get_index() + "-" + mapping.get_type();
    Map<String, String> fieldType = esFieldTypes.get(key);
    if (fieldType != null) {
        return fieldType.get(fieldName);
    } else {
        MappingMetaData mappingMetaData = esConnection.getMapping(mapping.get_index(), mapping.get_type());

        if (mappingMetaData == null) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
        }

        fieldType = new LinkedHashMap<>();
        // liqiangtodo: version upgrade change
        Map<String, Object> esMapping = mappingMetaData.getSourceAsMap();
        // Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
        // Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
        for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
            Map<String, Object> value = (Map<String, Object>) entry.getValue();
            if (value.containsKey("properties")) {
                fieldType.put(entry.getKey(), "object");
            } else {
                fieldType.put(entry.getKey(), (String) value.get("type"));
            }
        }
        esFieldTypes.put(key, fieldType);

        return fieldType.get(fieldName);
    }
}
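The loop in `getEsType` can be tried in isolation. The sketch below runs the same logic on plain `java.util.Map` values. It assumes the map passed in is already the content of `properties` (which is what steps 2 and 4 arrange for on 7.4, as far as I can tell). `FieldTypeSketch` is a made-up name, not part of canal.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Standalone sketch of the field-type extraction; uses plain maps instead of MappingMetaData. */
final class FieldTypeSketch {

    /**
     * Maps each field name to its ES type. A field that has nested "properties"
     * is reported as "object", mirroring the loop in getEsType.
     */
    @SuppressWarnings("unchecked")
    static Map<String, String> fieldTypes(Map<String, Object> properties) {
        Map<String, String> types = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            Map<String, Object> field = (Map<String, Object>) entry.getValue();
            if (field.containsKey("properties")) {
                types.put(entry.getKey(), "object");
            } else {
                types.put(entry.getKey(), (String) field.get("type"));
            }
        }
        return types;
    }
}
```

Given a map such as `{"id": {"type": "long"}, "buyer": {"properties": {...}}}`, the result maps `id` to `long` and `buyer` to `object`.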
5.count
com.alibaba.otter.canal.client.adapter.es.ESAdapter#count
@Override
public Map<String, Object> count(String task) {
    ESSyncConfig config = esSyncConfig.get(task);
    ESMapping mapping = config.getEsMapping();
    SearchResponse response = this.esConnection.new ESSearchRequest(mapping.get_index(), mapping.get_type()).size(0)
        .getResponse();
    // liqiangtodo: version bump change
    // long rowCount = response.getHits().getTotalHits();
    TotalHits totalHits = response.getHits().getTotalHits();
    Map<String, Object> res = new LinkedHashMap<>();
    res.put("esIndex", mapping.get_index());
    res.put("count", totalHits.value);
    return res;
}
6. Change the SPI key of the es adapter
client-adapter/elasticsearch/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
#es=com.alibaba.otter.canal.client.adapter.es.ESAdapter
es7.4.0=com.alibaba.otter.canal.client.adapter.es.ESAdapter
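The effect of renaming the key can be sketched with `java.util.Properties`, under the assumption that the SPI file is read as simple `key=class` lines and that lines starting with `#` are ignored. `SpiKeySketch` is a hypothetical helper, not canal's loader. If that assumption holds, the adapter name used in the outer adapter config presumably has to be `es7.4.0` as well.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical key lookup over an SPI file; canal's real loader may differ. */
final class SpiKeySketch {

    /** Returns the implementation class registered under the key, or null if there is none. */
    static String resolve(String spiFileContent, String key) {
        Properties props = new Properties();
        try {
            // Lines starting with '#' are treated as comments by Properties.load
            props.load(new StringReader(spiFileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }
}
```

With the two lines above as input, `resolve(content, "es")` returns `null` because that entry is commented out, while `resolve(content, "es7.4.0")` returns the `ESAdapter` class name.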
Fix for binlog messages not being consumed
com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker#CanalAdapterWorker
public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, SocketAddress address,
                          List<List<OuterAdapter>> canalOuterAdapters) {
    super(canalOuterAdapters);
    this.canalClientConfig = canalClientConfig;
    this.canalDestination = canalDestination;
    // liqiangtodo: fix for messages not being consumed
    connector = CanalConnectors.newClusterConnector(Arrays.asList(address), canalDestination, "", "");
    // connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
}
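Configuring subscription to specific tables
This keeps the adapter from receiving changes to other tables in the instance.
com.alibaba.otter.canal.client.adapter.support.CanalClientConfig: add a config field
// liqiangtodo: the messages canal subscribes to
private String subscribe;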
com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker#process
......
// Acquire the lock. It is initialized in com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch.init, which reads zk
/**
 * /canal-adapter/sync-switch/{canalDestination}: "on" means the lock can be acquired, "off" blocks
 */
syncSwitch.get(canalDestination);

logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
connector.connect();
logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
// liqiangtodo: fix for messages not being consumed; subscribe only to what we care about
if (StringUtils.isEmpty(canalClientConfig.getSubscribe())) {
    connector.subscribe("*.*");
} else {
    connector.subscribe(canalClientConfig.getSubscribe());
}
// connector.subscribe();
logger.info("=============> Subscribe destination: {},subscribe:{} succeed <=============",
    this.canalDestination,
    StringUtils.isEmpty(canalClientConfig.getSubscribe()) ? "*.*" : canalClientConfig.getSubscribe());
......
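The snippet above evaluates the "empty means subscribe to everything" rule twice, once for the call and once for the log line. A small helper would keep the two in sync. This is only a sketch: `SubscribeFilterSketch` is a made-up name, and the `"*.*"` default is simply the one used in the snippet above.

```java
/** Hypothetical helper that picks the filter to pass to connector.subscribe(...). */
final class SubscribeFilterSketch {

    /** Same default as in the modified process() code above. */
    static final String SUBSCRIBE_ALL = "*.*";

    /** Returns the configured filter, or the subscribe-all default when it is null or blank. */
    static String resolve(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return SUBSCRIBE_ALL;
        }
        return configured.trim();
    }
}
```

With it, the worker could compute `String filter = SubscribeFilterSketch.resolve(canalClientConfig.getSubscribe());` once and use `filter` both in `connector.subscribe(filter)` and in the log statement.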
Usage
canal.conf:
  subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item # added by my source change, to avoid subscribing to messages I don't care about
Null pointer on basic types
For example, an Integer field whose value in the database is null.
com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil#typeConvert
/**
 * Convert the value to the corresponding type in the mapping
 */
public static Object typeConvert(Object val, String esType) {
    if (val == null) {
        return null;
    }
    if (esType == null) {
        return val;
    }
    // liqiangtodo: added code
    // Avoids the conversion exception For input string: "" on empty values
    if (StringUtil.isNullOrEmpty(val.toString())) {
        return null;
    }
    ......
}
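To see why the guard matters, here is a reduced sketch of the conversion with only two numeric types. It is not the real `typeConvert`: `EmptyValueGuardSketch` and the handled type names are chosen for illustration. Without the empty check, `Integer.parseInt("")` throws a `NumberFormatException`.

```java
/** Reduced stand-in for typeConvert; handles only "integer" and "long". */
final class EmptyValueGuardSketch {

    /** Converts val to the Java type matching esType; null and empty values become null. */
    static Object toEsValue(Object val, String esType) {
        if (val == null) {
            return null;
        }
        if (esType == null) {
            return val;
        }
        // The added guard: an empty string cannot be parsed as a number
        if (val.toString().isEmpty()) {
            return null;
        }
        if ("integer".equals(esType)) {
            return Integer.parseInt(val.toString());
        }
        if ("long".equals(esType)) {
            return Long.parseLong(val.toString());
        }
        return val;
    }
}
```

`toEsValue("", "integer")` returns `null` instead of throwing, while `toEsValue("42", "integer")` still yields the `Integer` 42.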