數據抽取,簡單來說就是將一個表的數據提取到另一個表中。有很多ETL工具可以幫助我們進行數據的抽取和轉換,這些工具能以一次性或者定時作業的方式抽取數據。不過canal作為阿里巴巴提供的開源數據抽取項目,能夠做到實時抽取,其原理是偽裝成mysql從節點,讀取mysql的binlog並生成消息,客戶端訂閱這些數據變更消息後進行處理並存儲。下面我們來一起搭建一下canal服務。
配置mysql
mysql默認沒有開啟binlog,需要修改mysql的my.cnf文件,添加如下配置。注意binlog-format必須為ROW,因為binlog格式如果為STATEMENT或者MIXED,則binlog中記錄的是sql語句,不是具體的數據行,canal就無法解析到具體的數據變更了
log-bin=E:/mysql5.5/bin_log/mysql-bin.log
binlog-format=ROW
server-id=123
canal配置
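下載並安裝canal,在conf目錄可以看到canal.properties文件,在conf/example目錄可以看到instance.properties文件。下面以canal.instance開頭的配置項寫在instance.properties中,canal.port和canal.destinations寫在canal.properties中,修改為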
#設置要監聽的mysql服務器的地址和端口
canal.instance.master.address = 127.0.0.1:3306
#設置一個可訪問mysql的用戶名和密碼並具有相應的權限,本示例用戶名、密碼都為canal
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#連接的數據庫
canal.instance.defaultDatabaseName =test
#訂閱實例中所有的數據庫和表
canal.instance.filter.regex = .*\\..*
#連接canal的端口
canal.port= 11111
#canal實例(destination)的名稱,監聽到的數據變更通過該實例提供,客戶端連接時需要指定相同的名稱
canal.destinations= example
啟動 bin/startup.sh 或bin/startup.bat
客戶端代碼
添加canal依賴
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.21</version> </dependency>
package com.cw.demo.canal; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by chenwei01 on 2017/4/9. */ public class CanalClient { public static void main(String[] args) { while (true) { //連接canal CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal"); connector.connect(); //訂閱 監控的 數據庫.表 connector.subscribe("demo_db.user_tab"); //一次取5條 Message msg = connector.getWithoutAck(5); long batchId = msg.getId(); int size = msg.getEntries().size(); if (batchId < 0 || size == 0) { System.out.println("沒有消息,休眠5秒"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { // CanalEntry.RowChange row = null; for (CanalEntry.Entry entry : msg.getEntries()) { try { row = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = row.getRowDatasList(); for (CanalEntry.RowData rowdata : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList(); Map<String, Object> dataMap = transforListToMap(afterColumnsList); if (row.getEventType() == CanalEntry.EventType.INSERT) { //具體業務操作 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.UPDATE) { //具體業務操作 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.DELETE) { List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { if ("id".equals(column.getName())) { //具體業務操作 System.out.println("刪除的id:" + column.getValue()); } } } else { 
System.out.println("其他操作類型不做處理"); } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } //確認消息 connector.ack(batchId); } } } public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) { Map map = new HashMap(); if (afterColumnsList != null && afterColumnsList.size() > 0) { for (CanalEntry.Column column : afterColumnsList) { map.put(column.getName(), column.getValue()); } } return map; } }