logstash采集mysql數據同步到es中時間字段相差8小時的解決辦法


最近在做的項目中,需要將MySQL中的數據通過logstash同步至elasticsearch,但是同步后發現es中的文檔時間字段不對了,統統比實際時間提前8小時。

查了資料發現,這是由於logstash在獲取時區的時候,默認獲取的是UTC默認時間,同時elasticsearch在創建索引的時候,統一使用UTC時間,因為中國使用的為東8時區,源數據和實際創建的索引數據會相差8個小時。

所以如果獲取的是UTC時間,會導致我們取es文檔時間字段不符合實際,其實可以通過修改 logstash-db-sync.conf 同步配置文件來解決時間差的問題。

以下是我自己項目中用的配置文件,項目上logstash用的是6.8.4版本,該文件位於/usr/local/logstash-6.8.4/sync目錄下。

 

input {
    jdbc {
        # 設置 MySql/MariaDB 數據庫url以及數據庫名稱
        jdbc_connection_string => "jdbc:mysql://10.1.1.129:3306/ctscm_uat?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai"
        # 用戶名和密碼
        jdbc_user => "ctscm_uat"
        jdbc_password => "han2018chen"
        # 數據庫驅動所在位置,可以是絕對路徑或者相對路徑
        jdbc_driver_library => "/usr/local/logstash-6.8.4/sync/mysql-connector-java-8.0.19.jar"
        # 驅動類名
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # 開啟分頁
        jdbc_paging_enabled => "true"
        # 分頁每頁數量,可以自定義
        jdbc_page_size => "1000"
        # 執行的sql文件路徑
        # statement_filepath => "/usr/local/logstash-6.8.4/sync/sync.sql"
        statement => "SELECT * FROM contract WHERE update_time > :sql_last_value"
        # 設置定時任務間隔  含義:分、時、天、月、年,全部為*默認含義為每分鍾跑一次任務
        schedule => "* * * * *"
        # 索引類型
        type => "contract"
        # 是否開啟記錄上次追蹤的結果,也就是上次更新的時間,這個會記錄到 last_run_metadata_path 的文件
        use_column_value => true
        # 記錄上一次追蹤的結果值
        last_run_metadata_path => "/usr/local/logstash-6.8.4/sync/track_time"
        
        # 如果 use_column_value 為true, 配置本參數,追蹤的 column 名,可以是自增id或者時間
        tracking_column => "update_time"
        # tracking_column 對應字段的類型
        tracking_column_type => "timestamp"
        # 是否清除 last_run_metadata_path 的記錄,true則每次都從頭開始查詢所有的數據庫記錄
        clean_run => false
        # 數據庫字段名稱大寫轉小寫
        lowercase_column_names => false
        
        jdbc_default_timezone => "Asia/Shanghai"
        
        #使用本地時區為local,否則sql_last_value如果是timestamp,時間會提前8小時
#值可以是任何的:utc,local,默認值為 "utc" plugin_timezone
=> "local" } } filter { # 因為時區問題需要修正時間 ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] } # 因為時區問題需要修正時間 ruby { code => "event.set('create_time', event.get('create_time').time.localtime + 8*60*60)" } # 因為時區問題需要修正時間 ruby { code => "event.set('update_time', event.get('update_time').time.localtime + 8*60*60)" } # 轉換成日期格式 ruby { code => "event.set('start_date', event.timestamp.time.localtime.strftime('%Y-%m-%d'))" } # 轉換成日期格式 ruby { code => "event.set('end_date', event.timestamp.time.localtime.strftime('%Y-%m-%d'))" } # 轉換成日期格式 ruby { code => "event.set('sign_date', event.timestamp.time.localtime.strftime('%Y-%m-%d'))" } } output { elasticsearch { # ES的IP地址及端口 hosts => ["10.1.2.32:9200"] # 同步的索引名 index => "ctscm" # 設置_docID和數據相同 document_id => "%{contract_id}" } # 日志輸出 stdout { codec => json_lines } }

 

說明:

主要是以下幾個參數要正確配置

1.jdbc_connection_string配置上使用CTT(Asia/shanghai)時間

jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=CTT"

或者

jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/shanghai"

 

2.新增時區參數

  • jdbc_default_timezone

值類型為字符串,此設置沒有默認值。

時區轉換。Logstash(和Elasticsearch)期望時間戳以UTC術語表示。如果您的數據庫記錄了相對於另一個時區的時間戳,則將記錄該數據庫的時區,然后將此設置設置為數據庫使用的時區。但是,由於SQL不允許在時間戳字段中提供時區數據,因此我們無法逐條記錄地進行計算。此插件將以ISO8601格式的相對UTC時間自動將您的SQL時間戳字段轉換為Logstash時間戳。

使用此設置將手動分配指定的時區偏移,而不是使用本地計算機的時區設置。

jdbc_default_timezone => "Asia/Shanghai"
  • plugin_timezone
    值可以是任何的:utc,local,默認值為 "utc"。

如果您希望此插件將時間戳偏移到UTC以外的時區,則可以將此設置設置為local,插件將使用OS時區進行偏移調整。

注意:當指定plugin_timezone和/或時jdbc_default_timezone,偏移量調整在兩個地方進行,如果sql_last_value是時間戳,並且在語句中用作參數,則偏移量調整將從插件時區到數據時區,並且在處理記錄時,時間戳從數據庫時區偏移到插件時區。如果您的數據庫時區為UTC,則無需設置這些設置中的任何一個。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM