canal-adapter: source changes for full and incremental sync to ES 7.* (Part 6)


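Notes

I don't know whether this works for every 7.* release. I am on 7.4. If the steps below don't work for you, the best option is to download the source and debug it.

For how to get the source, see https://www.cnblogs.com/LQBlog/p/12177295.html#autoid-3-0-0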

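Supported sync

Input side

My understanding is that canal adapter is canal's own implementation for syncing incremental and full data to ES and to various databases. Only a few targets are supported for now. The interface carries the SPI annotation, so output to other targets can be added as an extension.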

 

 

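Output side

As the configuration below shows (I only skimmed the source), in tcp mode the incremental data comes from the binlog. It can also come from kafka or rocketMQ.

canal.conf:
  mode: tcp # kafka rocketMQ; with tcp the source is the binlog, otherwise incremental data comes from kafka or rocketMQ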

 

 

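Source changes for syncing to ES 7.*

The client API differs between ES versions, and by default only versions up to 6.4 are supported. For anything newer you have to swap in the matching ES client yourself and then patch the incompatible spots. Here I use ES 7.4 as the example.

/canal-canal-1.1.4/client-adapter/elasticsearch/pom.xml

1. Modify the POM file to raise the client version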

<!--        <dependency>-->
<!--            <groupId>org.elasticsearch</groupId>-->
<!--            <artifactId>elasticsearch</artifactId>-->
<!--            <version>6.4.3</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.elasticsearch.client</groupId>-->
<!--            <artifactId>transport</artifactId>-->
<!--            <version>6.4.3</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.elasticsearch.client</groupId>-->
<!--            <artifactId>elasticsearch-rest-client</artifactId>-->
<!--            <version>6.4.3</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.elasticsearch.client</groupId>-->
<!--            <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
<!--            <version>6.4.3</version>-->
<!--        </dependency>-->
        <!--liqiangtodo version-upgrade change-->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.4.0</version>
        </dependency>
        <!--I could not find a matching 7.4.0 transport artifact, so I still use 6.4.3-->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.4.3</version>
        </dependency>

2. Relax the mapping check

com.alibaba.otter.canal.client.adapter.es.support.ESConnection#getMapping

  public MappingMetaData getMapping(String index, String type) {
        MappingMetaData mappingMetaData = null;
        if (mode == ESClientMode.TRANSPORT) {
            ImmutableOpenMap<String, MappingMetaData> mappings;
            try {
                mappings = transportClient.admin()
                    .cluster()
                    .prepareState()
                    .execute()
                    .actionGet()
                    .getState()
                    .getMetaData()
                    .getIndices()
                    .get(index)
                    .getMappings();
            } catch (NullPointerException e) {
                throw new IllegalArgumentException("Not found the mapping info of index: " + index);
            }
            mappingMetaData = mappings.get(type);

        } else {
            ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
            try {
                GetMappingsRequest request = new GetMappingsRequest();
                request.indices(index);
                GetMappingsResponse response;
                // try {
                // response = restHighLevelClient
                // .indices()
                // .getMapping(request, RequestOptions.DEFAULT);
                // // Calling this API directly fails on versions below 6.4
                // } catch (Exception e) {
                // logger.warn("Low ElasticSearch version for getMapping");
                response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
                // }

                mappings = response.mappings();
            } catch (NullPointerException e) {
                throw new IllegalArgumentException("Not found the mapping info of index: " + index);
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
                return null;
            }
            mappingMetaData = mappings.get(index).get(type);
        
        }
        return mappingMetaData;
    }

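Change the part highlighted in red in the original post (the commented-out try/catch and the final mappings.get(index).get(type) lookup) to:

    //liqiangtodo version-upgrade change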
    public MappingMetaData getMapping(String index, String type) {
        MappingMetaData mappingMetaData = null;
        if (mode == ESClientMode.TRANSPORT) {
            ImmutableOpenMap<String, MappingMetaData> mappings;
            try {
                mappings = transportClient.admin()
                        .cluster()
                        .prepareState()
                        .execute()
                        .actionGet()
                        .getState()
                        .getMetaData()
                        .getIndices()
                        .get(index)
                        .getMappings();
            } catch (NullPointerException e) {
                throw new IllegalArgumentException("Not found the mapping info of index: " + index);
            }
            mappingMetaData = mappings.get(type);

        } else {
            ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
            try {
                GetMappingsRequest request = new GetMappingsRequest();
                request.indices(index);
                GetMappingsResponse response;
                response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
                mappings = response.mappings();
            } catch (NullPointerException e) {
                throw new IllegalArgumentException("Not found the mapping info of index: " + index);
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
                return null;
            }
            // mappingMetaData = mappings.get(index).get(type);
            //liqiangtodo version-upgrade change
            mappingMetaData = mappings.get(index).get("properties");
        }
        return mappingMetaData;
    }

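3. bulk version-compatibility change

com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESBulkRequest#bulk

The part highlighted in red in the original post is my change. I set the refresh policy because I need writes to be visible immediately: after a user places an order and it is synced to ES, it must show up in the order list right away.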

        public BulkResponse bulk() {
            BulkResponse bulkResponse = null;
            if (mode == ESClientMode.TRANSPORT) {
                bulkResponse = bulkRequestBuilder.execute().actionGet();
            } else {
                try {
                    // Refresh policy: visible immediately. See https://blog.csdn.net/hanchao5272/article/details/89151166
                    bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                    //liqiangtodo version-upgrade change
                    bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                    // return restHighLevelClient.bulk(bulkRequest);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            return bulkResponse;
        }

4. getEsType

com.alibaba.otter.canal.client.adapter.es.support.ESTemplate#getEsType

    /**
     * Gets the type of a field from the es mapping
     *
     * @param mapping   mapping configuration
     * @param fieldName field name
     * @return type
     */
    @SuppressWarnings("unchecked")
    private String getEsType(ESMapping mapping, String fieldName) {
        String key = mapping.get_index() + "-" + mapping.get_type();
        Map<String, String> fieldType = esFieldTypes.get(key);
        if (fieldType != null) {
            return fieldType.get(fieldName);
        } else {
            MappingMetaData mappingMetaData = esConnection.getMapping(mapping.get_index(), mapping.get_type());

            if (mappingMetaData == null) {
                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
            }

            fieldType = new LinkedHashMap<>();

            //liqiangtodo version-upgrade change
            Map<String, Object> esMapping =mappingMetaData.getSourceAsMap();
//            Map<String, Object> sourceMap ==mappingMetaData.getSourceAsMap();
//            Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
            for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
                Map<String, Object> value = (Map<String, Object>) entry.getValue();
                if (value.containsKey("properties")) {
                    fieldType.put(entry.getKey(), "object");
                } else {
                    fieldType.put(entry.getKey(), (String) value.get("type"));
                }
            }
            esFieldTypes.put(key, fieldType);

            return fieldType.get(fieldName);
        }
    }
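
The change in step 4 treats the map returned by getSourceAsMap() as the field map itself, instead of a wrapper that holds a "properties" key. A minimal standalone sketch of that flattening loop, using plain java.util maps in place of the ES classes, might look like the following. The class name FieldTypeSketch and the method name fieldTypes are made up for illustration and are not part of canal.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of canal: mirrors the loop in getEsType.
final class FieldTypeSketch {

    /** Flattens the content of a mapping's "properties" into fieldName -> ES type. */
    @SuppressWarnings("unchecked")
    static Map<String, String> fieldTypes(Map<String, Object> properties) {
        Map<String, String> fieldType = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            Map<String, Object> value = (Map<String, Object>) entry.getValue();
            if (value.containsKey("properties")) {
                // A nested "properties" key marks an object field.
                fieldType.put(entry.getKey(), "object");
            } else {
                fieldType.put(entry.getKey(), (String) value.get("type"));
            }
        }
        return fieldType;
    }
}
```

If the map passed in still had a top-level "properties" wrapper, the loop would see a single entry named "properties" and every real field lookup would return null, which is why the wrapper has to be unwrapped exactly once somewhere along the path.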

5. count

com.alibaba.otter.canal.client.adapter.es.ESAdapter#count

 @Override
    public Map<String, Object> count(String task) {
        ESSyncConfig config = esSyncConfig.get(task);
        ESMapping mapping = config.getEsMapping();
        SearchResponse response = this.esConnection.new ESSearchRequest(mapping.get_index(), mapping.get_type()).size(0)
            .getResponse();

        //liqiangtodo version-upgrade change
        //long rowCount = response.getHits().getTotalHits();
        TotalHits totalHits = response.getHits().getTotalHits();
        Map<String, Object> res = new LinkedHashMap<>();
        res.put("esIndex", mapping.get_index());
        res.put("count", totalHits.value);
        return res;
    }

6. Change the SPI key of the es adapter

client-adapter/elasticsearch/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

#es=com.alibaba.otter.canal.client.adapter.es.ESAdapter
es7.4.0=com.alibaba.otter.canal.client.adapter.es.ESAdapter
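
The SPI file is a list of key=class lines, so renaming the key changes the name the adapter is found under. The sketch below only illustrates that lookup with java.util.Properties. It is not canal's own extension loader, and the class name SpiKeySketch is made up. The assumption is that whatever name the adapter configuration uses for this output has to match the new key es7.4.0.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not canal's loader: shows the effect of renaming the key.
final class SpiKeySketch {

    /** Returns the class name registered under adapterName, or null if the key is absent. */
    static String resolve(String spiFileContent, String adapterName) {
        Properties entries = new Properties();
        try {
            // Lines starting with '#' are treated as comments and skipped.
            entries.load(new StringReader(spiFileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return entries.getProperty(adapterName);
    }
}
```

With the file content shown above, resolving es7.4.0 yields the ESAdapter class name, while the old key es resolves to nothing because its line is commented out.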

 

Fix for binlog messages not being consumed

com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker#CanalAdapterWorker

   public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, SocketAddress address,
                              List<List<OuterAdapter>> canalOuterAdapters){
        super(canalOuterAdapters);
        this.canalClientConfig = canalClientConfig;
        this.canalDestination = canalDestination;
        //liqiangtodo fix for messages not being consumed
        connector = CanalConnectors.newClusterConnector(Arrays.asList(address), canalDestination, "", "");
        //connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
    }

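Configuration for subscribing to specific tables

This keeps the adapter from receiving changes to other tables in the same instance.

Add a property to com.alibaba.otter.canal.client.adapter.support.CanalClientConfig

   //liqiangtodo the filter canal subscribes with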
    private String subscribe;

com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker#process

...... 
 // Acquire the lock. It is initialized in com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch.init, which reads zk
                /**
                 * /canal-adapter/sync-switch/{canalDestination}: "on" means the lock can be acquired, "off" means block
                 */
                syncSwitch.get(canalDestination);

                logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
                connector.connect();
                logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
                //liqiangtodo fix for messages not being consumed: subscribe only to what we care about
                if(StringUtils.isEmpty(canalClientConfig.getSubscribe())){
                    connector.subscribe("*.*");
                }else {
                    connector.subscribe(canalClientConfig.getSubscribe());
                }
                //connector.subscribe();
                logger.info("=============> Subscribe destination: {},subscribe:{} succeed <=============",this.canalDestination,StringUtils.isEmpty(canalClientConfig.getSubscribe())?"*.*":canalClientConfig.getSubscribe());
......

Usage

canal.conf:
  subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item # added by my source change, to avoid subscribing to messages I don't care about
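
The subscribe value is a comma-separated list of schema.table entries, and the patched process method falls back to "*.*" when it is empty. A small sketch of both pieces, with a made-up class SubscribeFilterSketch and made-up method names, could look like this:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of canal.
final class SubscribeFilterSketch {

    /** Same fallback as the patched process(): an empty or missing value means "*.*". */
    static String resolve(String configured) {
        return (configured == null || configured.isEmpty()) ? "*.*" : configured;
    }

    /** Builds a comma-separated filter such as "merge_test.pro_brand,merge_test.soa_ord_order". */
    static String forTables(String schema, List<String> tables) {
        return tables.stream()
                .map(table -> schema + "." + table)
                .collect(Collectors.joining(","));
    }
}
```

Building the string from a table list is only a convenience for avoiding typos in long filters. The value that ends up in canal.conf is the same plain string as in the usage example above.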

Null pointer on wrapper types

For example, an Integer field whose value in the database is null or an empty string

com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil#typeConvert

 /**
     * Converts a value to the type declared for it in the Mapping
     */
    public static Object typeConvert(Object val, String esType) {
        if (val == null) {
            return null;
        }
        if (esType == null) {
            return val;
        }
        //liqiangtodo added code
        // Guards against the conversion exception 'For input string: ""' when the value is an empty string
        if(StringUtil.isNullOrEmpty(val.toString())){
            return null;
        }
 ......
}
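
The reason for the extra guard is that an empty string reaches the numeric conversion and fails there. The sketch below isolates that behaviour for the integer case only. EmptyValueGuardSketch and toInteger are made-up names, and the real typeConvert handles many more ES types than this.

```java
// Hypothetical helper, not part of canal: the empty-string guard in isolation.
final class EmptyValueGuardSketch {

    /** Returns null for null or empty input, otherwise the parsed integer. */
    static Integer toInteger(Object val) {
        if (val == null) {
            return null;
        }
        String text = val.toString();
        if (text.isEmpty()) {
            // Without this check, Integer.parseInt("") throws NumberFormatException.
            return null;
        }
        return Integer.parseInt(text);
    }
}
```

Returning null here means the field is written to ES as null instead of aborting the whole sync for that row.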

 

