使用canal獲取mysql的binlog傳輸給kafka,並交由logstash獲取實驗步驟


1. 實驗環境

CPU:4
內存:8G
ip:192.168.0.187

開啟iptables防火牆
關閉selinux
java >=1.5
使用yum方式安裝的java,提前配置好JAVA_HOME環境變量

vim /etc/profile.d/java.sh
	#!/bin/bash

	export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk # 路徑根據實際情況而定
	export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile.d/java.sh

2. MySQL信息

mysql賬號
root
MySQL密碼
liykpntuu9?C

操作

vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

service mysqld restart

登陸數據庫后操作

CREATE USER canal IDENTIFIED BY 'canal!%123AD';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
# 查詢MySQL數據庫中某一個用戶授予的權限
mysql> show grants for canal;
+---------------------------------------------------------------------------+
| Grants for canal@%                                                        |
+---------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' |
+---------------------------------------------------------------------------+

3. canal操作

# 下載
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir -p /usr/local/canal
tar -zxv -f canal.deployer-1.1.4.tar.gz -C /usr/local/canal

# 修改連接數據庫的配置文件
cd /usr/local/canal
vim conf/example/instance.properties
	## mysql serverId
	canal.instance.mysql.slaveId = 123
	#position info,需要改成自己的數據庫信息
	canal.instance.master.address = 127.0.0.1:3306 
	canal.instance.master.journal.name = 
	canal.instance.master.position = 
	canal.instance.master.timestamp = 
	#canal.instance.standby.address = 
	#canal.instance.standby.journal.name =
	#canal.instance.standby.position = 
	#canal.instance.standby.timestamp = 
	#username/password,需要改成自己的數據庫信息
	canal.instance.dbUsername = canal  
	canal.instance.dbPassword = canal!%123AD
	canal.instance.defaultDatabaseName =
	canal.instance.connectionCharset = UTF-8
	#table regex
	canal.instance.filter.regex = .\*\\\\..\*

# 啟動
bash bin/startup.sh

# 查看 server 日志
tail -n 30 logs/canal/canal.log
	2019-09-20 09:48:46.987 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
	2019-09-20 09:48:47.019 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
	2019-09-20 09:48:47.028 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
	2019-09-20 09:48:47.059 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.187(192.168.0.187):11111]
	2019-09-20 09:48:48.228 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

# 查看 instance 的日志
	2019-09-20 09:48:47.395 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
	2019-09-20 09:48:47.399 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
	2019-09-20 09:48:47.580 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
	2019-09-20 09:48:47.626 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
	2019-09-20 09:48:47.626 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
	2019-09-20 09:48:48.140 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
	2019-09-20 09:48:48.147 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
	2019-09-20 09:48:48.147 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : 
	2019-09-20 09:48:48.165 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
	2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
	2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
	2019-09-20 09:48:49.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=<null>,timestamp=1568943354000] cost : 989ms , the next step is binlog dump

# 關閉
bash bin/stop.sh

# 端口使用情況
ss -tlnp
State       Recv-Q Send-Q            Local Address:Port      Peer Address:Port              
LISTEN      0      50                   *:11110                  *:*                   users:(("java",pid=2078,fd=109))
LISTEN      0      50                   *:11111                  *:*                   users:(("java",pid=2078,fd=105))
LISTEN      0      3                    *:11112                  *:*                   users:(("java",pid=2078,fd=87))

# 端口號說明
# admin端口:11110
# tcp端口:11111
# metric端口:11112

# 端口號所在配置文件
# canal/conf/canal.properties
# canal.port = 11111
# canal.metrics.pull.port = 11112

# canal/conf/canal_local.properties
# canal.admin.port = 11110

組建HA集群
地址:https://github.com/alibaba/canal/wiki/AdminGuide

# 重新獲得一份解壓縮的目錄,比如canal_2
# 修改原有的canal數據
vim canal.properties
	canal.zkServers=localhost:2181,localhost:2182,localhost:2183,
	canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# 然后把這個文件復制到canal_2對應目錄下,在canal_2對應目錄下再次修改
vim canal.properties
	canal.port = 11114
	canal.metrics.pull.port = 11115
vim canal_local.properties
	canal.admin.port = 11113

# 把canal example目錄下的instance.properties復制到canal_2對應目錄下,並在canal_2對應目錄下再次修改
vim instance.properties
	canal.instance.mysql.slaveId = 124 ##另外一台機器改成1235,保證slaveId不重復即可

# 注意: 兩台機器上的instance目錄的名字需要保證完全一致,HA模式是依賴於instance name進行管理,同時必須都選擇default-instance.xml配置

# 然后分別啟動

# 總結一下:
# 1. 倆配置文件中使用的三個端口號需要不一樣
# 2. 使用default-instance.xml文件
# 3. MySQL的slaveId不一樣
# 4. 兩台機器上的instance目錄的名字需要保證完全一致

# 啟動后,可以查看logs/example/example.log,只會看到一台機器上出現了啟動成功的日志。
# 或者通過查看一下zookeeper中的節點信息,可以知道當前工作的節點為localhost:11111
# get /otter/canal/destinations/example/running
# canal-admin 使用WEB UI界面查看管理canal

# canal-admin的限定依賴:
#    MySQL,用於存儲配置和節點等相關數據
#    canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動態運維管理接口)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
tar -zxv -f canal-1.1.4/canal.admin-1.1.4.tar.gz -C /usr/local/src/canal_admin
vim conf/application.yml
	server:
	  port: 8089 # 端口號,防火牆放行該端口號
	spring:
	  jackson:
	    date-format: yyyy-MM-dd HH:mm:ss
	    time-zone: GMT+8

	spring.datasource:
	  address: 127.0.0.1:3306 # 數據庫地址和端口
	  database: canal_manager # 數據庫名
	  username: canal_admin   # 數據庫賬號 ,注意跟一開始創建的canal賬號區分開,需要修改一下
	  password: ABC123,.abc@#11  # 數據庫密碼
	  driver-class-name: com.mysql.jdbc.Driver
	  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
	  hikari:
	    maximum-pool-size: 30
	    minimum-idle: 1

	canal:
	  adminUser: admin   # 平台賬號
	  adminPasswd: admin # 平台密碼

# 注意,數據庫名,賬號和密碼需要提前創建好
# 若修改默認的數據庫名,則示例sql文件中也需要修改
# 這里只修改默認的數據庫賬號和密碼,其余保持默認

# 初始化元數據庫
# 初始化SQL腳本里會默認創建canal_manager的數據庫,建議使用root等有超級權限的賬號進行初始化 b. canal_manager.sql默認會在conf目錄下
mysql -hlocalhost -uroot -p
mysql> source /usr/local/canal_admin/conf/canal_manager.sql;

# 啟動
bash bin/startup.sh

# 查看 admin 日志
tail -n 30 logs/admin.log
	2019-09-20 14:50:54.595 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
	2019-09-20 14:50:54.624 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
	2019-09-20 14:50:54.812 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
	2019-09-20 14:50:54.818 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 11.057 seconds (JVM running for 12.731)

# 瀏覽器訪問,防火牆放行8089端口號
# 地址:http://192.168.0.187:8089/ 訪問,默認密碼:admin/123456 
# 登陸后可以修改密碼

# 用戶登陸密碼是使用mysql5方式加密的,直接保存到數據庫中的,canal_manager.sql文件中最后一條就是登陸賬號密碼

# 使用
# 創建一個集群,添加已有的canal
# 可以通過修改端口號的方式在一台主機上啟動多個canal組建集群
# 地址:https://github.com/alibaba/canal/wiki/AdminGuide

# 停止
bash bin/stop.sh

4. zookeeper

# 設置zookeeper集群
cd /usr/local/src
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local
cd /usr/local/apache-zookeeper-3.5.5-bin

mkdir -p /zkdata/{zookeeper-1,zookeeper-2,zookeeper-3}

cp conf/zoo_sample.cfg conf/zoo-1.cfg
# vim conf/zoo-1.cfg
	dataDir=/zkdata/zookeeper-1
	clientPort=2181

	server.1=127.0.0.1:2888:3888
	server.2=127.0.0.1:2889:3889
	server.3=127.0.0.1:2890:3890

cp conf/zoo-1.cfg conf/zoo-2.cfg
cp conf/zoo-1.cfg conf/zoo-3.cfg

vim conf/zoo-2.cfg
	dataDir=/zkdata/zookeeper-2
	clientPort=2182

	server.1=127.0.0.1:2888:3888
	server.2=127.0.0.1:2889:3889
	server.3=127.0.0.1:2890:3890

vim conf/zoo-3.cfg
	dataDir=/zkdata/zookeeper-3
	clientPort=2183

	server.1=127.0.0.1:2888:3888
	server.2=127.0.0.1:2889:3889
	server.3=127.0.0.1:2890:3890

echo '1' > /zkdata/zookeeper-1/myid
echo '2' > /zkdata/zookeeper-2/myid 
echo '3' > /zkdata/zookeeper-3/myid 

# 修改啟動文件,避免后續出現如下錯誤
# stat is not executed because it is not in the whitelist.
# envi is not executed because it is not in the whitelist.

# nc命令需要安裝其他軟件
yum install nmap-ncat

# envi命令執行報錯提示:envi is not executed because it is not in the whitelist.
# 解決辦法 修改啟動指令 zkServer.sh ,往里面添加 :ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

	else
	    echo "JMX disabled by user request" >&2
	    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" # 注意找到這個信息
	fi

# 如果不想添加在這里,注意位置和賦值的順序
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

# 然后重啟zookeeper

# 集群啟動腳本
vim start.sh
	bash bin/zkServer.sh start conf/zoo-1.cfg
	bash bin/zkServer.sh start conf/zoo-2.cfg
	bash bin/zkServer.sh start conf/zoo-3.cfg

# 集群關閉腳本
vim start.sh
	bash bin/zkServer.sh stop conf/zoo-1.cfg
	bash bin/zkServer.sh stop conf/zoo-2.cfg
	bash bin/zkServer.sh stop conf/zoo-3.cfg

# 檢測集群狀態
[root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-1.cfg
	/usr/bin/java
	ZooKeeper JMX enabled by default
	Using config: conf/zoo-1.cfg
	Client port found: 2181. Client address: localhost.
	Mode: follower

[root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-2.cfg
	/usr/bin/java
	ZooKeeper JMX enabled by default
	Using config: conf/zoo-2.cfg
	Client port found: 2182. Client address: localhost.
	Mode: leader

[root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-3.cfg
	/usr/bin/java
	ZooKeeper JMX enabled by default
	Using config: conf/zoo-3.cfg
	Client port found: 2183. Client address: localhost.
	Mode: follower
# 使用WEB UI查看監控集群-zk ui安裝
cd /usr/local

git clone https://github.com/DeemOpen/zkui.git

yum install -y maven

# 更換使用阿里雲maven源
vim /etc/maven/settings.xml 
	<mirrors>  

	    <mirror>
	        <id>nexus-aliyun</id>
	        <mirrorOf>central</mirrorOf>
	        <name>Nexus aliyun</name>
	        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
	    </mirror>

	</mirrors>

cd zkui/

mvn clean install

# 修改配置文件默認值
vim config.cfg
    serverPort=9090     #指定端口
    zkServer=localhost:2181,localhost:2182,localhost:2183 # 不使用127.0.0.1
    sessionTimeout=300

    # userSet中是登陸web界面的用戶名和密碼
	#管理員
	#admin:manager
	#用戶
	#appconfig:appconfig

# 啟動程序至后台
vim start.sh
	#!/bin/bash

	nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &

# 瀏覽器訪問
# 防火牆放行9090端口,后期改用nginx代理
http://192.168.0.187:9090/

5. Kafka

# kafka集群,偽集群
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -zxv -f kafka_2.12-2.3.0.tgz -C /usr/local/
cd /usr/local/kafka_2.12-2.3.0/config
mkdir -p /kafkadata/{kafka-1,kafka-2,kafka-3}
cp server.properties server-1.properties
vim server-1.properties
	broker.id=1
	delete.topic.enable=true
	listeners=PLAINTEXT://:9092
	advertised.listeners=PLAINTEXT://localhost:9092
	log.dirs=/kafkadata/kafka-1
	zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

cp server-1.properties server-2.properties
vim server-2.properties
	broker.id=2
	delete.topic.enable=true
	listeners=PLAINTEXT://:9093
	log.dirs=/kafkadata/kafka-2
	zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

cp server-1.properties server-3.properties
vim server-3.properties
	broker.id=3
	delete.topic.enable=true
	listeners=PLAINTEXT://:9094
	log.dirs=/kafkadata/kafka-3
	zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

# 啟動集群
vim start.sh
	#!/bin/bash

	bash bin/kafka-server-start.sh -daemon config/server-1.properties
	bash bin/kafka-server-start.sh -daemon config/server-2.properties
	bash bin/kafka-server-start.sh -daemon config/server-3.properties

# 停止集群
vim stop.sh 
	#!/bin/bash

	bash bin/kafka-server-stop.sh -daemon config/server-1.properties
	bash bin/kafka-server-stop.sh -daemon config/server-2.properties
	bash bin/kafka-server-stop.sh -daemon config/server-3.properties
# 監控kafka集群,開啟監控趨勢圖使用
# 有一個問題,需要在kafka-server-start.sh文件中配置端口,有如下三種辦法
# 第一種:復制並修改kafka目錄,比如kafka-1,kafka-2,kafka-3,然后再每個目錄下修改kafka-server-start.sh文件
# 第二種:在啟動腳本start.sh中添加指定端口
# 第三種:多復制幾個kafka-server-start.sh文件,然后進行修改,最后在start.sh中修改一下

# 以下三種方法任選其一即可

# 第一種方式辦法,相應行修改成如下形式,注意端口號不同
# 使用的是不同目錄下的不同kafka-server-start.sh文件
# start.sh文件也需要做相應的修改
# kafka-1/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
   # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9997"
fi
# kafka-2/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
   # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9998"
fi
# kafka-3/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
   # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi

# start.sh
	#!/bin/bash
	bash kafka-1/bin/kafka-server-start.sh -daemon config/server-1.properties
	bash kafka-2/bin/kafka-server-start.sh -daemon config/server-2.properties
	bash kafka-3/bin/kafka-server-start.sh -daemon config/server-3.properties

# 第二種方法
# 使用的用一個目錄下的同一個文件,只是在每個命令前指定端口號
vim start.sh
	#!/bin/bash

	JMX_PORT=9997 bash bin/kafka-server-start.sh -daemon config/server-1.properties
	JMX_PORT=9998 bash bin/kafka-server-start.sh -daemon config/server-2.properties
	JMX_PORT=9999 bash bin/kafka-server-start.sh -daemon config/server-3.properties

# 第三種方法
# 使用的是同一個目錄下的不同kafka-server-start文件
# start.sh文件也需要做相應的修改
cp kafka-server-start.sh kafka-server-start-1.sh
cp kafka-server-start.sh kafka-server-start-2.sh
cp kafka-server-start.sh kafka-server-start-3.sh

vim kafka-server-start-1.sh
	if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
	   # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
	    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
	    export JMX_PORT="9997"
	fi
vim kafka-server-start-2.sh
	if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
	   # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
	    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
	    export JMX_PORT="9998"
	fi
vim kafka-server-start-3.sh
	if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
	   # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
	    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
	    export JMX_PORT="9999"
	fi

vim start.sh 
	#!/bin/bash

	bash bin/kafka-server-start-1.sh -daemon config/server-1.properties
	bash bin/kafka-server-start-2.sh -daemon config/server-2.properties
	bash bin/kafka-server-start-3.sh -daemon config/server-3.properties

vim stop.sh
	#!/bin/bash

	bash bin/kafka-server-stop.sh

cd /usr/local/src
wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.3.9.tar.gz

# 多次解壓縮后得到kafka-eagle-web-1.3.9目錄,然后把該目錄復制到/usr/local目錄下

cd /usr/local/kafka-eagle-web-1.3.9/conf
vim system-config.properties
	kafka.eagle.zk.cluster.alias=cluster1
	cluster1.zk.list=localhost:2181,localhost:2182,localhost:2183
	kafka.eagle.metrics.charts=true # 開啟監控趨勢圖,需要開啟Kafka系統的JMX端口,設置該端口在$KAFKA_HOME/bin/kafka-server-start.sh腳本中
	kafka.eagle.sql.fix.error=true # 開啟錯誤日志信息
	# 其余保持默認,數據庫使用sqlite,注意路徑需要事先創建好或修改成當前目錄
	# 數據庫也可以更換成MySQL
	kafka.eagle.url=jdbc:sqlite:/usr/local/kafka-eagle-web-1.3.9/db/ke.db

# 注意
# kafka.eagle.zk.cluster.alias的值需要跟下面的這個cluster1.zk.list小數點第一個保持一致,比如都是cluster1,否則獲取不到數據

# 添加環境變量
vim /etc/profile.d/kafka_eagle.sh
	#!/bin/bash

	export KE_HOME=/usr/local/kafka-eagle-web-1.3.9
	export PATH=$PATH:$KE_HOME/bin
source /etc/profile.d/kafka_eagle.sh


# 命令相關
bash bin/ke.sh start|stop|status|stats|restart

# 啟動
bash bin/ke.sh start
	*******************************************************************
	* Kafka Eagle system monitor port successful... 
	*******************************************************************
	[2019-09-20 12:10:32] INFO: Status Code[0]
	[2019-09-20 12:10:32] INFO: [Job done!]
	Welcome to
	    __ __    ___     ____    __ __    ___            ______    ___    ______    __     ______
	   / //_/   /   |   / __/   / //_/   /   |          / ____/   /   |  / ____/   / /    / ____/
	  / ,<     / /| |  / /_    / ,<     / /| |         / __/     / /| | / / __    / /    / __/   
	 / /| |   / ___ | / __/   / /| |   / ___ |        / /___    / ___ |/ /_/ /   / /___ / /___   
	/_/ |_|  /_/  |_|/_/     /_/ |_|  /_/  |_|       /_____/   /_/  |_|\____/   /_____//_____/   
	                                                                                             

	Version 1.3.9
	*******************************************************************
	* Kafka Eagle Service has started success.
	* Welcome, Now you can visit 'http://127.0.0.1:8048/ke'
	* Account:admin ,Password:123456
	*******************************************************************
	* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
	* <Usage> https://www.kafka-eagle.org/ </Usage>
	*******************************************************************

# 瀏覽器訪問,防火牆放行該端口,后期改用Nginx代理
地址:192.168.0.187:8048/ke
賬號:admin,密碼:123456


# 在kafka eagle平台使用KSQL查詢數據

# 左側導航

# 1.先在Topic-List中查看到Topic Name和Partition Indexes的值
# ID	Topic Name	Partition Indexes	Partition Numbers	Created	Modify	Operate
# 1	canal_manager	[0]	1	2019-09-20 17:27:15	2019-09-20 17:27:15	

# 2. 在Topic-KSQL中輸入查詢語句
# select * from "canal_manager" where "partition" in (0) limit 1
# 下方會顯示查詢所用時間和查詢后的結果

# 在如下目錄的文件中可以查看具體的異常信息,一般都會提示具體的錯誤
# tail -n 30 kafka-eagle-web-1.3.9/logs/ke_console.out
# 配置文件詳細說明
######################################
# 配置多個Kafka集群所對應的Zookeeper
######################################
kafka.eagle.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=dn1:2181,dn2:2181,dn3:2181
cluster2.zk.list=tdn1:2181,tdn2:2181,tdn3:2181

######################################
# 設置Zookeeper線程數
######################################
kafka.zk.limit.size=25

######################################
# 設置Kafka Eagle瀏覽器訪問端口
######################################
kafka.eagle.webui.port=8048

######################################
# 如果你的offsets存儲在Kafka中,這里就配置
# 屬性值為kafka,如果是在Zookeeper中,可以
# 注釋該屬性。一般情況下,Offsets的也和你消
# 費者API有關系,如果你使用的Kafka版本為0.10.x
# 以后的版本,但是,你的消費API使用的是0.8.2.x
# 時的API,此時消費者依然是在Zookeeper中
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# 如果你的集群一個是新版本(0.10.x以上),
# 一個是老版本(0.8或0.9),可以這樣設置,
# 如果都是新版本,那么可以將值都設置成kafka
######################################
cluster2.kafka.eagle.offset.storage=zookeeper

######################################
# 是否啟動監控圖表,默認是不啟動的
######################################
kafka.eagle.metrics.charts=false

######################################
# 在使用Kafka SQL查詢主題時,如果遇到錯誤,
# 可以嘗試開啟這個屬性,默認情況下,不開啟
######################################
kafka.eagle.sql.fix.error=false

######################################
# 郵件服務器設置,用來告警
######################################
kafka.eagle.mail.enable=false
kafka.eagle.mail.sa=
kafka.eagle.mail.username=
kafka.eagle.mail.password=
kafka.eagle.mail.server.host=
kafka.eagle.mail.server.port=

######################################
# 設置告警用戶,多個用戶以英文逗號分隔
######################################
kafka.eagle.alert.users=smartloli.org@gmail.com


######################################
# 超級管理員刪除主題的Token
######################################
kafka.eagle.topic.token=keadmin

######################################
# 如果啟動Kafka SASL協議,開啟該屬性
######################################
kafka.eagle.sasl.enable=false
kafka.eagle.sasl.protocol=SASL_PLAINTEXT
kafka.eagle.sasl.mechanism=PLAIN

######################################
# Kafka Eagle默認存儲在Sqlite中,如果要使用
# MySQL可以替換驅動、用戶名、密碼、連接地址
######################################
#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=123456

kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/Users/dengjie/workspace/kafka-egale/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=root

6. 投遞數據到Kafka

# 先進行canal配置,改動配置文件canal.properties
# serverMode改為kafka
vim conf/canal.properties
	canal.serverMode = kafka
	canal.mq.servers = localhost:9092,localhost:9093,localhost:9094

vim conf/example/instance.propties
	# mq config
	canal.mq.topic=canal_manager # 填寫數據庫庫名,表示這個數據庫的所有表的操作都在這個topic下
	# dynamic topic route by schema or table regex
	# canal.mq.dynamicTopic=.*\\..*
	canal.mq.partition=0
	# hash partition config
	# canal.mq.partitionsNum=10
	# canal.mq.partitionHash=.*\\..*

# 以上具體規則詳看官方文檔

# kafka開啟消息隊列的自動創建topic模式,相關配置在kafka的server.properties
echo 'auto.create.topics.enable=true' >> config/server-1.properties 
echo 'auto.create.topics.enable=true' >> config/server-2.properties 
echo 'auto.create.topics.enable=true' >> config/server-3.properties 

# 相關改動完成后重啟canal和kafka

# 使用canal_admin平台查看canal的狀態
# Server管理,操作,日志

# 使用zu ui平台查看kafka的topic情況
# 左側導航Topic-List查看生成的topic,這里顯示的是canal_manager,上面設置的那個數據庫庫名
# 點開Topic Name可以查看具體的數據個數

# 使用命令行kafka-console-consumer.sh --topic canal_manager --bootstrap-server localhost:9092 --from-beginning查看canal傳遞給kafka的數據
# 插入一條數據
	{"data":[{"id":"13","username":"13","password":"6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9","name":"Canal Manager","roles":"admin","introduction":null,"avatar":null,"creation_date":"2019-07-14 00:05:28"}],"database":"canal_manager","es":1568972329000,"id":10,"isDdl":false,"mysqlType":{"id":"bigint(20)","username":"varchar(31)","password":"varchar(128)","name":"varchar(31)","roles":"varchar(31)","introduction":"varchar(255)","avatar":"varchar(255)","creation_date":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"username":12,"password":12,"name":12,"roles":12,"introduction":12,"avatar":12,"creation_date":93},"table":"canal_user","ts":1568972329456,"type":"INSERT"}
# 刪除一條數據
	{"data":[{"id":"13","username":"13","password":"6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9","name":"Canal Manager","roles":"admin","introduction":null,"avatar":null,"creation_date":"2019-07-14 00:05:28"}],"database":"canal_manager","es":1568972368000,"id":11,"isDdl":false,"mysqlType":{"id":"bigint(20)","username":"varchar(31)","password":"varchar(128)","name":"varchar(31)","roles":"varchar(31)","introduction":"varchar(255)","avatar":"varchar(255)","creation_date":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"username":12,"password":12,"name":12,"roles":12,"introduction":12,"avatar":12,"creation_date":93},"table":"canal_user","ts":1568972369005,"type":"DELETE"}
# 更新一條數據
	{"data":[{"id":"13","username":"13","password":"6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9","name":"Canal Manager","roles":"1111","introduction":null,"avatar":null,"creation_date":"2019-09-20 23:52:09"}],"database":"canal_manager","es":1568994729000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint(20)","username":"varchar(31)","password":"varchar(128)","name":"varchar(31)","roles":"varchar(31)","introduction":"varchar(255)","avatar":"varchar(255)","creation_date":"timestamp"},"old":[{"roles":"admin","creation_date":"2019-07-14 00:05:28"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"username":12,"password":12,"name":12,"roles":12,"introduction":12,"avatar":12,"creation_date":93},"table":"canal_user","ts":1568994729999,"type":"UPDATE"}

# 或者在kafka eagle平台使用KSQL查詢數據,上述文檔已經說過了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM