elasticsearch7.6.2 -canal1.1.4集成


canal1.1.4默認是不支持 elasticsearch7.x 的. 

這里要非常感謝阿里將這個項目開源出來, 讓我們有機會站在人家的肩膀上, 來進行一些改動.

我們可以將 canal1.1.4的 client-adapter 源碼下載下來, 然后進行調試.

1. elasticsearch推薦我們使用 rest 方式, 來進行es的操作, 所以, 在 canal 配置的時候, 我們使用 rest 方式. 

2. pom.xml文件修改, 將引用修改為 7.6.2

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.6.2</version>
</dependency>

3. 在調試的過程中, 可以發現, 在連接 elasticsearch7.6.2 的時候, com.alibaba.otter.canal.client.adapter.es.support.ESConnection#getMapping會拋異常.

所以需要先對這個方法進行一些修改:

原代碼:

public MappingMetaData getMapping(String index, String type) {
    MappingMetaData mappingMetaData = null;
    if (mode == ESClientMode.TRANSPORT) {
        ......
    } else {
        ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
        try {
            GetMappingsRequest request = new GetMappingsRequest();
            request.indices(index);
            GetMappingsResponse response;
            // try {
            // response = restHighLevelClient
            // .indices()
            // .getMapping(request, RequestOptions.DEFAULT);
            // // 6.4以下版本直接使用該接口會報錯
            // } catch (Exception e) {
            // logger.warn("Low ElasticSearch version for getMapping");
            response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT); // }

            mappings = response.mappings();
        } catch (NullPointerException e) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        mappingMetaData = mappings.get(index).get(type);
    }
    return mappingMetaData;
}

主要就是這個標紅的部分, 會報錯.

修改為:

public MappingMetaData getMapping(String index, String type) {
    MappingMetaData mappingMetaData = null;
    if (mode == ESClientMode.TRANSPORT) {
        ......
    } else {
        ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
        //Map<String, MappingMetaData> mappings;
        try {
            GetMappingsRequest request = new GetMappingsRequest();
            request.indices(index);
            // try {
            // response = restHighLevelClient
            // .indices()
            // .getMapping(request, RequestOptions.DEFAULT);
            // // 6.4以下版本直接使用該接口會報錯
            // } catch (Exception e) {
            // logger.warn("Low ElasticSearch version for getMapping");
            //GetMappingsResponse mapping = restHighLevelClient.indices().getMapping(request, RequestOptions.DEFAULT);
            //response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
            // }
            GetMappingsResponse response = restHighLevelClient.indices() .getMapping(request, RequestOptions.DEFAULT);
            mappings = response.mappings();
        } catch (NullPointerException e) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        mappingMetaData = mappings.get(index).get(type);
    }
    return mappingMetaData;
}

只要改這一句話, elasticsearch7.6.2的curd就沒啥問題了. 這個是我親測的. 至於父子文檔這種, 復雜一些的情況, 我還沒有測試過. 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM