Canal-入門例子-下載安裝(一)


開發手冊

https://github.com/alibaba/canal

mysql配置

1.開啟binlog

找到mysql配置文件的位置: mysql --help|grep my.cnf ,然後編輯my.cnf

[mysqld]
# log_bin
log-bin = mysql-bin # enable the binlog (log file base name: mysql-bin)
binlog-format = ROW # use the ROW binlog format
server_id = 1 # required for MySQL replication; must not be the same as canal's slaveId

 

2.查看是否開啟相關命令

-- Check whether the binlog is enabled (Value = ON means enabled).
show variables like 'log_bin';
-- Show information about the current binlog file.
show master status;
-- Close the current binlog file and create a new one (file number + 1).
flush logs;
-- Show binlog events; no file is named here, and limit 0,1 returns only the first event.
show binlog events limit 0,1;
-- Show the events of a specific binlog file.
show binlog events in 'mysql-bin.003243';
-- List all binlog files.
show binary logs;
-- Show the binlog format (expected: ROW).
show variables like 'binlog_format';

 

canal目錄介紹

/bin 為啟動腳本

/conf/canal_local.properties 為使用canal admin集群時的配置,通過./start.sh 使用

/conf/example 為canal監控的數據庫實例的配置。客戶端通過指定Destination可以消費具體某一個實例,比如order服務就消費order數據庫實例,product服務則消費product數據庫實例,而canal-server只需要一個。當然此時對應的目錄就不是example,而是需要創建order和product目錄,同時在canal.properties中指定canal.destinations = order,product

                    /h2.mv.db 為使用h2數據庫時使用 /meta.dat 存儲了當前讀取的binlog文件 和指針位置 

                    /instance.properties 則配置了所監控的數據庫實例及其binlog日志信息

 

 

 

 

 

安裝CanalServer

下載

https://github.com/alibaba/canal/releases/tag/canal-1.1.4

或者下載源碼後執行maven安裝

這種方式推薦,因為有時需要修改源碼完成定制化需求

mvn clean install -Dmaven.test.skip -Denv=release

 

 

1.解壓

2.修改配置文件

canal.deployer-1.1.4/conf/example/instance.properties 配置文件

 

3.啟動

4.查看是否啟動成功 

canal.deployer-1.1.4/logs/canal

Canal客戶端

1.創建一個demo項目

 

2.引入pom依賴

  <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
        <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.3</version>
    </dependency>

3.demo代碼

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Demo canal client: when the Spring bean is initialized it connects to a canal server,
 * subscribes to every table of every schema and prints each received row change to
 * standard output.
 *
 * <p>The polling loop runs inside {@link #afterPropertiesSet()}, so that method does not
 * return until the loop ends.
 *
 * @author liqiang
 * @date 2020/1/13 16:21
 */
@Component
public class CanalStart implements InitializingBean {

    /**
     * Polls the canal server for batches of binlog entries and prints them.
     *
     * <p>The loop ends when the number of consecutive empty polls reaches the limit, or
     * when the polling thread is interrupted while waiting. The connector is disconnected
     * in every case.
     *
     * @throws Exception if connecting to, or reading from, the canal server fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("開始消費....");
        // Address of the canal server; 11111 is canal's default port.
        // "example" is the destination to consume (could also be e.g. example2).
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.3.17.72", 11111),
                "example", "", "");
        int batchSize = 1000; // fetch at most 1000 entries per batch
        int maxEmptyPolls = 1200; // give up after this many consecutive empty polls
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // subscribe to all tables of all schemas
            // To subscribe to a single table, e.g. t_canal in schema canal:
            // connector.subscribe("canal.t_canal");
            connector.rollback();
            // A real production consumer would loop forever instead of counting empty polls.
            while (emptyCount < maxEmptyPolls) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing to consume right now: back off for a second.
                    emptyCount++;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop polling; the finally block
                        // below still disconnects the connector.
                        Thread.currentThread().interrupt();
                        System.out.println("interrupted while waiting for data, exit");
                        return;
                    }
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // confirm the batch
                // On a processing failure, roll the batch back instead:
                // connector.rollback(batchId);
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints the row changes carried by the given entries. Transaction begin/end entries
     * are skipped.
     *
     * @param entrys entries of one fetched batch
     * @throws RuntimeException if an entry's store value cannot be parsed as a RowChange
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            System.out.println("rowChare ======>"+rowChange.toString());

            EventType eventType = rowChange.getEventType(); // event type, e.g. insert, update, delete
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), // binlog file name (log-bin in my.cnf)
                    entry.getHeader().getLogfileOffset(), // offset inside the binlog file
                    entry.getHeader().getSchemaName(), // schema name
                    entry.getHeader().getTableName(), // table name
                    eventType)); // event name

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // A deleted row only has a "before" image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // An inserted row only has an "after" image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints one line per column: its name, its value and whether it was updated.
     *
     * @param columns columns of one row image
     */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

測試場景

 

 

簡單查詢

select * from `user`; 

並不會被消費到

執行刪除

整表刪除

message[batchId=7,size=3] 
rowChare ======>tableId: 33
eventType: DELETE
isDdl: false
rowDatas {
  beforeColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "2"
    mysqlType: "bigint(20)"
  }
  beforeColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: false
    isNull: false
    value: "liqiang"
    mysqlType: "varchar(30)"
  }
  beforeColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: false
    isNull: false
    value: "20"
    mysqlType: "int(11)"
  }
  beforeColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "test2@baomidou.com"
    mysqlType: "varchar(50)"
  }
}
rowDatas {
  beforeColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "3"
    mysqlType: "bigint(20)"
  }
  beforeColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: false
    isNull: false
    value: "Tom"
    mysqlType: "varchar(30)"
  }
  beforeColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: false
    isNull: false
    value: "28"
    mysqlType: "int(11)"
  }
  beforeColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "test3@baomidou.com"
    mysqlType: "varchar(50)"
  }
}
rowDatas {
  beforeColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "4"
    mysqlType: "bigint(20)"
  }
  beforeColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: false
    isNull: false
    value: "Sandy"
    mysqlType: "varchar(30)"
  }
  beforeColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: false
    isNull: false
    value: "21"
    mysqlType: "int(11)"
  }
  beforeColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "test4@baomidou.com"
    mysqlType: "varchar(50)"
  }
}
rowDatas {
  beforeColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "5"
    mysqlType: "bigint(20)"
  }
  beforeColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: false
    isNull: false
    value: "Billie"
    mysqlType: "varchar(30)"
  }
  beforeColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: false
    isNull: false
    value: "24"
    mysqlType: "int(11)"
  }
  beforeColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "test5@baomidou.com"
    mysqlType: "varchar(50)"
  }
}

================> binlog[mysql-bin.000001:3000] , name[haoke,user] , eventType : DELETE #表示是刪除事件  
id : 2    update=false
name : liqiang    update=false
age : 20    update=false
email : test2@baomidou.com    update=false
id : 3    update=false
name : Tom    update=false
age : 28    update=false
email : test3@baomidou.com    update=false
id : 4    update=false
name : Sandy    update=false
age : 21    update=false
email : test4@baomidou.com    update=false
id : 5    update=false
name : Billie    update=false
age : 24    update=false
email : test5@baomidou.com    update=false

可以發現整表刪除只產生一個DELETE事件,其中包含4條被刪除的行數據(id為2、3、4、5)

根據條件刪除

delete from`user` where id=2;

打印

message[batchId=10,size=3] 
rowChare ======>tableId: 116
eventType: DELETE
isDdl: false
rowDatas {
  beforeColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "2"
    mysqlType: "bigint(20)"
  }
  beforeColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: false
    isNull: false
    value: "liqiang"
    mysqlType: "varchar(30)"
  }
  beforeColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: false
    isNull: false
    value: "20"
    mysqlType: "int(11)"
  }
  beforeColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "test2@baomidou.com"
    mysqlType: "varchar(50)"
  }
}

================> binlog[mysql-bin.000001:3551] , name[haoke,user] , eventType : DELETE
id : 2    update=false
name : liqiang    update=false
age : 20    update=false
email : test2@baomidou.com    update=false

修改

update `user` u set u.`name`='小明' where id=2
message[batchId=13,size=3] 
rowChare ======>tableId: 117
eventType: UPDATE
isDdl: false
rowDatas {
  beforeColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "2"
    mysqlType: "bigint(20)"
  }
  beforeColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: false
    isNull: false
    value: "liqiang"
    mysqlType: "varchar(30)"
  }
  beforeColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: false
    isNull: false
    value: "20"
    mysqlType: "int(11)"
  }
  beforeColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "test2@baomidou.com"
    mysqlType: "varchar(50)"
  }
  afterColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "2"
    mysqlType: "bigint(20)"
  }
  afterColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: true
    isNull: false
    value: "\345\260\217\346\230\216"
    mysqlType: "varchar(30)"
  }
  afterColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: false
    isNull: false
    value: "20"
    mysqlType: "int(11)"
  }
  afterColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "test2@baomidou.com"
    mysqlType: "varchar(50)"
  }
}

================> binlog[mysql-bin.000001:3989] , name[haoke,user] , eventType : UPDATE
-------> before
id : 2    update=false
name : liqiang    update=false
age : 20    update=false
email : test2@baomidou.com    update=false
-------> after
id : 2    update=false
name : 小明    update=true
age : 20    update=false
email : test2@baomidou.com    update=false

可以發現打印了修改前修改后數據 以及哪個字段被修改

新增

INSERT INTO `haoke`.`user`(`id`, `name`, `age`, `email`) VALUES (6, '小明2', 20, 'test6@baomidou.com');
message[batchId=14,size=3] 
rowChare ======>tableId: 117
eventType: INSERT
isDdl: false
rowDatas {
  afterColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: true
    isNull: false
    value: "6"
    mysqlType: "bigint(20)"
  }
  afterColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: true
    isNull: false
    value: "\345\260\217\346\230\2162"
    mysqlType: "varchar(30)"
  }
  afterColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: true
    isNull: false
    value: "20"
    mysqlType: "int(11)"
  }
  afterColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: true
    isNull: false
    value: "test6@baomidou.com"
    mysqlType: "varchar(50)"
  }
}

================> binlog[mysql-bin.000001:4245] , name[haoke,user] , eventType : INSERT
id : 6    update=true
name : 小明2    update=true
age : 20    update=true
email : test6@baomidou.com    update=true

事件是新增

測試事務

未提交

mysql> START TRANSACTION;
Query OK, 0 rows affected (0.00 sec)
 
mysql> update `user` s set s.name='小張' where id=2;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0
mysql> 

並沒有消費

提交

mysql> START TRANSACTION;
Query OK, 0 rows affected (0.00 sec)
 
mysql> update `user` s set s.name='小張' where id=2;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0
mysql> commit;
Query OK, 0 rows affected (0.01 sec)

打印:

message[batchId=15,size=3] 
rowChare ======>tableId: 117
eventType: UPDATE
isDdl: false
rowDatas {
  beforeColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "2"
    mysqlType: "bigint(20)"
  }
  beforeColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: false
    isNull: false
    value: "\345\260\217\346\230\216"
    mysqlType: "varchar(30)"
  }
  beforeColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: false
    isNull: false
    value: "20"
    mysqlType: "int(11)"
  }
  beforeColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "test2@baomidou.com"
    mysqlType: "varchar(50)"
  }
  afterColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "2"
    mysqlType: "bigint(20)"
  }
  afterColumns {
    index: 1
    sqlType: 12
    name: "name"
    isKey: false
    updated: true
    isNull: false
    value: "\345\260\217\345\274\240"
    mysqlType: "varchar(30)"
  }
  afterColumns {
    index: 2
    sqlType: 4
    name: "age"
    isKey: false
    updated: false
    isNull: false
    value: "20"
    mysqlType: "int(11)"
  }
  afterColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "test2@baomidou.com"
    mysqlType: "varchar(50)"
  }
}

================> binlog[mysql-bin.000001:4461] , name[haoke,user] , eventType : UPDATE
-------> before
id : 2    update=false
name : 小明    update=false
age : 20    update=false
email : test2@baomidou.com    update=false
-------> after
id : 2    update=false
name : 小張    update=true
age : 20    update=false
email : test2@baomidou.com    update=false

測試回滾

mysql> START TRANSACTION;
Query OK, 0 rows affected (0.00 sec)
 
mysql> update `user` s set s.name='小張2' where id=2;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0
mysql> rollback;
Query OK, 0 rows affected (0.00 sec)

並沒有消費

Adapter

可以參考一下源碼實現client

 http://www.mamicode.com/info-detail-2851627.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM