關於logstash安裝:https://www.cnblogs.com/toov5/p/10301727.html
Logstash是一個開源數據收集引擎,具有實時管道功能。Logstash可以動態地將來自不同數據源的數據統一起來,並將數據標准化到你所選擇的目的地
下面進一步詳細說配置:
jdbc_driver_library: jdbc mysql 驅動的路徑,在上一步中已經下載 jdbc_driver_class: 驅動類的名字,mysql 填 com.mysql.jdbc.Driver 就好了 jdbc_connection_string: mysql 地址 jdbc_user: mysql 用戶 jdbc_password: mysql 密碼 schedule: 執行 sql 時機,類似 crontab 的調度 statement: 要執行的 sql,以 “:” 開頭是定義的變量,可以通過 parameters 來設置變量,這里的 sql_last_value 是內置的變量,表示上一次 sql 執行中 update_time 的值,這里 update_time 條件是 >= 因為時間有可能相等,沒有等號可能會漏掉一些增量 use_column_value: 使用遞增列的值 tracking_column_type: 遞增字段的類型,numeric 表示數值類型, timestamp 表示時間戳類型 tracking_column: 遞增字段的名稱,這里使用 update_time 這一列,這列的類型是 timestamp last_run_metadata_path: 同步點文件,這個文件記錄了上次的同步點,重啟時會讀取這個文件,這個文件可以手動修改
注意:
Crontab:官網 https://tool.lu/crontab/ 注意:Crontab表達式以分為單位
./bin/logstash -f mysql.conf 啟動
原理:
Logstash --> 發送查詢語句到MySQL,
Logstash -> 發送查詢結果到ES
首次查詢全部數據(根據時間1970年),記錄最后一此數update_time,作為下一次修改時間查詢的條件值。數據庫新增或者修改、刪除的時候都會記錄時間。
where update_time >= xxxx-xx-xx
每隔一段時間查詢一次。表里面必須有:update_time 字段
同步方式:
1. 主鍵的新增方式
2. update_time方式
比較:
使用 logstash-input-jdbc 插件讀取 mysql 的數據,這個插件的工作原理比較簡單,就是定時執行一個 sql,然后將 sql 執行的結果寫入到流中,增量獲取的方式沒有通過 binlog 方式同步,而是用一個遞增字段作為條件去查詢,每次都記錄當前查詢的位置,由於遞增的特性,只需要查詢比當前大的記錄即可獲取這段時間內的全部增量,一般的遞增字段有兩種,AUTO_INCREMENT 的主鍵 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只適用於那種只有插入沒有更新的表,update_time 更加通用一些,建議在 mysql 表設計的時候都增加一個 update_time 字段。
綜上所述配置:
cd /home/elasticsearch/logstash-6.4.3/config
input {
jdbc {
jdbc_driver_library => "/home/mysql5.7/mysqlDriver/mysql-connector-java-8.0.13.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 8.0以上版本:一定要把serverTimezone=UTC天加上
jdbc_connection_string => "jdbc:mysql://192.168.124.8:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
jdbc_user => "root"
jdbc_password => "root"
schedule => "* * * * *"
statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "syncpoint_table"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["192.168.91.66:9200"]
# 索引名稱 可自定義
index => "user"
# 需要關聯的數據庫中有有一個id字段,對應類型中的id
document_id => "%{id}"
document_type => "user"
}
stdout {
# JSON格式輸出
codec => json_lines
}
}
將配置文件丟到config下: 名字為mysql.conf,隨便起的
啟動: ./bin/logstash ./config/mysql.conf
注意:因為我用的是mysql的最新的驅動8.多的版本,所以配置數據庫的url時候一定要把serverTimezone=UTC天加上!
同時mysql數據庫:
grant all privileges on *.* to 'root'@'%' identified by 'root' with grant option;
FLUSH PRIVILEGES;
很慢的,啟動過程盯着日志:

kinbana:

如果同步多個多個表:
配合多個上述的config,然后將其地址配置文件中:

追加到最后一行,可以配置多個。

啟動方式為:
./bin/logstash


其他同步方案比較:
Logstash: 定時器方式, 實現簡單
MQ: 實時性,復雜性更高,數據一致性好(自帶重試,補償)
補充:
ES集群搭建: https://www.cnblogs.com/toov5/p/11361413.html
kibana連接ES集群:
docker run -it -d -e ELASTICSEARCH_URL=http://127.0.0.1:9200 --name kibana --network=container:ES01 kibana
訪問:
http://192.168.91.66:5601
