最近在做的項目中,需要將MySQL中的數據通過logstash同步至elasticsearch,但是同步后發現es中的文檔時間字段不對了,統統比實際時間提前8小時。
查了資料發現,這是由於logstash在獲取時區的時候,默認獲取的是UTC默認時間,同時elasticsearch在創建索引的時候,統一使用UTC時間,因為中國使用的為東8時區,源數據和實際創建的索引數據會相差8個小時。
所以如果獲取的是UTC時間,會導致我們取es文檔時間字段不符合實際,其實可以通過修改 logstash-db-sync.conf 同步配置文件來解決時間差的問題。
以下是我自己項目中用的配置文件,項目上logstash用的是6.8.4版本,該文件位於/usr/local/logstash-6.8.4/sync目錄下。
input { jdbc { # 設置 MySql/MariaDB 數據庫url以及數據庫名稱 jdbc_connection_string => "jdbc:mysql://10.1.1.129:3306/ctscm_uat?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai" # 用戶名和密碼 jdbc_user => "ctscm_uat" jdbc_password => "han2018chen" # 數據庫驅動所在位置,可以是絕對路徑或者相對路徑 jdbc_driver_library => "/usr/local/logstash-6.8.4/sync/mysql-connector-java-8.0.19.jar" # 驅動類名 jdbc_driver_class => "com.mysql.jdbc.Driver" # 開啟分頁 jdbc_paging_enabled => "true" # 分頁每頁數量,可以自定義 jdbc_page_size => "1000" # 執行的sql文件路徑 # statement_filepath => "/usr/local/logstash-6.8.4/sync/sync.sql" statement => "SELECT * FROM contract WHERE update_time > :sql_last_value" # 設置定時任務間隔 含義:分、時、天、月、年,全部為*默認含義為每分鍾跑一次任務 schedule => "* * * * *" # 索引類型 type => "contract" # 是否開啟記錄上次追蹤的結果,也就是上次更新的時間,這個會記錄到 last_run_metadata_path 的文件 use_column_value => true # 記錄上一次追蹤的結果值 last_run_metadata_path => "/usr/local/logstash-6.8.4/sync/track_time" # 如果 use_column_value 為true, 配置本參數,追蹤的 column 名,可以是自增id或者時間 tracking_column => "update_time" # tracking_column 對應字段的類型 tracking_column_type => "timestamp" # 是否清除 last_run_metadata_path 的記錄,true則每次都從頭開始查詢所有的數據庫記錄 clean_run => false # 數據庫字段名稱大寫轉小寫 lowercase_column_names => false jdbc_default_timezone => "Asia/Shanghai" #使用本地時區為local,否則sql_last_value如果是timestamp,時間會提前8小時
#值可以是任何的:utc,local,默認值為 "utc" plugin_timezone => "local" } } filter { # 因為時區問題需要修正時間 ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] } # 因為時區問題需要修正時間 ruby { code => "event.set('create_time', event.get('create_time').time.localtime + 8*60*60)" } # 因為時區問題需要修正時間 ruby { code => "event.set('update_time', event.get('update_time').time.localtime + 8*60*60)" } # 轉換成日期格式 ruby { code => "event.set('start_date', event.timestamp.time.localtime.strftime('%Y-%m-%d'))" } # 轉換成日期格式 ruby { code => "event.set('end_date', event.timestamp.time.localtime.strftime('%Y-%m-%d'))" } # 轉換成日期格式 ruby { code => "event.set('sign_date', event.timestamp.time.localtime.strftime('%Y-%m-%d'))" } } output { elasticsearch { # ES的IP地址及端口 hosts => ["10.1.2.32:9200"] # 同步的索引名 index => "ctscm" # 設置_docID和數據相同 document_id => "%{contract_id}" } # 日志輸出 stdout { codec => json_lines } }
說明:
主要是以下幾個參數要正確配置
1.jdbc_connection_string配置上使用CTT(Asia/shanghai)時間
jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=CTT"
或者
jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/shanghai"
2.新增時區參數
- jdbc_default_timezone
值類型為字符串,此設置沒有默認值。
時區轉換。Logstash(和Elasticsearch)期望時間戳以UTC術語表示。如果您的數據庫記錄了相對於另一個時區的時間戳,則將記錄該數據庫的時區,然后將此設置設置為數據庫使用的時區。但是,由於SQL不允許在時間戳字段中提供時區數據,因此我們無法逐條記錄地進行計算。此插件將以ISO8601格式的相對UTC時間自動將您的SQL時間戳字段轉換為Logstash時間戳。
使用此設置將手動分配指定的時區偏移,而不是使用本地計算機的時區設置。
jdbc_default_timezone => "Asia/Shanghai"
- plugin_timezone
值可以是任何的:utc,local,默認值為 "utc"。
如果您希望此插件將時間戳偏移到UTC以外的時區,則可以將此設置設置為local,插件將使用OS時區進行偏移調整。
注意:當指定plugin_timezone和/或時jdbc_default_timezone,偏移量調整在兩個地方進行,如果sql_last_value是時間戳,並且在語句中用作參數,則偏移量調整將從插件時區到數據時區,並且在處理記錄時,時間戳從數據庫時區偏移到插件時區。如果您的數據庫時區為UTC,則無需設置這些設置中的任何一個。