工作原理
canal-php 是 Canal 的 php 客戶端,它與 Canal 是采用的Socket來進行通信的,傳輸協議是TCP,交互協議采用的是 Google Protocol Buffer 3.0。
工作流程
1、Canal連接到mysql數據庫,模擬slave
2、canal-php 與 Canal 建立連接
3、數據庫發生變更寫入到binlog
4、Canal向數據庫發送dump請求,獲取binlog並解析
5、canal-php 向 Canal 請求數據庫變更
6、Canal 發送解析后的數據給canal-php
7、canal-php收到數據,消費成功,發送回執。(可選)
8、Canal記錄消費位置。
構建canal php客戶端
$ composer require xingwenge/canal_php or $ git clone https://github.com/xingwenge/canal-php.git $ cd canal-php $ composer update
建立與Canal的連接
try { #客戶端連接方式 $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE); # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE); $client->connect("127.0.0.1", 11111); # 對應 canal.properties的配置 $client->checkValid(); $client->subscribe("1001", "example", ".*\\..*"); #此處1001不需要修改,example 是在canal配置文件里配置的名稱 # $client->subscribe("1001", "example", "db_name.tb_name"); # 設置過濾,多個數據庫用逗號隔開
# $client->subscribe("1001", "example", "db_name.tb_name_[0-9]"); # 可以批量設置分表表名 while (true) { $message = $client->get(100); if ($entries = $message->getEntries()) { foreach ($entries as $entry) { #此處可以處理邏輯 Fmt::println($entry); # 返回的是一條SQL語句受影響的所有數據行 } } sleep(1); } $client->disConnect(); } catch (\Exception $e) { echo $e->getMessage(), PHP_EOL; }
運行
php canal-php/src/sample/client.php