Canal監控Mysql同步到Redis(菜鳥也能搭建)


首先要Canal服務端下載:鏈接: https://pan.baidu.com/s/1FwEnqPC1mwNXKRwJuMiLdg 密碼: r8xf

連接數據庫的時候需要給予連接數據庫權限:在my.ini配置文件里加上 log-bin=mysql-bin 這個就行了

連接數據庫的賬號需要授權

CREATE USER cqlpz IDENTIFIED BY 'cqlpz';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cqlpz'@'%';

FLUSH PRIVILEGES; 創建一個賬號並給予權限

解壓出來有4個目錄

2.conf 里面canal.properties 配置canal自己的 比如多個服務連接,canal緩存名稱

這是example里面的內容;

兩個properties就是配置自己的數據庫連接,meta.dat相當於啟動canal服務過后的額緩存信息

 

3.lib 就是canal需要的jar

4.logs 日志文件

maven引入jar

 1 <dependency>
 2      <groupId>com.alibaba.otter</groupId>
 3      <artifactId>canal.client</artifactId>
 4      <version>1.0.25</version>
 5 </dependency>
 6 <dependency>
 7      <groupId>redis.clients</groupId>
 8      <artifactId>jedis</artifactId>
 9      <version>2.9.0</version>
10 </dependency>

java代碼

1.連接canal服務

 // 創建鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(AddressUtils.getHostIp(), 端口), "canal名稱(虛擬數據名)", "數據庫賬號", "數據庫密碼");

2.連接某個數據庫

connector.connect();
connector.subscribe("監控的數據庫\\..*"); connector.rollback();

3.獲取操作的事件

Message message = connector.getWithoutAck(數量); // 獲取指定數量的數據
 List<Entry> entrys=message.getEntries();
 for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { System.out.println("獲取數據失敗:"+e.getMessage()); } EventType eventType = rowChage.getEventType(); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType.equals(EventType.DELETE)) { rowData.getBeforeColumnsList();//刪除的所有數據 } else if (eventType.equals(EventType.INSERT)) { rowData.getAfterColumnsList();//添加的所有數據 }else if(eventType.equals(EventType.UPDATE)) { rowData.getAfterColumnsList();//修改的所有數據 } }

完整的java類

public class ClientSample {
    private static Logger logger = LoggerFactory.getLogger(ClientSample.class);

    public  void startCanalThread(){
        Thread thread = new StartCanalThread();
        thread.start();
    }
    /**
     * canal 線程
     */
    public class StartCanalThread extends Thread {
        @Override
        public void run() {
            // 創建鏈接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),
                "longpizi",
                "longpizi",
                "cqlongpizi");
            connector.connect();
            connector.subscribe("test\\..*");
            connector.rollback();
            try {
                while (true) {
                    // 獲取指定數量的數據
                    Message message = connector.getWithoutAck(1000);
                    long batchId = message.getId();
                    if (batchId != -1 &&  message.getEntries().size() > 0) {
                        printEntry(message.getEntries());
                    }
                    connector.ack(batchId); // 提交確認
                    Thread.sleep(2000);
                }

            }catch (Exception e){
                logger.error("Canal線程異常,已終止:"+e.getMessage());
            } finally {
                //中斷Canal連接
                connector.disconnect();
            }
        }
    }

    /**
     * 數據庫執行的操作
     * @param entrys
     */
    private static void printEntry( List<Entry> entrys) {
        for (Entry entry : entrys) {
            //操作事物 忽略
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; }
            RowChange rowChage = null;//執行事件信息
            String database=null;//執行的數據庫
            String table=null;//執行的表
            try {
                database=entry.getHeader().getSchemaName();
                table=entry.getHeader().getTableName();
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
               logger.error("獲取數據失敗:"+e.getMessage());
            }
            //獲取執行的事件
            EventType eventType = rowChage.getEventType();
            for (RowData rowData : rowChage.getRowDatasList()) {
                //刪除操作
                if (eventType.equals(EventType.DELETE)) {
                    redisDelete(rowData.getBeforeColumnsList(),database,table);
                }
                //添加操作
                else if (eventType.equals(EventType.INSERT)) {
                    redisInsert(rowData.getAfterColumnsList(),database,table);
                }
                //修改操作
                else if(eventType.equals(EventType.UPDATE)) {
                    redisUpdate(rowData.getAfterColumnsList(),database,table);
                }
                //修改表結構
                else if(eventType.equals(EventType.ALTER)){
                    logger.info("修改表結構");
                }
            }
        }
    }

    /**
     * 數據庫執行了添加操作
     * @param columns
     * @param database
     * @param table
     */
    private static void redisInsert( List<Column> columns,String database,String table){
        JSONObject json=new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        System.out.println("數據庫:"+database+"==>表:"+table+"==>添加數據:"+JSON.toJSONString(json));
    }

    /**
     * 數據庫執行了修改操作
     * @param columns
     * @param database
     * @param table
     */
    private static  void redisUpdate( List<Column> columns,String database,String table){
        JSONObject json=new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        System.out.println("數據庫:"+database+"==>表:"+table+"==>修改數據:"+JSON.toJSONString(json));
    }

    /**
     * 數據庫執行了刪除操作
     * @param columns
     * @param database
     * @param table
     */
    private static  void redisDelete( List<Column> columns,String database,String table){
        JSONObject json=new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        System.out.println("數據庫:"+database+"==>表:"+table+"==>刪除數據:"+JSON.toJSONString(json));
    }
}

 

 

這樣就可把操作的數據更新到redis里面了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM