canal入門到實戰及面試


第一章、canal入門

一、什么是canal

canal是純Java開發。基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL

img

 

 

如上圖:canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議

二、canal 搭建

1、搭建mysql環境
  • 對於自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
  • 授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant

CREATE USER canal IDENTIFIED BY 'canal'; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

這個第一步還是蠻簡單的,就是要自己搭建一個mysql,修改一下mysql的配置,這個配置一般是再/etc/my.cnf中,還是得要點小基礎的哈,至少mysql得會搭

2、搭建canal環境
  • 下載 canal, 訪問 release 頁面 , 選擇需要的包下載, 如以 1.0.17 版本為例

wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
  • 解壓縮

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal

解壓完成后,進入 /tmp/canal 目錄,可以看到如下結構

img

 

  • 配置修改

vi conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=8

# enable gtid use true/false
canal.instance.gtidon=false

# position info 需要改成自己的數據庫信息
canal.instance.master.address=10.0.98.186:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password 需要改成自己的數據庫信息
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset=UTF-8
canal.instance.defaultDatabaseName=expert-online-school
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
#################################################

注意: canal.instance.connectionCharset 代表數據庫的編碼方式對應到 java 中的編碼類型,比如 UTF-8,GBK , ISO-8859-1 如果系統是1個 cpu,需要將 canal.instance.parser.parallel 設置為 false

  • 啟動

sh bin/startup.sh

到目前為止 canal的服務端我們已經搭建好了 但是到目前 我們只是把數據庫的binlog 拉到canal中,我們還得把數據用otter去消費

三、寫個簡單的Demo 去監聽mysql 數據的變動

1、Jar包
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>

四、測試代碼

package com.hq.eos.sync.client;

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;



public class CanalTest {

public static void main(String[] args) throws Exception {

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.0.98.186", 11111), "expert", "root", "root");
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();

while (true) {
Message message = connector.getWithoutAck(100); // 獲取指定數量的數據
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
Thread.sleep(1000);
continue;
}
// System.out.println(message.getEntries());
printEntries(message.getEntries());
connector.ack(batchId);// 提交確認,消費成功,通知server刪除數據
// connector.rollback(batchId);// 處理失敗, 回滾數據,后續重新獲取數據
}
}

private static void printEntries(List<Entry> entries) throws Exception {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

for (RowData rowData : rowChange.getRowDatasList()) {
switch (rowChange.getEventType()) {
case INSERT:
System.out.println("INSERT ");
printColumns(rowData.getAfterColumnsList());
break;
case UPDATE:
System.out.println("UPDATE ");
printColumns(rowData.getAfterColumnsList());
break;
case DELETE:
System.out.println("DELETE ");
printColumns(rowData.getBeforeColumnsList());
break;

default:
break;
}
}
}
}

private static void printColumns(List<Column> columns) {
for(Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}

測試結果

================> binlog[mysql-bin.000017:240485980] , name[xxl_job,xxl_job_registry] , eventType : UPDATE
UPDATE
id : 402 update=false
registry_group : EXECUTOR update=false
registry_key : hq-eos-crawler update=false
registry_value : 172.27.0.1:15674 update=false
update_time : 2019-12-03 17:54:42 update=true
================> binlog[mysql-bin.000017:240486374] , name[xxl_job,xxl_job_registry] , eventType : UPDATE
UPDATE
id : 82 update=false
registry_group : EXECUTOR update=false
registry_key : hq-eos-inf-config update=false
registry_value : 172.18.0.1:15672 update=false
update_time : 2019-12-03 17:54:42 update=true
================> binlog[mysql-bin.000017:240486774] , name[xxl_job,xxl_job_registry] , eventType : UPDATE

注意一下

 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.0.98.186", 11111), "expert", "root", "root");

這里的配置來自於 canal.properties 我把這個配置也貼出來吧

canal.id= 8
canal.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.zkServers=10.0.14.36:2181,10.0.14.39:2181,10.0.14.49:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=root
canal.instance.tsdb.dbPassword=root
# rds oss binlog account
canal.instance.rds.accesskey =
canal.instance.rds.secretkey =

#################################################
######### destinations #############
#################################################
canal.destinations= expert
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

第二章、canal實戰

一、canal連接kafka實現實時同步mysql數據

1、構建maven依賴
<dependency>
   <groupId>com.alibaba.otter</groupId>
   <artifactId>canal.client</artifactId>
   <version>1.0.25</version>
</dependency>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>1.1.0</version>
</dependency>

注意版本對應

2、SimpleCanalClient(客戶端)
package com.unigroup.client.canal;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.core.canal.CanalToKG;

/**  

* @Title: SimpleCanalClient.java

* @Package com.unigroup.canal

* @Description: canal單實例接口

* @author 大碼王  

* @date 2019年12月29日 上午11:56:09

* @version V1.0  
 */
 public class SimpleCanalClient {

   private CanalConnector connector=null;

   public SimpleCanalClient(String ip,String port,String instance) {

       // 創建鏈接
       connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, Integer.parseInt(port)),instance, "", "");

  }
   public List<Entry> execute(int batchSize,Class<?> clazz ) throws InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException {

       //int batchSize = 1;
       int emptyCount = 0;
       Object obj = clazz.newInstance();
       Method method = clazz.getMethod("send",Message.class);
       try {
           connector.connect();
           // connector.subscribe(".*\\..*");
           connector.subscribe("test.test1");
     
           connector.rollback();
           int totalEmptyCount = 120;
           while (emptyCount < totalEmptyCount) {
               Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
               long batchId = message.getId();
               int size = message.getEntries().size();
               if (batchId == -1 || size == 0) {
                   emptyCount++;
                   System.out.println("empty count : " + emptyCount);
                   try {
                       Thread.sleep(1000);
                  } catch (InterruptedException e) {
                  }
              } else {
                   emptyCount = 0;
                   method.invoke(obj, message);            
              }
               connector.ack(batchId); // 提交確認
     
               // connector.rollback(batchId); // 處理失敗, 回滾數據
          }
     
           System.out.println("empty too many times, exit");
      } catch (IllegalAccessException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
      } catch (IllegalArgumentException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
      } catch (InvocationTargetException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
      } finally {
           connector.disconnect();
      }
       return null;

  }
}

 

3、CanalKafkaProducer(生產者)
package com.unigroup.kafka.producer;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.kafka.producer.KafkaProperties.Topic;
import com.unigroup.utils.MessageSerializer;

/**  

* @Title: CanalKafkaProducer.java

* @Package com.unigroup.kafka.producer

* @Description:

* @author 大碼王

* @date 2019年12月3日 上午11:53:35

* @version V1.0  
 */
 public class CanalKafkaProducer {

   private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);

   private Producer<String, Message> producer;

   public void init(KafkaProperties kafkaProperties) {
       Properties properties = new Properties();
       properties.put("bootstrap.servers", kafkaProperties.getServers());
       properties.put("acks", "all");
       properties.put("retries", kafkaProperties.getRetries());
       properties.put("batch.size", kafkaProperties.getBatchSize());
       properties.put("linger.ms", kafkaProperties.getLingerMs());
       properties.put("buffer.memory", kafkaProperties.getBufferMemory());
       properties.put("key.serializer", StringSerializer.class.getName());
       properties.put("value.serializer", MessageSerializer.class.getName());
       producer = new KafkaProducer<String, Message>(properties);
  }

   public void stop() {
       try {
           logger.info("## stop the kafka producer");
           producer.close();
      } catch (Throwable e) {
           logger.warn("##something goes wrong when stopping kafka producer:", e);
      } finally {
           logger.info("## kafka producer is down.");
      }
  }

   public void send(Topic topic, Message message) throws IOException {

       ProducerRecord<String, Message> record;
       if (topic.getPartition() != null) {
           record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
      } else {
           record = new ProducerRecord<String, Message>(topic.getTopic(), message);
      }
       producer.send(record);
       if (logger.isDebugEnabled()) {
           logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
      }

  }
}
4、canalToKafkaServer(服務端)
package com.unigroup.kafka.server;

import com.unigroup.client.canal.SimpleCanalClient;
import com.unigroup.kafka.producer.CanalKafkaProducer;
import com.unigroup.utils.GetProperties;

/**  

* @Title: canal.java
* @Package com.unigroup.kafka.server
* @Description:
* @author 大碼王  
* @date 2019年12月3日 上午11:23:35
* @version V1.0  
 */
 public class canalToKafkaServer {
   public static void execute() {
       SimpleCanalClient simpleCanalClient = new SimpleCanalClient(GetProperties.getValue("MYSQL_HOST"),
               GetProperties.getValue("MTSQL_PORT"), GetProperties.getValue("INSTANCE"));
       try {
           simpleCanalClient.execute(1,CanalKafkaProducer.class);
      } catch (Exception e) {
           e.printStackTrace();
      }
  }
}

至此一個簡單的canal到kafka的demo已經完成。這些都只是測試代碼,實際應用中根據不同的情況,可以自己開發更多功能。

二、canal增量同步mysql數據庫信息到ElasticSearch

0、 運作原理

原理很簡單:

Canal模擬MySQL的slave的交互協議,偽裝成mysql slave,並將轉發協議發送到MySQL Master服務器。 MySQL Master接收到轉儲請求並開始將二進制日志推送到slave(即canal)。 Canal將二進制日志對象解析為自己的數據類型(原始字節流) 如圖所示: image

1、同步es

在同步數據到es的時候需要使用適配器:canal adapter。目前最新版本1.1.3,下載地址:https://github.com/alibaba/canal/releases

目前es貌似支持6.x版本,不支持7.x版本!!!

2、准備工作
2.1 es和jdk

安裝es如下:

2 安裝Elasticsearch

2.1 創建elasticsearch目錄

cd /usr/local/
mkdir tool
cd tool
mkdir elasticsearch
cd elasticsearch

2.1 下載Elasticsearch

2.1.1 在剛剛創建好的文件夾內下載Elasticsearch(以下簡稱es)

curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.4.2.tar.gz

2.1.2 解壓es包

tar -xvf elasticsearch-5.4.2.tar.gz

2.1.3 進入es/bin包

cd elasticsearch-5.4.2/bin

2.1.4 啟動es

./elasticsearch

2.2 解決問題

2.2.1 問題一

直接啟動,遇到如圖問題,如下:

img

這個問題是由於內存分配不夠造成的,修改適合本機的內存,修改文件config/jvm.options

vi ../config/jvm.options

修改如下位置

img

由於我的服務器內存較小,修改為512m,具體可以根據情況修改,如下:

img

修改后在次啟動:

./elasticsearch

2.2.2 問題二

出現如下錯誤:

org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root

如圖

img

這個問題很明顯,不允許使用root用戶啟動,那么我們新建一個es用戶,並賦予權限:

添加es用戶

useradd es

添加es用戶密碼

passwd es

將文件夾elasticsearch-5.4.2賦予es權限

chown -R es:es /usr/local/tool/elasticsearch/elasticsearch-5.4.2

切換為es用戶

su es

再次啟動es

./elasticsearch

這次啟動成功了,我們在使用一個窗口登錄root用戶,輸入命令:

curl -X GET http://localhost:9200

如圖所示,可以成功訪問

img

2.2.3 問題三

在瀏覽器訪問http://118.24.242.170:9200/拒絕訪問(118.24.242.170為服務器ip)

使用root用戶,打開elasticsearch.yml文件,如下:

vi /usr/local/tool/elasticsearch/elasticsearch-5.4.2/config/elasticsearch.yml

文件內增加如下代碼

network.host: 0.0.0.0

使用es用戶啟動,發現又出現了錯誤如下,得到錯誤信息如圖

img

使用root用戶打開如下文件:

vim /etc/sysctl.conf

添加如下配置:

vm.max_map_count = 655360

使配置生效

/sbin/sysctl -p

然后使用es用戶啟動Elasticsearch,這次可以成功啟動了,如果需要后台啟動的話,在啟動命令后加&,如下所示:

./elasticsearch &

安裝jdk如下

1.查看yum中管理的可用的JDK軟件包列表:

yum search java | grep -i --color JDK

結果如下圖所示:

image

2.選擇合適版本,安裝jdk,本人選擇的是java-1.8.0-openjdk-devel.x86_64

yum install java-1.8.0-openjdk-devel.x86_64

3配置環境變量,打開etc文件下profile

vi  /etc/profile

在文件內添加

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.71-2.b15.el7_2.x86_64
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin

保存關閉后,執行,讓配置生效:

source  /etc/profile

然后分別輸入下面命令確認jdk是否安裝成功:

java

image

javac

image

java -version

image

能顯示以上信息,就說明安裝成功了。

2.2 安裝canal server

下載canal.deployer-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

解壓文件

tar -zxvf canal.deployer-1.1.3.tar.gz

進入解壓后的文件夾

cd canal.deployer-1.1.3

修改conf/example/instance.properties文件,主要注意以下幾處:

canal.instance.master.address:數據庫地址,例如127.0.0.1:3306
canal.instance.dbUsername:數據庫用戶
canal.instance.dbPassword:數據庫密碼

完整內容如下:

#################################################

## mysql serverId , v1.0.26+ will autoGen

# canal.instance.mysql.slaveId=0

# enable gtid use true/false

canal.instance.gtidon=false

# position info

canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog

canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info

canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=
#canal.instance.tsdb.dbUsername=
#canal.instance.tsdb.dbPassword=

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password

canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8

# enable druid Decrypt database password

canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex

canal.instance.filter.regex=.*\\..*

# table black regex

canal.instance.filter.black.regex=

# mq config

#canal.mq.topic=example

# dynamic topic route by schema or table regex

#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0

# hash partition config

#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

回到canal.deployer-1.1.3目錄下,啟動canal:

sh bin/startup.sh

查看日志:

vi logs/canal/canal.log

查看具體instance日志:

 vi logs/example/example.log

關閉命令

sh bin/stop.sh

 

2.3 安裝canal-adapter

下載canal.adapter-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz

解壓

tar -zxvf canal.adapter-1.1.3.tar.gz

進入解壓后的文件夾

cd canal.adapter-1.1.3

修改conf/application.yml文件,主要注意如下內容,由於是yml文件,注意我這里說明的屬性名稱:

server.port:canal-adapter端口號
canal.conf.canalServerHost:canal-server地址和ip
canal.conf.srcDataSources.defaultDS.url:數據庫地址
canal.conf.srcDataSources.defaultDS.username:數據庫用戶名
canal.conf.srcDataSources.defaultDS.password:數據庫密碼
canal.conf.canalAdapters.groups.outerAdapters.hosts:es主機地址,tcp端口

完整內容如下:

      • server:
        port: 8081
        spring:
        jackson:
          date-format: yyyy-MM-dd HH:mm:ss
          time-zone: GMT+8
          default-property-inclusion: non_null


        canal.conf:
        mode: tcp
        canalServerHost: 127.0.0.1:11111
        batchSize: 500
        syncBatchSize: 1000
        retries: 0
        timeout:
        accessKey:
        secretKey:
        srcDataSources:
          defaultDS:
            url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
            username: root
            password: 12345678
        canalAdapters:

        - instance: example
          groups:
          - groupId: g1
            outerAdapters:
            - name: es
              hosts: 127.0.0.1:9300
              properties:
                cluster.name: elasticsearch

        另外需要配置conf/es/*.yml文件,adapter將會自動加載conf / es下的所有.yml結尾的配置文件。在介紹配置前,需要先介紹一下本案例使用的表結構,如下:

CREATE TABLE `test` (
 `id` int(11) NOT NULL,
 `name` varchar(200) NOT NULL,
 `address` varchar(1000) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

需要手動去es中創建索引,比如這里使用es-head創建,如下圖:

image

test索引結構如下:

{
   "mappings":{
       "_doc":{
           "properties":{
               "name":{
                   "type":"text"
              },
               "address":{
                   "type":"text"
              }
          }
      }
  }
}

接下來創建test.yml(文件名隨意),內容很好理解_

index為索引名稱,sql為對應語句,內容如下:

dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
_index: test
_type: _doc
_id: _id
upsert: true
sql: "select a.id as _id,a.name,a.address from test a"
commitBatch: 3000

配置完成后,回到canal-adapter根目錄,執行命令啟動

bin/startup.sh

查看日志

vi logs/adapter/adapter.log

關閉canal-adapter命令

bin/stop.sh

3.測試 都啟動成功后,先查看一下es-head,如圖,現在是沒有任何數據的。

img

接下來,我們在數據庫中插入一條數據進行測試,語句如下:

INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, '北京', '北京市朝陽區');

然后在看一下es-head,如下

img

接下來看一下日志,如下:

2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":7,"name":"北京","address":"北京市朝陽區"}],"database":"test","destination":"example","es":1561197255000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1561197255384,"type":"INSERT"} 
Affected indexes: test

小知識點:上面介紹的查看日志的方法可能不是很好用,推薦使用如下語法,比如查看日志最后200行:

tail -200f logs/adapter/adapter.log
3.總結

1.全量更新不能實現,但是增刪改都是可以的。 2.一定要提前創建好索引。 3.es配置的是tcp端口,比如默認的9300

在這里插入圖片描述

三、mysql+canal+kafka+elasticsearch構建數據查詢平台

1. 實驗環境

CPU:4 內存:8G ip:192.168.0.187

開啟iptables防火牆 關閉selinux java >=1.5 使用yum方式安裝的java,提前配置好JAVA_HOME環境變量

vim /etc/profile.d/java.sh
#!/bin/bash

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk # 路徑根據實際情況而定
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile.d/java.sh
2. MySQL信息

mysql賬號 root MySQL密碼 liykpntuu9?C

1、操作
vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

service mysqld restart

登陸數據庫后操作

CREATE USER canal IDENTIFIED BY 'canal!%123AD';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3. canal操作
# 下載
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir -p /usr/local/canal
tar -zxv -f canal.deployer-1.1.4.tar.gz -C /usr/local/canal

# 修改連接數據庫的配置文件
cd /usr/local/canal
vim conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 123
#position info,需要改成自己的數據庫信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的數據庫信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal!%123AD
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

# 啟動
bash bin/startup.sh

# 查看 server 日志
tail -n 30 logs/canal/canal.log
2019-09-20 09:48:46.987 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-09-20 09:48:47.019 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-09-20 09:48:47.028 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2019-09-20 09:48:47.059 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.187(192.168.0.187):11111]
2019-09-20 09:48:48.228 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

# 查看 instance 的日志
2019-09-20 09:48:47.395 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-09-20 09:48:47.399 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-09-20 09:48:47.580 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-09-20 09:48:47.626 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-09-20 09:48:47.626 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-09-20 09:48:48.140 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2019-09-20 09:48:48.147 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-09-20 09:48:48.147 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-09-20 09:48:48.165 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2019-09-20 09:48:49.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=<null>,timestamp=1568943354000] cost : 989ms , the next step is binlog dump

# 關閉
bash bin/stop.sh

# 端口使用情況
ss -tlnp
State       Recv-Q Send-Q           Local Address:Port     Peer Address:Port              
LISTEN      0      50                   *:11110                 *:*                   users:(("java",pid=2078,fd=109))
LISTEN      0      50                   *:11111                 *:*                   users:(("java",pid=2078,fd=105))
LISTEN      0      3                   *:11112                 *:*                   users:(("java",pid=2078,fd=87))

# 端口號說明
# admin端口:11110
# tcp端口:11111
# metric端口:11112
# canal-admin 使用WEB UI界面查看管理canal

# canal-admin的限定依賴:
#   MySQL,用於存儲配置和節點等相關數據
#   canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動態運維管理接口)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
tar -zxv -f canal-1.1.4/canal.admin-1.1.4.tar.gz -C /usr/local/src/canal_admin
vim conf/application.yml
server:
port: 8089 # 端口號,防火牆放行該端口號
spring:
jackson:
  date-format: yyyy-MM-dd HH:mm:ss
  time-zone: GMT+8

spring.datasource:
address: 127.0.0.1:3306 # 數據庫地址和端口
database: canal_manager # 數據庫名
username: canal_admin   # 數據庫賬號 ,注意跟一開始創建的canal賬號區分開,需要修改一下
password: ABC123,.abc@#11 # 數據庫密碼
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
  maximum-pool-size: 30
  minimum-idle: 1

canal:
adminUser: admin   # 平台賬號
adminPasswd: admin # 平台密碼

# 注意,數據庫名,賬號和密碼需要提前創建好
# 若修改默認的數據庫名,則示例sql文件中也需要修改
# 這里只修改默認的數據庫賬號和密碼,其余保持默認

# 初始化元數據庫
# 初始化SQL腳本里會默認創建canal_manager的數據庫,建議使用root等有超級權限的賬號進行初始化 b. canal_manager.sql默認會在conf目錄下
mysql -hlocalhost -uroot -p
mysql> source /usr/local/canal_admin/conf/canal_manager.sql;

# 啟動
bash bin/startup.sh

# 查看 admin 日志
tail -n 30 logs/admin.log
2019-09-20 14:50:54.595 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2019-09-20 14:50:54.624 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2019-09-20 14:50:54.812 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2019-09-20 14:50:54.818 [main] INFO com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 11.057 seconds (JVM running for 12.731)

# 瀏覽器訪問,防火牆放行8089端口號
# 地址:http://192.168.0.187:8089/ 訪問,默認密碼:admin/123456

# 使用
# 創建一個集群,添加已有的canal
# 因為端口的問題,暫時只能添加一個
# 另外canal是否可以組件集群,還有待研究

# 停止
bash bin/stop.sh
4. zookeeper
# 設置zookeeper集群
cd /usr/local/src
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local
cd /usr/local/apache-zookeeper-3.5.5-bin

mkdir -p /zkdata/{zookeeper-1,zookeeper-2,zookeeper-3}

cp conf/zoo_sample.cfg conf/zoo-1.cfg
# vim conf/zoo-1.cfg
dataDir=/zkdata/zookeeper-1
clientPort=2181

server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

cp conf/zoo-1.cfg conf/zoo-2.cfg
cp conf/zoo-1.cfg conf/zoo-3.cfg

vim conf/zoo-2.cfg
dataDir=/zkdata/zookeeper-2
clientPort=2182

server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

vim conf/zoo-3.cfg
dataDir=/zkdata/zookeeper-3
clientPort=2183

server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

echo '1' > /zkdata/zookeeper-1/myid
echo '2' > /zkdata/zookeeper-2/myid
echo '3' > /zkdata/zookeeper-3/myid

# 修改啟動文件,避免后續出現如下錯誤
# stat is not executed because it is not in the whitelist.
# envi is not executed because it is not in the whitelist.

# nc命令需要安裝其他軟件
yum install nmap-ncat

# envi命令執行報錯提示:envi is not executed because it is not in the whitelist.
# 解決辦法 修改啟動指令 zkServer.sh ,往里面添加 :ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

else
   echo "JMX disabled by user request" >&2
   ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" # 注意找到這個信息
fi

# 如果不想添加在這里,注意位置和賦值的順序
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

# 然后重啟zookeeper

# 集群啟動腳本
vim start.sh
bash bin/zkServer.sh start conf/zoo-1.cfg
bash bin/zkServer.sh start conf/zoo-2.cfg
bash bin/zkServer.sh start conf/zoo-3.cfg

# 集群關閉腳本
vim start.sh
bash bin/zkServer.sh stop conf/zoo-1.cfg
bash bin/zkServer.sh stop conf/zoo-2.cfg
bash bin/zkServer.sh stop conf/zoo-3.cfg

# 檢測集群狀態
[root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-1.cfg
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: conf/zoo-1.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

[root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-2.cfg
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: conf/zoo-2.cfg
Client port found: 2182. Client address: localhost.
Mode: leader

[root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-3.cfg
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: conf/zoo-3.cfg
Client port found: 2183. Client address: localhost.
Mode: follower
# 使用WEB UI查看監控集群-zk ui安裝
cd /usr/local

git clone https://github.com/DeemOpen/zkui.git

yum install -y maven

# 更換使用阿里雲maven源
vim /etc/maven/settings.xml
<mirrors>  

  <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>central</mirrorOf>
      <name>Nexus aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
  </mirror>

</mirrors>

cd zkui/

mvn clean install

# 修改配置文件默認值
vim config.cfg
   serverPort=9090     #指定端口
   zkServer=localhost:2181,localhost:2182,localhost:2183 # 不使用127.0.0.1
   sessionTimeout=300

   # userSet中是登陸web界面的用戶名和密碼
#管理員
#admin:manager
#用戶
#appconfig:appconfig

# 啟動程序至后台
vim start.sh
#!/bin/bash

nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &

# 瀏覽器訪問
# 防火牆放行9090端口,后期改用nginx代理
http://192.168.0.187:9090/
5. Kafka
# kafka集群,偽集群
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -zxv -f kafka_2.12-2.3.0.tgz -C /usr/local/
cd /usr/local/kafka_2.12-2.3.0/config
mkdir -p /kafkadata/{kafka-1,kafka-2,kafka-3}
cp server.properties server-1.properties
vim server-1.properties
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/kafkadata/kafka-1
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

cp server-1.properties server-2.properties
vim server-2.properties
broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://:9093
log.dirs=/kafkadata/kafka-2
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

cp server-1.properties server-3.properties
vim server-3.properties
broker.id=3
delete.topic.enable=true
listeners=PLAINTEXT://:9094
log.dirs=/kafkadata/kafka-3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

# 啟動集群
vim start.sh
#!/bin/bash

bash bin/kafka-server-start.sh -daemon config/server-1.properties
bash bin/kafka-server-start.sh -daemon config/server-2.properties
bash bin/kafka-server-start.sh -daemon config/server-3.properties

# 停止集群
vim stop.sh
#!/bin/bash

bash bin/kafka-server-stop.sh -daemon config/server-1.properties
bash bin/kafka-server-stop.sh -daemon config/server-2.properties
bash bin/kafka-server-stop.sh -daemon config/server-3.properties
# 監控kafka集群
# 有一個問題,需要在kafka-server-start.sh文件中配置端口,有如下三種辦法
# 第一種:復制並修改kafka目錄,比如kafka-1,kafka-2,kafka-3,然后再每個目錄下修改kafka-server-start.sh文件
# 第二種:在啟動腳本start.sh中添加指定端口
# 第三種:多復制幾個kafka-server-start.sh文件,然后進行修改,最后在start.sh中修改一下

# 以下三種方法任選其一即可

# 第一種方式辦法,相應行修改成如下形式,注意端口號不同
# 使用的是不同目錄下的不同kafka-server-start.sh文件
# start.sh文件也需要做相應的修改
# kafka-1/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
   export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
   export JMX_PORT="9997"
fi
# kafka-2/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
   export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
   export JMX_PORT="9998"
fi
# kafka-3/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
   export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
   export JMX_PORT="9999"
fi

# start.sh
#!/bin/bash
bash kafka-1/bin/kafka-server-start.sh -daemon config/server-1.properties
bash kafka-2/bin/kafka-server-start.sh -daemon config/server-2.properties
bash kafka-3/bin/kafka-server-start.sh -daemon config/server-3.properties

# 第二種方法
# 使用的用一個目錄下的同一個文件,只是在每個命令前指定端口號
vim start.sh
#!/bin/bash

JMX_PORT=9997 bash bin/kafka-server-start.sh -daemon config/server-1.properties
JMX_PORT=9998 bash bin/kafka-server-start.sh -daemon config/server-2.properties
JMX_PORT=9999 bash bin/kafka-server-start.sh -daemon config/server-3.properties

# 第三種方法
# 使用的是同一個目錄下的不同kafka-server-start文件
# start.sh文件也需要做相應的修改
cp kafka-server-start.sh kafka-server-start-1.sh
cp kafka-server-start.sh kafka-server-start-2.sh
cp kafka-server-start.sh kafka-server-start-3.sh

vim kafka-server-start-1.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
   export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
   export JMX_PORT="9997"
fi
vim kafka-server-start-2.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
   export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
   export JMX_PORT="9998"
fi
vim kafka-server-start-3.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
   export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
   export JMX_PORT="9999"
fi

vim start.sh
#!/bin/bash

bash bin/kafka-server-start-1.sh -daemon config/server-1.properties
bash bin/kafka-server-start-2.sh -daemon config/server-2.properties
bash bin/kafka-server-start-3.sh -daemon config/server-3.properties
cd /usr/local/src
wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.3.9.tar.gz

# 多次解壓縮后得到kafka-eagle-web-1.3.9目錄,然后把該目錄復制到/usr/local目錄下

cd /usr/local/kafka-eagle-web-1.3.9/conf
vim system-config.properties
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181,localhost:2182,localhost:2183
kafka.eagle.metrics.charts=true
# 其余保持默認,數據庫使用sqlite,注意路徑需要事先創建好或修改成當前目錄
# 數據庫也可以更換成MySQL
kafka.eagle.url=jdbc:sqlite:/usr/local/kafka-eagle-web-1.3.9/db/ke.db

# 注意
# kafka.eagle.zk.cluster.alias的值需要跟下面的這個cluster1.zk.list小數點第一個保持一致,比如都是cluster1,否則獲取不到數據

# 添加環境變量
vim /etc/profile.d/kafka_eagle.sh
#!/bin/bash

export KE_HOME=/usr/local/kafka-eagle-web-1.3.9
export PATH=$PATH:$KE_HOME/bin
source /etc/profile.d/kafka_eagle.sh


# 命令相關
bash bin/ke.sh start|stop|status|stats|restart

# 啟動
bash bin/ke.sh start
*******************************************************************
* Kafka Eagle system monitor port successful...
*******************************************************************
[2019-09-20 12:10:32] INFO: Status Code[0]
[2019-09-20 12:10:32] INFO: [Job done!]
Welcome to
  __ __   ___     ____   __ __   ___           ______   ___   ______   __     ______
  / //_/   /   |   / __/   / //_/   /   |         / ____/   /   | / ____/   / /   / ____/
/ ,<     / /| | / /_   / ,<     / /| |         / __/     / /| | / / __   / /   / __/  
/ /| |   / ___ | / __/   / /| |   / ___ |       / /___   / ___ |/ /_/ /   / /___ / /___  
/_/ |_| /_/ |_|/_/     /_/ |_| /_/ |_|       /_____/   /_/ |_|\____/   /_____//_____/  
                                                                                           

Version 1.3.9
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://127.0.0.1:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

# 瀏覽器訪問,防火牆放行該端口,后期改用Nginx代理
地址:192.168.0.187:8048/ke
賬號:admin,密碼:123456
6. 投遞數據到Kafka
# 先進行canal配置,改動配置文件canal.properties
# serverMode改為kafka
vim conf/canal.properties
canal.serverMode = kafka
canal.mq.servers = localhost:9092,localhost:9093,localhost:9094

vim conf/example/instance.propties
# mq config
canal.mq.topic=canal_manager # 填寫數據庫庫名,表示這個數據庫的所有表的操作都在這個topic下
# dynamic topic route by schema or table regex
# canal.mq.dynamicTopic=.*\\..*
canal.mq.partition=0
# hash partition config
# canal.mq.partitionsNum=10
# canal.mq.partitionHash=.*\\..*

# 以上具體規則詳看官方文檔

# kafka開啟消息隊列的自動創建topic模式,相關配置在kafka的server.properties
echo 'auto.create.topics.enable=true' >> config/server-1.properties
echo 'auto.create.topics.enable=true' >> config/server-2.properties
echo 'auto.create.topics.enable=true' >> config/server-3.properties

# 相關改動完成后重啟canal和kafka

# 使用canal_admin平台查看canal的狀態
# Server管理,操作,日志

# 使用zu ui平台查看kafka的topic情況
# 左側導航Topic-List查看生成的topic,這里顯示的是canal_manager,上面設置的那個數據庫庫名
# 點開Topic Name可以查看具體的數據個數

# 使用命令行kafka-console-consumer.sh --topic canal_manager --bootstrap-server localhost:9092 --from-beginning查看canal傳遞給kafka的數據
# 插入一條數據
{"data":[{"id":"13","username":"13","password":"6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9","name":"Canal Manager","roles":"admin","introduction":null,"avatar":null,"creation_date":"2019-07-14 00:05:28"}],"database":"canal_manager","es":1568972329000,"id":10,"isDdl":false,"mysqlType":{"id":"bigint(20)","username":"varchar(31)","password":"varchar(128)","name":"varchar(31)","roles":"varchar(31)","introduction":"varchar(255)","avatar":"varchar(255)","creation_date":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"username":12,"password":12,"name":12,"roles":12,"introduction":12,"avatar":12,"creation_date":93},"table":"canal_user","ts":1568972329456,"type":"INSERT"}
# 刪除一條數據
{"data":[{"id":"13","username":"13","password":"6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9","name":"Canal Manager","roles":"admin","introduction":null,"avatar":null,"creation_date":"2019-07-14 00:05:28"}],"database":"canal_manager","es":1568972368000,"id":11,"isDdl":false,"mysqlType":{"id":"bigint(20)","username":"varchar(31)","password":"varchar(128)","name":"varchar(31)","roles":"varchar(31)","introduction":"varchar(255)","avatar":"varchar(255)","creation_date":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"username":12,"password":12,"name":12,"roles":12,"introduction":12,"avatar":12,"creation_date":93},"table":"canal_user","ts":1568972369005,"type":"DELETE"}

后續增加使用logstash從Kafka中拉取數據傳輸到elastic中且指定索引

第三章、canal面試

一、canal日志復制

  1. master將改變記錄到二進制日志(binary log)中;

  2. slave將master的binary log events拷貝到它的中繼日志(relay log);

  3. slave重做中繼日志中的事件,將改變反映它自己的數據。

img

  1. canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議

  2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)

  3. canal解析binary log對象(原始為byte流)

img

基於日志增量訂閱&消費支持的業務:

  1. 數據庫鏡像

  2. 數據庫實時備份

  3. 多級索引 (賣家和買家各自分庫索引)

  4. search build

  5. 業務cache刷新

  6. 價格變化等重要業務消息

二、canal介紹

名稱:canal [kə'næl]

譯意: 水道/管道/溝渠

語言: 純java開發

定位: 基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了mysql

關鍵詞: mysql binlog parser / real-time / queue&topic

三、工作原理

1、mysql主備復制實現

img

從上層來看,復制分成三步:

  1. master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);

  2. slave將master的binary log events拷貝到它的中繼日志(relay log);

  3. slave重做中繼日志中的事件,將改變反映它自己的數據。

2、canal的工作原理:

img

原理相對比較簡單:

  1. canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議

  2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)

  3. canal解析binary log對象(原始為byte流)

四、架構

img

說明:

  • server代表一個canal運行實例,對應於一個jvm

  • instance對應於一個數據隊列 (1個server對應1..n個instance)

instance模塊:

  • eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)

  • eventSink (Parser和Store鏈接器,進行數據過濾,加工,分發的工作)

  • eventStore (數據存儲)

  • metaManager (增量訂閱&消費信息管理器)

五、知識科普

mysql的Binlay Log介紹

簡單點說:

  • mysql的binlog是多文件存儲,定位一個LogEvent需要通過binlog filename + binlog position,進行定位

  • mysql的binlog數據格式,按照生成的方式,主要分為:statement-based、row-based、mixed。

 

mysql> show variables like 'binlog_format';
   +---------------+-------+
   | Variable_name | Value |
   +---------------+-------+
   | binlog_format | ROW   |
   +---------------+-------+
   1 row in set (0.00 sec)

目前canal支持所有模式的增量訂閱(但配合同步時,因為statement只有sql,沒有數據,無法獲取原始的變更日志,所以一般建議為ROW模式)

六、EventParser設計

大致過程:

img

整個parser過程大致可分為幾步:

  1. Connection獲取上一次解析成功的位置 (如果第一次啟動,則獲取初始指定的位置或者是當前數據庫的binlog位點)

  2. Connection建立鏈接,發送BINLOG_DUMP指令 // 0. write command number // 1. write 4 bytes bin-log position to start at // 2. write 2 bytes bin-log flags // 3. write 4 bytes server id of the slave // 4. write bin-log file name

  3. Mysql開始推送Binaly Log

  4. 接收到的Binaly Log的通過Binlog parser進行協議解析,補充一些特定信息 // 補充字段名字,字段類型,主鍵信息,unsigned類型處理

  5. 傳遞給EventSink模塊進行數據存儲,是一個阻塞操作,直到存儲成功

  6. 存儲成功后,定時記錄Binaly Log位置

七、EventSink設計

img

說明:

  • 數據過濾:支持通配符的過濾模式,表名,字段內容等

  • 數據路由/分發:解決1:n (1個parser對應多個store的模式)

  • 數據歸並:解決n:1 (多個parser對應1個store)

  • 數據加工:在進入store之前進行額外的處理,比如join

數據1:n業務

為了合理的利用數據庫資源, 一般常見的業務都是按照schema進行隔離,然后在mysql上層或者dao這一層面上,進行一個數據源路由,屏蔽數據庫物理位置對開發的影響,阿里系主要是通過cobar/tddl來解決數據源路由問題。

所以,一般一個數據庫實例上,會部署多個schema,每個schema會有由1個或者多個業務方關注

數據n:1業務

同樣,當一個業務的數據規模達到一定的量級后,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數據需要處理時,就需要鏈接多個store進行處理,消費的位點就會變成多份,而且數據消費的進度無法得到盡可能有序的保證。

所以,在一定業務場景下,需要將拆分后的增量數據進行歸並處理,比如按照時間戳/全局id進行排序歸並.

八、EventStore設計

  • \1. 目前僅實現了Memory內存模式,后續計划增加本地file存儲,mixed混合模式

  • \2. 借鑒了Disruptor的RingBuffer的實現思路

RingBuffer設計:

img

定義了3個cursor

  • Put : Sink模塊進行數據存儲的最后一次寫入位置

  • Get : 數據訂閱獲取的最后一次提取位置

  • Ack : 數據消費成功的最后一次消費位置

借鑒Disruptor的RingBuffer的實現,將RingBuffer拉直來看:

 

img

image.png

實現說明:

  • Put/Get/Ack cursor用於遞增,采用long型存儲

  • buffer的get操作,通過取余或者與操作。(與操作: cusor & (size - 1) , size需要為2的指數,效率比較高)

九、Instance設計

img

instance代表了一個實際運行的數據隊列,包括了EventPaser,EventSink,EventStore等組件。

抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:

  • manager方式: 和你自己的內部web console/manager系統進行對接。(目前主要是公司內部使用)

  • spring方式:基於spring xml + properties進行定義,構建spring配置.

十、Server設計

img

server代表了一個canal的運行實例,為了方便組件化使用,特意抽象了Embeded(嵌入式) / Netty(網絡訪問)的兩種實現

  • Embeded : 對latency和可用性都有比較高的要求,自己又能hold住分布式的相關技術(比如failover)

  • Netty : 基於netty封裝了一層網絡協議,由canal server保證其可用性,采用的pull模型,當然latency會稍微打點折扣,不過這個也視情況而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠攏,push在數據量大的時候會有一些問題)

十一、增量訂閱/消費設計

img

具體的協議格式,可參見:CanalProtocol.proto

get/ack/rollback協議介紹:

  • Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的對象為Message,包含的內容為: a. batch id 唯一標識 b. entries 具體的數據對象,對應的數據對象格式:EntryProtocol.proto

  • void rollback(long batchId),顧命思議,回滾上次的get請求,重新獲取數據。基於get獲取的batchId進行提交,避免誤操作

  • void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基於get獲取的batchId進行提交,避免誤操作

canal的get/ack/rollback協議和常規的jms協議有所不同,允許get/ack異步處理,比如可以連續調用get多次,后續異步按順序提交ack/rollback,項目中稱之為流式api.

流式api設計的好處:

  • get/ack異步化,減少因ack帶來的網絡延遲和操作成本 (99%的狀態都是處於正常狀態,異常的rollback屬於個別情況,沒必要為個別的case犧牲整個性能)

  • get獲取數據后,業務消費存在瓶頸或者需要多進程/多線程消費時,可以不停的輪詢get數據,不停的往后發送任務,提高並行化. (作者在實際業務中的一個case:業務數據消費需要跨中美網絡,所以一次操作基本在200ms以上,為了減少延遲,所以需要實施並行化)

流式api設計:

img

  • 每次get操作都會在meta中產生一個mark,mark標記會遞增,保證運行過程中mark的唯一性

  • 每次的get操作,都會在上一次的mark操作記錄的cursor繼續往后取,如果mark不存在,則在last ack cursor繼續往后取

  • 進行ack時,需要按照mark的順序進行數序ack,不能跳躍ack. ack會刪除當前的mark標記,並將對應的mark位置更新為last ack cusor

  • 一旦出現異常情況,客戶端可發起rollback情況,重新置位:刪除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續往后取

十二、數據對象格式:EntryProtocol.proto

 

Entry
   Header
       logfileName [binlog文件名]
       logfileOffset [binlog position]
       executeTime [binlog里記錄變更發生的時間戳]
       schemaName [數據庫實例]
       tableName [表名]
       eventType [insert/update/delete類型]
   entryType   [事務頭BEGIN/事務尾END/數據ROWDATA]
   storeValue [byte數據,可展開,對應的類型為RowChange]

RowChange
isDdl       [是否是ddl變更操作,比如create table/drop table]
sql     [具體的ddl sql]
rowDatas   [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理]
beforeColumns [Column類型的數組]
afterColumns [Column類型的數組]

Column
index       [column序號]
sqlType     [jdbc type]
name       [column name]
isKey       [是否為主鍵]
updated     [是否發生過變更]
isNull     [值是否為null]
value       [具體的內容,注意為文本]

說明:

  • 可以提供數據庫變更前和變更后的字段內容,針對binlog中沒有的name,isKey等信息進行補全

  • 可以提供ddl的變更語句

十三、HA機制設計

canal的ha分為兩部分,canal server和canal client分別有對應的ha實現

  • canal server: 為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處於running,其他的處於standby狀態.

  • canal client: 為了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操作,否則客戶端接收無法保證有序。

整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命周期綁定),可以看下我之前zookeeper的相關文章。

Canal Server:

img

大致步驟:

  1. canal server要啟動某個canal instance時都先向zookeeper進行一次嘗試啟動判斷 (實現:創建EPHEMERAL節點,誰創建成功就允許誰啟動)

  2. 創建zookeeper節點成功后,對應的canal server就啟動對應的canal instance,沒有創建成功的canal instance就會處於standby狀態

  3. 一旦zookeeper發現canal server A創建的節點消失后,立即通知其他的canal server再次進行步驟1的操作,重新選出一個canal server啟動instance.

  4. canal client每次進行connect時,會首先向zookeeper詢問當前是誰啟動了canal instance,然后和其建立鏈接,一旦鏈接不可用,會重新嘗試connect.

Canal Client的方式和canal server方式類似,也是利用zookeeper的搶占EPHEMERAL節點的方式進行控制.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM