canal數據格式,client開發


 1.canal數據格式:

Entry
    Header
        logfileName [binlog文件名]
        logfileOffset [binlog position]
        executeTime [變更發生的時間戳]
        schemaName
        tableName
        eventType [insert/update/delete類型]
    entryType   [事務頭BEGIN/事務尾END/數據ROWDATA]
    storeValue  [byte數據,可展開,對應的類型為RowChange]  
RowChange
    isDdl       [是否是ddl變更操作,比如create table/drop table]
    sql     [具體的ddl sql]
    rowDatas    [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理]
        beforeColumns [Column類型的數組]
        afterColumns [Column類型的數組]    
Column
    index     
    sqlType     [jdbc type]
    name        [column name]
    isKey       [是否為主鍵]
    updated     [是否發生過變更]
    isNull      [值是否為null]
    value       [具體的內容,注意為文本]

  2.java程序測試

package com.cfang.prebo;
 
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;
 
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
 
/**
 * Minimal canal client demo: connects to a canal server, pulls batches of
 * binlog entries and prints the row changes they contain.
 *
 * <p>Each batch is acknowledged after it has been printed. If printing fails
 * the batch is rolled back so it can be fetched again, and the connector is
 * always disconnected before the program exits.
 */
public class CanalTest {

    /** Number of entries requested from the server per fetch. */
    private static final int BATCH_SIZE = 100;

    /** Pause, in milliseconds, before polling again when a fetch returned no data. */
    private static final long EMPTY_POLL_SLEEP_MILLIS = 1000L;

    /**
     * Runs the fetch / print / ack loop against the canal server at
     * 127.0.0.1:11111, destination {@code cfang}, subscribing to every table
     * of every schema.
     *
     * @param args unused
     * @throws Exception if the connection fails, the thread is interrupted
     *                   while idle, or a batch cannot be processed
     */
    public static void main(String[] args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "cfang", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            // Roll back first so that batches fetched earlier but never acked are delivered again.
            connector.rollback();

            while (true) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    // Nothing to consume right now; wait before polling again.
                    Thread.sleep(EMPTY_POLL_SLEEP_MILLIS);
                    continue;
                }

                try {
                    printEntries(message.getEntries());
                } catch (Exception e) {
                    // Processing failed: roll the batch back so it can be fetched again later.
                    connector.rollback(batchId);
                    throw e;
                }
                // Confirm successful consumption so the server can discard the batch.
                connector.ack(batchId);
            }
        } finally {
            // Release the connection however the loop ends.
            connector.disconnect();
        }
    }

    /**
     * Prints a header line and the changed rows for every ROWDATA entry;
     * entries of any other type (e.g. transaction begin/end) are skipped.
     *
     * @param entries entries of one fetched batch
     * @throws Exception if an entry's store value cannot be parsed as a {@code RowChange}
     */
    private static void printEntries(List<Entry> entries) throws Exception {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }

            // The store value holds the serialized RowChange for this entry.
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChange.getEventType();

            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                switch (eventType) {
                    case INSERT:
                        System.out.println("INSERT ");
                        printColumns(rowData.getAfterColumnsList());
                        break;
                    case UPDATE:
                        System.out.println("UPDATE ");
                        printColumns(rowData.getAfterColumnsList());
                        break;
                    case DELETE:
                        // A deleted row is printed from its before image.
                        System.out.println("DELETE ");
                        printColumns(rowData.getBeforeColumnsList());
                        break;
                    default:
                        // Other event types are not printed row by row.
                        break;
                }
            }
        }
    }

    /**
     * Prints one line per column: its name, its value and its updated flag.
     *
     * @param columns columns of a single row image
     */
    private static void printColumns(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

  

<!-- Maven dependency for the canal client; presumably the source of the
     com.alibaba.otter.canal.* classes imported by CanalTest (confirm against the artifact). -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>
</dependency>

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM