canal1.1.4默認是不支持 elasticsearch7.x 的.
這里要非常感謝阿里將這個項目開源出來, 讓我們有機會站在人家的肩膀上, 來進行一些改動.
我們可以將 canal1.1.4的 client-adapter 源碼下載下來, 然后進行調試.
1. elasticsearch推薦我們使用 rest 方式, 來進行es的操作, 所以, 在 canal 配置的時候, 我們使用 rest 方式.
2. pom.xml文件修改, 將引用修改為 7.6.2
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.6.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>7.6.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>7.6.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.6.2</version> </dependency>
3. 在調試的過程中, 可以發現, 在連接 elasticsearch7.6.2 的時候, com.alibaba.otter.canal.client.adapter.es.support.ESConnection#getMapping會拋異常.
所以需要先對這個方法進行一些修改:
原代碼:
public MappingMetaData getMapping(String index, String type) { MappingMetaData mappingMetaData = null; if (mode == ESClientMode.TRANSPORT) { ...... } else { ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings; try { GetMappingsRequest request = new GetMappingsRequest(); request.indices(index); GetMappingsResponse response; // try { // response = restHighLevelClient // .indices() // .getMapping(request, RequestOptions.DEFAULT); // // 6.4以下版本直接使用該接口會報錯 // } catch (Exception e) { // logger.warn("Low ElasticSearch version for getMapping"); response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT); // } mappings = response.mappings(); } catch (NullPointerException e) { throw new IllegalArgumentException("Not found the mapping info of index: " + index); } catch (IOException e) { logger.error(e.getMessage(), e); return null; } mappingMetaData = mappings.get(index).get(type); } return mappingMetaData; }
主要就是這個標紅的部分, 會報錯.
修改為:
public MappingMetaData getMapping(String index, String type) { MappingMetaData mappingMetaData = null; if (mode == ESClientMode.TRANSPORT) { ...... } else { ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings; //Map<String, MappingMetaData> mappings; try { GetMappingsRequest request = new GetMappingsRequest(); request.indices(index); // try { // response = restHighLevelClient // .indices() // .getMapping(request, RequestOptions.DEFAULT); // // 6.4以下版本直接使用該接口會報錯 // } catch (Exception e) { // logger.warn("Low ElasticSearch version for getMapping"); //GetMappingsResponse mapping = restHighLevelClient.indices().getMapping(request, RequestOptions.DEFAULT); //response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT); // } GetMappingsResponse response = restHighLevelClient.indices() .getMapping(request, RequestOptions.DEFAULT); mappings = response.mappings(); } catch (NullPointerException e) { throw new IllegalArgumentException("Not found the mapping info of index: " + index); } catch (IOException e) { logger.error(e.getMessage(), e); return null; } mappingMetaData = mappings.get(index).get(type); } return mappingMetaData; }
只要改這一句話, elasticsearch7.6.2的curd就沒啥問題了. 這個是我親測的. 至於父子文檔這種, 復雜一些的情況, 我還沒有測試過.