目录
logstash同步Mysql数据到Es步骤
1.运行依赖环境
- logstash需要依赖JDK环境,需要提前安装好JDK环境
2.安装logstash
2.1上传logstash压缩包,并解压和改名
cd /usr/local
tar -zxvf logstash-6.8.9.tar.gz
mv logstash-6.8.9 logstash
2.2安装mysql和es插件
cd /usr/local/logstash/bin
./logstash-plugin install logstash-input-jdbc
./logstash-plugin install logstash-output-elasticsearch
2.3上传mysql的jar包,提供依赖
mkdir -p /usr/local/logstash/jar
cd /usr/local/logstash/jar
2.4 创建配置文件,配置数据库需要收集的表信息和es信息
mkdir -p /usr/local/logstash/myConfig
cd /usr/local/logstash/config/myConfig
touch mysql.conf
vi mysql.conf
mysql.conf
-------------
input {
jdbc {
jdbc_driver_library => "/usr/local/logstash/jar/mysql-connector-java-5.1.46.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.58.222:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai"
jdbc_user => "root"
jdbc_password => "root"
schedule => "* * * * *"
statement => "SELECT * FROM user WHERE updated_time >= :sql_last_value"
# 一定要设置timezone,否则会因为时差相差8小时,导致一直同步
jdbc_default_timezone =>"Asia/Shanghai"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "updated_time"
last_run_metadata_path => "syncpoint_table"
}
}
output {
elasticsearch {
# ES的IP地址及端口,集群就写所有的es服务器
hosts => ["192.168.58.222:9200","192.168.58.222:9201"]
# 索引名称 可自定义
index => "user"
# 需要关联的数据库中有有一个id字段,对应类型中的id
document_id => "%{id}"
document_type => "user"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
2.5 指定配置文件,启动logstash(前台启动并跟踪日志)
/usr/local/logstash/bin/logstash -f /usr/local/logstash/myConfig/mysql.conf
2.6 多文件方式同步ES数据
一个 logstash 实例可以借助 pipelines 机制同步多个表,只需要写多个配置文件就可以了,假设我们有两个表 table1 和 table2,对应两个配置文件 mysql.conf 和 mysql_3.conf
在 config/pipelines.yml 中添加配置,添加配置后,启动logstash时就不用指定配置文件了
cd /usr/local/logstash/config
vi pipelines.yml
pipelines.yml
--------------------
- pipeline.id: table1
path.config: "/usr/local/logstash/myConfig/mysql.conf"
- pipeline.id: table2
path.config: "/usr/local/logstash/myConfig/mysql_3.conf"
mysql_3.conf
------------
input {
jdbc {
jdbc_driver_library => "/usr/local/logstash/jar/mysql-connector-java-5.1.46.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.58.222:3306/meite_goods?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai"
jdbc_user => "root"
jdbc_password => "root"
schedule => "* * * * *"
statement => "SELECT * FROM meite_product WHERE updated_time >= :sql_last_value"
jdbc_default_timezone =>"Asia/Shanghai"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "updated_time"
last_run_metadata_path => "syncpoint_table"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["192.168.58.222:9200","192.168.58.222:9201"]
# 索引名称 可自定义
index => "goods"
# 需要关联的数据库中有有一个id字段,对应类型中的id
document_id => "%{id}"
document_type => "goods"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
2.7 logstash 自定义的配置文件说明
jdbc_driver_library: jdbc mysql 驱动的路径,在上一步中已经下载
jdbc_driver_class: 驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了
jdbc_connection_string: mysql 地址
jdbc_user: mysql 用户
jdbc_password: mysql 密码
schedule: 执行 sql 时机,类似 crontab 的调度
statement: 要执行的 sql,以 “:” 开头是定义的变量,可以通过 parameters 来设置变量,这里的 sql_last_value 是内置的变量,表示上一次 sql 执行中 update_time 的值,这里 update_time 条件是 >= 因为时间有可能相等,没有等号可能会漏掉一些增量
use_column_value: 使用递增列的值
tracking_column_type: 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型
tracking_column: 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
schedule => "* * * * *"
这里schedule是Crontab,工具网址:https://tool.lu/crontab/ 注意:Crontab表达式以分为单位