系統環境:
- canal-1.1.4
- es 5.5.0
- transport方式連接es
各項配置可以直接參考canal官方文檔,由於1.1.4支持的es版本為6.x以上,其他版本需要替換依賴重新編譯client-adapter.elasticsearch模塊,以下為es5.5.0低版本兼容方案以及個人踩的坑。
依賴修改:
修改client-adapter模塊的pom.xml,將es的依賴修改為es版本適配的5.5.0。
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.4.3</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.4.3</version>
</dependency>
由於5.5.0版本無rest-client,因此只修改transport相關版本,后續僅測試tcp連接es同步,rest不確定兼容性。
代碼兼容:
ESConnection.java:
transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
Integer.parseInt(host.substring(i + 1))));
修改為
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.substring(0, i)),
Integer.parseInt(host.substring(i + 1))));
開始編譯
mvn clean install -Dmaven.test.skip -Denv=release
rest兼容問題
問題1
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure: Compilation failure:
[ERROR] canal/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java:[502,75] 未報告的異常錯誤java.io.IOException; 必須對其進行捕獲或聲明以便拋出
[ERROR] canal/client-adapter/elasticsearch/src/main/java/org/elasticsearch/client/RestHighLevelClientExt.java:[24,13] 方法引用無效
5.x版本的transportclient不兼容rest-client,注釋掉rest導致的異常。
RestHighLevelClientExt::getMapping
@Deprecated
public static GetMappingsResponse getMapping(RestHighLevelClient restHighLevelClient,
GetMappingsRequest getMappingsRequest,
RequestOptions options) throws IOException,IllegalAccessException {
throw new IllegalAccessException("es 5.x unsupport this method, use tcp mode");
}
ESConnection::getMapping
...
if (mode == ESClientMode.TRANSPORT) {
...
} else {
try {
GetMappingsRequest request = new GetMappingsRequest();
request.indices(index);
GetMappingsResponse response;
// try {
// response = restHighLevelClient
// .indices()
// .getMapping(request, RequestOptions.DEFAULT);
// // 6.4以下版本直接使用該接口會報錯
// } catch (Exception e) {
// logger.warn("Low ElasticSearch version for getMapping");
response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
// }
mappings = response.mappings();
} catch (NullPointerException e) {
throw new IllegalArgumentException("Not found the mapping info of index: " + index);
} catch (IOException | IllegalAccessException e) {//此處增加一個異常捕獲
logger.error(e.getMessage(), e);
return null;
}
...
}
問題2
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure
[ERROR] canal/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java:[502,75] 未報告的異常錯誤java.io.IOException; 必須對其進行捕獲或聲明以便拋出
原因如下,getSourceAsMap方法在6.4.3拋出runtimeException(ElasticsearchParseException是子類),而5.5.0版本拋出IOException,需要顯示捕獲。
//6.4.3拋出的異常時runtimeException
public Map<String, Object> getSourceAsMap() throws ElasticsearchParseException {
return this.sourceAsMap();
}
//5.5.0版本
public Map<String, Object> getSourceAsMap() throws IOException {
return sourceAsMap();
}
修改ESTemplate的getEsType方法捕獲異常即可
ESTemplate::getEsType
Map<String, Object> sourceMap = null;
try{
sourceMap = mappingMetaData.getSourceAsMap();
}catch (IOException e){
logger.error(e.getMessage(), e);
return null;
}
編譯后,替換canal.adapter-1.1.4\plugin下的 client-adapter.elasticsearch-1.1.4-jar-with-dependencies.jar 文件。
執行deploy和adapter啟動腳本即可。
配置問題
啟動后報錯:
2020-07-07 14:36:08.223 [main] INFO org.elasticsearch.plugins.PluginsService - loaded plugin [org.elasticsearch.transport.Netty4Plugin]
2020-07-07 14:36:08.473 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es failed
java.lang.RuntimeException: java.lang.IllegalArgumentException: unknown setting [mode] please check that any required plugins are installed, or check the breaking changes documentation for removed settings
at com.alibaba.otter.canal.client.adapter.es.ESAdapter.init(ESAdapter.java:137)
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:172)
查看canal源碼,未發現拋出異常日志的代碼,再搜索依賴的包,發現異常是es創建transportClient時拋出的異常,於是猜測是canal-adpapter配置中的某個mode參數被引入創建transportClient的setting中導致創建失敗,於是注釋掉,並重啟。
- name: es
hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
properties:
#mode: transport # transport # or rest //注釋了這行,是1.1.4的坑,代碼中properties下的所有配置都會被傳入transportClient的setting中,rest模式則不會,所以transport模式除了cluster.name外的配置會導致es連接創建失敗
# security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch
測試:
重啟后,向mysql插入數據后,adapter打印出日志
[pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":21,"name":"測試用戶","time":null}],"database":"canal","destination":"example_instance","es":1594347777000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"class","ts":1594347777991,"type":"INSERT"}
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Destination: example_instance, database:canal, table:class, type:INSERT, affected index count: 1
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Prepared to sync index: canal_test, destination: example_instance
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Single table insert to es index, destination:example_instance, table: class, index: canal_test, id: 21
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync elapsed time: 1 ms,destination: example_instance, es index: canal_test
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync completed: canal_test, destination: example_instance
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync elapsed time: 2 ms, affected indexes count:1, destination: example_instance
[pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":21,"name":"測試用戶","time":null}],"database":"canal","destination":"example_instance","es":1594347777000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"class","ts":1594347777993,"type":"INSERT"}
Affected indexes: canal_test
查看es數據
curl 127.0.0.1:9200/canal_test/canal/21
{
"_index": "canal_test",
"_type": "canal",
"_id": "21",
"_version": 1,
"found": true,
"_source": {
"name": "測試用戶"
}
}
小結:
- canal-adapter不支持索引名,若有頻繁全量構建需求則不適用該方案
- 更新時查詢不支持非數字類型主鍵(拼接SQL字符串導致)
- 表的更新都會同步至es,一對多關聯時,記錄變更可能會觸發索引批量更新,索引若存儲快照數據則建議監聽變更開發帶業務邏輯的adapter