版本使用
软件 | 版本 |
---|---|
MySQL | 8.0.18 |
canal | 1.1.5 |
canal-adapter | 1.1.5 |
Elasticsearch | 7.4.0 |
Kibana | 7.4.0 |
JDK | 1.8.0_181 |
基础配置
安装宝塔面板
偷懒了,所以直接安装宝塔面板
Centos安装命令:
yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh
登录后安装LNMP套件:
安装jdk 8
下载
wget https://mirrors.huaweicloud.com/java/jdk/8u181-b13/jdk-8u181-linux-x64.rpm
安装
rpm -ivh jdk-8u181-linux-x64.rpm
检查是否安装成功
[root@VM-0-9-centos ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
安装docker
软件商店中找到docker管理器,点击安装
配置docker国内镜像
创建并修改/etc/docker/daemon.json文件
touch /etc/docker/daemon.json
vim /etc/docker/daemon.json
内容如下
{
"registry-mirrors": [
"https://registry.docker-cn.com",
"http://hub-mirror.c.163.com",
"https://docker.mirrors.ustc.edu.cn"
]
}
也可以直接在宝塔面板中配置
安装ElasticSearch
从docker下载es镜像
最后实际上用的是7.4.0,懒得改图了
docker pull elasticsearch:7.4.0
检查镜像
配置
创建配置文件elasticsearch.yml
cluster.name: "my-aliyun-cluster"
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
xpack.security.enabled: false
启动
sudo docker run -d --name es7 -p 9200:9200 -p 9300:9300 -v /tmp/mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms256m -Xmx256m" elasticsearch:7.4.0
可以看到docker已经启动了
9200端口可以被访问到
安装Kibana
从docker下载kibana镜像
docker pull kibana:7.4.0
配置kibana.yml
注意,这里的ip地址需要
docker inspect
查看,用这里看到的那个
server.name: kibana
server.host: 0.0.0.0
elasticsearch.hosts: [ "http://内网ip:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"
启动
sudo docker run --name kibana6 -d -p 5601:5601 -v /tmp/mydata/kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml kibana:7.9.0
可以正常访问kibana
配置canal
仓库地址:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
准备
-
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
- my.cnf在/etc/my.cnf
- 设置完之后重启mysql服务,可以无脑在宝塔面板点一下
-
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
后来直接使用的阿里云的RDS,所以这一步我没有做
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
下载
-
下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.15 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
-
解压缩
mkdir /tmp/canal tar zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /tmp/canal
-
解压完成后,进入 /tmp/canal 目录,可以看到如下结构
[root@VM-0-9-centos canal]# ll total 20 drwxr-xr-x 2 root root 4096 Dec 26 15:03 bin drwxr-xr-x 5 root root 4096 Dec 26 15:03 conf drwxr-xr-x 2 root root 4096 Dec 26 15:03 lib drwxrwxrwx 2 root root 4096 Aug 22 13:17 logs drwxrwxrwx 2 root root 4096 Aug 22 13:17 plugin
-
修改配置
创建测试数据库
CREATE TABLE user (
`id` int(10) NOT NULL,
`name` varchar(100) DEFAULT NULL,
`role_id` int(10) NOT NULL,
`c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`c_utime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
);
修改配置
修改配置 conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=数据库连接地址:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=数据库用户名
canal.instance.dbPassword=数据库密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
canal.properties貌似也要改一下,是一个玄学问题
需要把这一行的注释去掉
canal.instance.parser.parallelThreadSize = 16
启动
[root@VM-0-9-centos bin]# sh ./startup.sh
配置canal-adapter
准备
添加student以及映射关系
PUT /canal_user
{
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"role_id": {
"type": "long"
},
"c_time": {
"type": "date"
}
}
}
}
下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz
解压adapter压缩包,
mkdir /tmp/canal-adapter
tar zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C /tmp/canal-adapter
配置application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # kafka rocketMQ
canalServerHost: 127.0.0.1:11111
# zookeeperHosts: slave1:2181
# mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
srcDataSources:
defaultDS:
url: jdbc:mysql://mysql所在服务器外网ip:3306/canal?useUnicode=true
username: aliyun_user
password: Huigu12345!!
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
- name: es
hosts: es所在服务器IP:9300
properties:
cluster.name: es的cluster名字
adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件
适配器表映射文件
创建并修改 conf/es/canal_user.yml文件:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: canal_user
_type: _doc
_id: _id
upsert: true
# pk: id
sql: "select a.id as _id, a.name, a.role_id, a.c_time from user a"
# objFields:
# _labels: array:;
# etlCondition: "where a.c_time>='{0}'"
commitBatch: 3000
测试
建表以及sql语句来源:https://segmentfault.com/a/1190000019066098?utm_source=tag-newest
插入数据
尝试在数据库中插入一条数据:
可以在日志中找到了:
在kibana中查询,可以得到这条结果:
GET /canal_user/_doc/7
更新数据
尝试更新一条数据
UPDATE user
SET name = 'zhengguo'
WHERE id = 7;
日志中可以看到
再次在kibana查询,可以看到已经变化了
GET /canal_user/_doc/7
删除数据
尝试删除一条数据
DELETE FROM user
WHERE id = 7;
日志中可以看到
再次在kibana查询,可以无法查询到这条记录了