前言
Logstash一般用來收集整理日志,但是也可以做數據的同步
我希望我的數據庫的數據全部存到ElasticSearch之后,我的數據庫做的增刪改查都可以增量的更新到ElasticSearch里面
Logstash配置文件
數據庫驅動
首先,下載JDBC,我使用的是SQLserver,所以直接搜索
Microsoft SQL Server JDBC
然后下載即可,我目前是放在了Logstash的bin文件目錄下
配置文件編寫:看看就行,重點在多表同步配置
還記得安裝篇,我寫的運行Logstash嗎?命令是這樣的,在bin目錄下執行
.\logstash -e 'input { stdin { } } output { stdout {} }'
注意到input和output了吧,一個是輸入,一個是輸出,也就是說Logstash的啟動,得需要一個配置文件,這個配置文件有輸入和輸出.這就好辦了,輸入源有很多,我這里使用SQLserver,輸出呢也有很多,我這里介紹兩個
配置文件,讀取SQLserver數據庫導出到txt和ElasticSearch
input {
jdbc {
jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
# clean_run => true #開啟了sql_last_value就從0開始了
schedule => "* * * * *"
statement => "select Id,Name as name,CreateDate as createDate from Solution where SolutionId > :sql_last_value"
lowercase_column_names => false
use_column_value => true
tracking_column => "solutionid" #注意,這里必須全部小寫
}
}
output {
file{
path => "D:\Desktop\output.txt"
}
# elasticsearch {
# hosts => ["192.168.3.8:9200"]
# index => "solution"
# }
}
我必須講解一下這個配置文件,因為有坑,我踩了
首先,我希望讀取數據庫一張表的數據,導出到txt文本,我希望數據庫有新加入的數據的時候,能增量的給我導出到txt里面去
所以我們查數據庫的時候,導出的做一個記錄,比如Id,下次我再導出的時候,只導出Id比上一次大的即可,但是使用Id作為增量有一個問題,就是我數據庫更新了以前的數據,logstash不知道,所以增量的判斷字段必須得是時間,這個在多表更新的那里講
輸入源解釋
- jdbc_driver_library : 沒啥說的,數據庫驅動,也可以不寫死路徑,可以配置在環境變量里,不過我懶得弄了
- jdbc_driver_class : 沒啥說的,驅動類別
- jdbc_connection_string : 你的數據庫地址
- jdbc_user : 數據庫用戶名
- jdbc_password : 數據庫密碼
- clean_run : 值為true的話sql_last_value就從0開始了,sql_last_value下面講
- schedule : 更新頻率,五個星星代表 分 時 天 月 年,例如"* * * * *"就是每分鍾, " * /1 * * " 就是每小時, "/10 * * * *" /10 是每十分鍾 不加斜杠 10 是每小時的第10分,默認是最小單位是分,也就是全部5個就是每分鍾執行一次,其實也可以秒級執行的,這樣寫
schedule => "*/5 * * * * *"
只要在前面再加一個* 單位就是秒,這里就是每5s執行一次
- use_column_value : 是否開啟增量字段
- tracking_column : 指定增量字段,一般情況下都是表的Id或者創建時間這倆字段,但是我推薦使用時間字段,而且必須要注意,Logstash默認把數據庫字段全部轉成小寫了,所以我們得關閉,使用lowercase_column_names => false
- lowercase_column_names => false 關閉默認的小寫
- statement : 數據庫查詢語句,也可以寫到sql文件里面,讀取文件,不過簡單的話直接寫語句挺好的,這里做了一個增量查詢,我每查詢一次,就把我的增量字段賦值給JDBC自帶的sql_last_value變量,所以我查詢的時候大於sql_last_value就實現了增量更新了
現在知道clean_run 的作用了吧,自己測試的時候,測了幾次,sql_last_value變的很大了,又想從頭測起,就把sql_last_value清0即可
在正式服務器上使用的時候,clean_run 關了
輸出解釋
file{
path => "D:\Desktop\output.txt"
}
這個就是導出到txt
elasticsearch {
hosts => ["192.168.100.100:9200"]
index => "solution"
}
這個就是導出到ElasticSearch,Index是solution
啟動
還是來到bin目錄下,啟動命令框,輸入
logstash -f jdbcconfig/jdbc.conf
我在bin目錄下新建了一個文件夾叫jdbcconfig,所以換成你們自己的文件夾名.我的配置文件叫jdbc.conf,同樣也換成你們自己的
多表同步 : 重要,非常重要
這一塊是重點,我踩了很多的坑,找了很多的資料,嘗試了很多次,終於找到了我想要的解決方案
上面講的配置是單表的,但是實際應用的時候都是多表導入到ElasticSearch的,這個時候有兩種方法,我目前已知的2種
- 多寫幾個配置文件,多啟動幾次
- 一個配置文件里寫多個輸入源和多個輸出
input {
jdbc {
jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
schedule => "* * * * *"
clean_run => true
statement => "select ArticleID as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate from Article where ApproveState = 1 and UpdateDate > :sql_last_value"
use_column_value => true
tracking_column => "updateDate"
tracking_column_type => "timestamp"
type => "article"
}
jdbc {
jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
schedule => "* * * * *"
clean_run => true
statement => "select SolutionId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate,Tags,ClickCount from Solution where UpdateDate > :sql_last_value"
use_column_value => true
tracking_column => "updatedate"
tracking_column_type => "timestamp"
type => "solution"
}
jdbc {
jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
schedule => "* * * * *"
# clean_run => true
statement => "select FileId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from DownloadFile where ApproveState = 1 and UpdateDate > :sql_last_value"
use_column_value => true
tracking_column => "updateDate"
tracking_column_type => "timestamp"
type => "downloadfile"
}
jdbc {
jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
schedule => "* * * * *"
clean_run => true
statement => "select VideoId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from Video where UpdateDate > :sql_last_value"
use_column_value => true
tracking_column => "updateDate"
tracking_column_type => "timestamp"
type => "video"
}
}
filter {
mutate {
add_field => {
"[@metadata][articleid]" => "%{articleid}"
}
add_field => {
"[@metadata][solutionid]" => "%{solutionid}"
}
add_field => {
"[@metadata][fileid]" => "%{fileid}"
}
add_field => {
"[@metadata][videoid]" => "%{videoid}"
}
}
}
output {
# stdout {
# codec => json_lines
# }
if [type] == "article"{
elasticsearch {
hosts => "192.168.100.100:9200"
index => "article"
action => "index"
document_id => "%{[@metadata][articleid]}"
}
}
if [type] == "solution"{
elasticsearch {
hosts => "192.168.100.100:9200"
index => "solution"
action => "index"
document_id => "%{[@metadata][solutionid]}"
}
}
if [type] == "downloadfile"{
elasticsearch {
hosts => "192.168.100.100:9200"
index => "downloadfile"
action => "index"
document_id => "%{[@metadata][fileid]}"
}
}
if [type] == "video"{
elasticsearch {
hosts => "192.168.100.100:9200"
index => "video"
action => "index"
document_id => "%{[@metadata][videoid]}"
}
}
}
配置講解
如果是Id就這樣寫
use_column_value => true
tracking_column => "videoid"
如果是時間就這樣寫,多了一個時間類型
use_column_value => true
tracking_column => "updatedate"
tracking_column_type => "timestamp"
想要做增量的更新,包括插入和修改,就最好使用一個updateDate字段
filter是干嘛的?
這一塊我現在還很模糊,不過我現在會使用的是定義變量,我定義一個變量,使用
filter {
mutate {
add_field => {
"[@metadata][videoid]" => "%{videoid}"
}
}
}
這個意思就是,定義了一個[@metadata][videoid]的變量,值是上面的查詢的表里面的videoid
然后我下邊的output里面就可以直接"%{[@metadata][videoid]}"來調用了
所以目前我所知的filter作用,就是變量溝通input和output,當然還有其他的,我還沒看
sql語句
我input里面寫的sql語句,都是把列名都寫出來了,這樣好,可以重命名,一定要注意,數據庫里面的type字段一定要重命名,不然會沖突
而且時間我都轉了一下
CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate
因為網上看到有很多時區的問題,所以我直接轉成字符串就解決時區的問題了吧
坑
數據庫Type字段問題
數據庫字段不能有type,因為Logstash的配置文件使用了type,巨坑,所以數據庫里面的type字段自己重命名
數據庫字段必須駝峰命名
我數據查詢的時候,記得一定要重命名成駝峰的格式,不然ES不能映射給Model,例如
select VideoId as Id,Name as name,Title as title,CreateDate as createDate from ...
注意,單個單詞的全部小寫,CreateDate這種兩個大寫字母的,駝峰命名,前面小寫,后面大寫
如果你實在不想使用駝峰命名,每個字段去as一次,也有其他辦法,就是在你接收的Model上加上 [Text(Name = "CreateDate")]也行
類型錯誤
expected:'String Begin Token', actual:'10', at offset:597
expected:'String Begin Token', actual:'22', at offset:332
這種錯都一樣,都是類型的錯誤,比如ES里面是long,但是你的Model接收的是string,我遇到的情況是我的數據ID,有的是int類型的,有的是string類型的,我都用一個string類型的Id去接收,這就報錯了