1、deployer基本不需要改動,只需要修改你需要同步的數據庫的庫表,例如:
canal.instance.filter.regex=test.user
2、adapter中的application.yml配置
server: port: 8083 #可以自己修改監聽端口 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp canalServerHost: uat-datacenter2:11115 ##為deployer中的port batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: url: jdbc:mysql://localhost:3306/test?useUnicode=true #修改成自己的mysql地址 username: root #修改自己的賬戶 password: passwrd #修改自己的密碼 canalAdapters: - instance: instance_test_user #需要和deployer中監聽的文件夾名一致 groups: - groupId: g1 outerAdapters: - name: hbase properties: hbase.zookeeper.quorum: uat-datacenter2,uat-datacenter1,uat-datacenter3 #zk地址,如果只有一個就填寫一個就行,多個,按照我這種填寫。 hbase.zookeeper.property.clientPort: 2181 #端口 zookeeper.znode.parent: /hbase # zk中的路徑 ,可以通過zk的客戶端輸入:ls / 查看是否有這個
3、hbase文件夾中的配置文件instance_test_user.yml
dataSourceKey: defaultDS destination: instance_test_user groupId: ##不要填寫,如果有g1,請刪除,否則啟動沒有報錯,沒有數據 hbaseMapping: mode: STRING database: test table: user hbaseTable: TEST.USER # HBase表名 family: CF # 默認統一Family名稱 uppercaseQualifier: true # 字段名轉大寫, 默認為true #etlCondition: "where c_time>={}" commitBatch: 2 # 批量提交的大小 rowKey: id,name # 復合字段rowKey不能和columns中的rowKey重復 columns: # 數據庫字段:HBase對應字段 id: ROWKEY LEN:15 name: CF:NAME age: AGE create_time: CREATE_TIME
4、在大數據機器輸入Hbase shell
運行 命令:
list #查看有哪些表
然后執行命令:
create 'MYTEST.PERSON', {NAME=>'CF'}
5、啟動deployer和adapter
6、在對應監聽的mysql表中輸入數據,如:
7、查看hbase數據,,如有表示成功了。
scan 'TEST.USER'
結果:
10|100 column=CF:AGE, timestamp=1609903480750, value=101
10|100 column=CF:CREATE_TIME, timestamp=1609903480750, value=2021-01-06 11:23:43
10|100 column=CF:ID, timestamp=1609903480750, value=10
10|100 column=CF:NAME, timestamp=1609903480750, value=100
=========================FAQ====================
1、按照官網配置,沒有報錯沒有數據
解決方法:
那就是不夠仔細,我是因為保留了groupId的值,即 groupId: g1
,實際是不需要保留,直接groupId: 就可以
2、將mysql的數據導入hbase,時間多了一個.0
解決方法:
1、下載源碼,用idea打開
2、找到HbaseSyncService.java中的convertData2Row方法,修改成標紅,即可
private static void convertData2Row(MappingConfig.HbaseMapping hbaseMapping, HRow hRow, Map<String, Object> data) { Map<String, MappingConfig.ColumnItem> columnItems = hbaseMapping.getColumnItems(); int i = 0; for (Map.Entry<String, Object> entry : data.entrySet()) { if (hbaseMapping.getExcludeColumns() != null && hbaseMapping.getExcludeColumns().contains(entry.getKey())) { continue; } if (entry.getValue() != null) { MappingConfig.ColumnItem columnItem = columnItems.get(entry.getKey()); byte[] bytes = typeConvert(columnItem, hbaseMapping, entry.getValue().toString().contains("-")?entry.getValue().toString().replace(".0",""):entry.getValue()); if (columnItem == null) { String familyName = hbaseMapping.getFamily(); String qualifier = entry.getKey(); if (hbaseMapping.isUppercaseQualifier()) { qualifier = qualifier.toUpperCase(); } if (hbaseMapping.getRowKey() == null && i == 0) { hRow.setRowKey(bytes); } else { hRow.addCell(familyName, qualifier, bytes); } } else { if (columnItem.isRowKey()) { if (columnItem.getRowKeyLen() != null && entry.getValue() != null) { if (entry.getValue() instanceof Number) { String v = String.format("%0" + columnItem.getRowKeyLen() + "d", ((Number) entry.getValue()).longValue()); bytes = Bytes.toBytes(v); } else { try { String v = String.format("%0" + columnItem.getRowKeyLen() + "d", Integer.parseInt((String) entry.getValue())); bytes = Bytes.toBytes(v); } catch (Exception e) { logger.error(e.getMessage(), e); } } } hRow.setRowKey(bytes); } else { hRow.addCell(columnItem.getFamily(), columnItem.getQualifier(), bytes); } } } i++; } }
3、打包,然后將 adapter中plugin中的jar備份,然后將打好得包client-adapter.hbase-1.1.4-jar-with-dependencies.jar 放入 adapter中的plugin即可。
3、mysql同步Hbase,獲取數據錯位
https://hub.fastgit.org/alibaba/canal/issues/2772https://hub.fastgit.org/alibaba/canal/issues/2772