input { stdin { } jdbc { # mysql 數據庫鏈接 jdbc_connection_string => "jdbc:mysql:localhost/database?characterEncoding=utf8" # 用戶名和密碼 jdbc_user => "xxx" jdbc_password => "xxxx" # 驅動 jdbc_driver_library => "D:/xx/xx/logstash-6.2.4/config/mysql-connector-java-8.0.18.jar" # 驅動類名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 執行的sql 文件路徑+名稱 #statement_filepath => "" parameters => { "sql_last_value" => "UpdateTime" } statement => "SELECT * FROM (SELECT * FROM table1 ) t WHERE t.updatetime > :sql_last_value" # 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新 schedule => "* * * * *" # 索引類型 #type => "article" # 防止自動將大小轉為小寫 lowercase_column_names => false # 記錄上一次運行記錄 record_last_run => true # 使用字段值 use_column_value => true # 追蹤字段名 tracking_column => "updatetime" # 字段類型 tracking_column_type => "timestamp" # 上一次運行元數據保存路徑 last_run_metadata_path => "./logstash_last_id" # 是否刪除記錄的數據 clean_run => false } } filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { hosts => "http://localhost:9200/" index => "indexname" document_type => "articles" document_id => "%{articleid}" template_overwrite => true } # 這里輸出調試,正式運行時可以注釋掉 stdout { codec => json_lines } }
————————————————————————————
注意點
(1)jdbc_driver_library
mysql-connector-java-5.1.46.jar的存放目錄,這個一定要配置正確,支持全路徑和相對路徑。如果配置不對,將會報“can ”錯誤。
(2)sql_last_value
標志目前logstash同步的位置信息(類似offset)。比如id、updatetime。logstash通過這個標志,可以判斷目前同步到哪一條數據。
(3)statement、statement_filepath
statement:執行同步的sql語句,可以同步部分數據。
statement_filepath:存儲執行同步的sql語句。不和statement同時使用。
(4)schedule
定時器,表示每隔多長時間同步一次數據。格式類似crontab。
(5)tracking_column、tracking_column_type
tracking_column:表示表中哪一列用於判斷logstash同步的位置信息。與sql_last_value比較判斷是否需要同步這條數據。
tracking_column_type:racking_column指定列的類型。支持兩種類型:numeric(默認)、timestamp。注意:如果列是時間字段(比如updateTime),一定要指定這個類型為timestamp。我就踩了這個大坑。。。一直同步不成功!!!
(6)last_run_metadata_path
存儲sql_last_value值的文件名稱及位置。
(7)document_id
生成elasticsearch的文檔值,盡量使用同步的數據中已有的唯一標識。比如同步訂單數據,可以使用訂單號。
————————————————————————————
在使用過程中遇到的一些問題:
1、數據同步,在同步數據時會有一個記錄值,logstash會根據這個值來進行數據更新,我這里使用的是updateTime
根據內容修改時間進行更新數據,但是發現logstash記錄的時間不是當地時間,具體時區是哪還沒有發現(后續會補全這一塊)
記錄的時間總比數據庫中的時間要快,所以導致數據總是更新不上。
在這一塊目前做了幾個嘗試:
1.通過在sql語句中拼接和轉換時間,試圖將數據庫中的時間轉換為logstash記錄中的數據時間
2.在logstash中增加filter配置
filter { ruby { code => "event.timestamp.time.localtime" } }
以上方法目前均沒有實現,原因:
1.sql語句中添加時間函數后會報錯,logstash執行語句時無法識別函數(沒有找到原因)
2.按照網上找的方法,在logstash配置文件中添加filter后並沒有起作用(沒有找到原因)
這些問題需要進一步深入研究原因
2、logstash鏈接mysql數據庫失敗
報錯一:[2020-07-25T00:21:08,797][ERROR][logstash.inputs.jdbc ] Unable to connect to database. Tried 1 times {:error_message=>"Java::JavaSql::SQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up."}
問題分析:
首先:因為部署環境為內網,防止是因為服務器之間通信問題,對mysql所在服務器做了Telnet測試,端口可以訪問通,本機安裝Navicat鏈接mysql成功,網絡原因排除
隨后:檢查了多次鏈接字符串,因為mysql密碼中含有“@”,“$”等特殊字符,修改了mysql密碼做嘗試,mysql方面的原因排除
隨后:更換了jdbc版本,原使用版本為mysql-connector-java-8.0.19.jar,更換為mysql-connector-java-5.x.jar
最后請教大神,大神查閱資料后給出的建議是出錯原因可能是時區問題,需要在mysql鏈接字符串中添加時區
將原本字符串
jdbc:mysql://172.20.21.10:3306/Project?characterEncoding=utf8&autoReconnect=true
更改為:
jdbc:mysql://localhost:3306/Project?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC
然后更改了驅動名 jdbc_driver_class => "com.mysql.jdbc.Driver"
將其改為:com.mysql.cj.jdbc.Driver
修改后遂成功
在其他服務器沒有出現以上鏈接出錯的問題,具體原因暫未查明,在此記錄