Canal同步Mysql數據至Hbase


1、deployer基本不需要改動,只需要修改你需要同步的數據庫的庫表,例如:

canal.instance.filter.regex=test.user

2、adapter中的application.yml配置

server:
  port: 8083  #可以自己修改監聽端口
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null


canal.conf:
  mode: tcp
  canalServerHost: uat-datacenter2:11115  ##為deployer中的port
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://localhost:3306/test?useUnicode=true  #修改成自己的mysql地址
      username: root  #修改自己的賬戶
      password: passwrd  #修改自己的密碼
  canalAdapters:
  - instance: instance_test_user  #需要和deployer中監聽的文件夾名一致
    groups:
    - groupId: g1
      outerAdapters:
      - name: hbase                                                                            
        properties:
          hbase.zookeeper.quorum: uat-datacenter2,uat-datacenter1,uat-datacenter3  #zk地址,如果只有一個就填寫一個就行,多個,按照我這種填寫。
          hbase.zookeeper.property.clientPort: 2181  #端口
          zookeeper.znode.parent: /hbase  # zk中的路徑 ,可以通過zk的客戶端輸入:ls / 查看是否有這個

3、hbase文件夾中的配置文件instance_test_user.yml

dataSourceKey: defaultDS
destination: instance_test_user
groupId:    ##不要填寫,如果有g1,請刪除,否則啟動沒有報錯,沒有數據
hbaseMapping:
  mode: STRING
  database: test
  table: user
  hbaseTable: TEST.USER   # HBase表名
  family: CF  # 默認統一Family名稱
  uppercaseQualifier: true  # 字段名轉大寫, 默認為true
  #etlCondition: "where c_time>={}"
  commitBatch: 2 # 批量提交的大小
  rowKey: id,name  # 復合字段rowKey不能和columns中的rowKey重復
  columns:
    # 數據庫字段:HBase對應字段
    id: ROWKEY LEN:15
    name: CF:NAME
    age: AGE
    create_time: CREATE_TIME

4、在大數據機器輸入Hbase shell 

運行 命令:
        list #查看有哪些表
        
然后執行命令:
        create 'MYTEST.PERSON', {NAME=>'CF'}

5、啟動deployer和adapter

6、在對應監聽的mysql表中輸入數據,如:

 

 7、查看hbase數據,,如有表示成功了。

scan 'TEST.USER'

結果:
 10|100                                          column=CF:AGE, timestamp=1609903480750, value=101                                                                                           
 10|100                                          column=CF:CREATE_TIME, timestamp=1609903480750, value=2021-01-06 11:23:43                                                                   
 10|100                                          column=CF:ID, timestamp=1609903480750, value=10                                                                                             
 10|100                                          column=CF:NAME, timestamp=1609903480750, value=100    

 

=========================FAQ====================

1、按照官網配置,沒有報錯沒有數據

解決方法:
    那就是不夠仔細,我是因為保留了groupId的值,即 groupId:  g1
    
    ,實際是不需要保留,直接groupId: 就可以

2、將mysql的數據導入hbase,時間多了一個.0

解決方法:

  1、下載源碼,用idea打開

  2、找到HbaseSyncService.java中的convertData2Row方法,修改成標紅,即可

    private static void convertData2Row(MappingConfig.HbaseMapping hbaseMapping, HRow hRow, Map<String, Object> data) {
        Map<String, MappingConfig.ColumnItem> columnItems = hbaseMapping.getColumnItems();
        int i = 0;
        for (Map.Entry<String, Object> entry : data.entrySet()) {
            if (hbaseMapping.getExcludeColumns() != null && hbaseMapping.getExcludeColumns().contains(entry.getKey())) {
                continue;
            }
            if (entry.getValue() != null) {
                MappingConfig.ColumnItem columnItem = columnItems.get(entry.getKey());

                byte[] bytes = typeConvert(columnItem, hbaseMapping, entry.getValue().toString().contains("-")?entry.getValue().toString().replace(".0",""):entry.getValue()); if (columnItem == null) {
                    String familyName = hbaseMapping.getFamily();
                    String qualifier = entry.getKey();
                    if (hbaseMapping.isUppercaseQualifier()) {
                        qualifier = qualifier.toUpperCase();
                    }

                    if (hbaseMapping.getRowKey() == null && i == 0) {
                        hRow.setRowKey(bytes);
                    } else {
                        hRow.addCell(familyName, qualifier, bytes);
                    }
                } else {
                    if (columnItem.isRowKey()) {
                        if (columnItem.getRowKeyLen() != null && entry.getValue() != null) {
                            if (entry.getValue() instanceof Number) {
                                String v = String.format("%0" + columnItem.getRowKeyLen() + "d",
                                    ((Number) entry.getValue()).longValue());
                                bytes = Bytes.toBytes(v);
                            } else {
                                try {
                                    String v = String.format("%0" + columnItem.getRowKeyLen() + "d",
                                        Integer.parseInt((String) entry.getValue()));
                                    bytes = Bytes.toBytes(v);
                                } catch (Exception e) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                        hRow.setRowKey(bytes);
                    } else {
                        hRow.addCell(columnItem.getFamily(), columnItem.getQualifier(), bytes);
                    }
                }
            }
            i++;
        }
    }

   3、打包,然后將 adapter中plugin中的jar備份,然后將打好得包client-adapter.hbase-1.1.4-jar-with-dependencies.jar 放入 adapter中的plugin即可。

3、mysql同步Hbase,獲取數據錯位

https://hub.fastgit.org/alibaba/canal/issues/2772https://hub.fastgit.org/alibaba/canal/issues/2772

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM