ES坑之logstash配置文件


input {

    stdin {

    }

    jdbc {

      # mysql 數據庫鏈接

      jdbc_connection_string => "jdbc:mysql:localhost/database?characterEncoding=utf8"

      # 用戶名和密碼

      jdbc_user => "xxx"

      jdbc_password => "xxxx"

      # 驅動

      jdbc_driver_library => "D:/xx/xx/logstash-6.2.4/config/mysql-connector-java-8.0.18.jar"

      # 驅動類名

      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"

      jdbc_page_size => "50000"

      # 執行的sql 文件路徑+名稱

      #statement_filepath => ""

  parameters => { "sql_last_value" => "UpdateTime" }

  statement => "SELECT * FROM (SELECT * FROM table1 ) t WHERE t.updatetime > :sql_last_value"

  # 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新

  schedule => "* * * * *"

      # 索引類型

      #type => "article"

     # 防止自動將大小轉為小寫

      lowercase_column_names => false

      # 記錄上一次運行記錄

      record_last_run => true

      # 使用字段值

      use_column_value => true

     # 追蹤字段名

      tracking_column => "updatetime"

      # 字段類型

      tracking_column_type => "timestamp"

     # 上一次運行元數據保存路徑

      last_run_metadata_path => "./logstash_last_id"

     # 是否刪除記錄的數據

      clean_run => false

    }

}

 

filter {

    json {

        source => "message"

        remove_field => ["message"]

    }

}

output {

  elasticsearch {

    hosts => "http://localhost:9200/"

    index => "indexname"

         document_type => "articles"

    document_id => "%{articleid}"

    template_overwrite => true

  }

 

  # 這里輸出調試,正式運行時可以注釋掉

  stdout {

      codec => json_lines

  }

}

 

————————————————————————————

注意點

(1)jdbc_driver_library
        mysql-connector-java-5.1.46.jar的存放目錄,這個一定要配置正確,支持全路徑和相對路徑。如果配置不對,將會報“can ”錯誤。
(2)sql_last_value
        標志目前logstash同步的位置信息(類似offset)。比如id、updatetime。logstash通過這個標志,可以判斷目前同步到哪一條數據。
(3)statementstatement_filepath
        statement:執行同步的sql語句,可以同步部分數據。
        statement_filepath:存儲執行同步的sql語句。不和statement同時使用。
(4)schedule
        定時器,表示每隔多長時間同步一次數據。格式類似crontab。
(5)tracking_columntracking_column_type
        tracking_column:表示表中哪一列用於判斷logstash同步的位置信息。與sql_last_value比較判斷是否需要同步這條數據。
        tracking_column_type:racking_column指定列的類型。支持兩種類型:numeric(默認)、timestamp。注意:如果列是時間字段(比如updateTime),一定要指定這個類型為timestamp。我就踩了這個大坑。。。一直同步不成功!!!
(6)last_run_metadata_path
        存儲sql_last_value值的文件名稱及位置。
(7)document_id
        生成elasticsearch的文檔值,盡量使用同步的數據中已有的唯一標識。比如同步訂單數據,可以使用訂單號。

————————————————————————————

在使用過程中遇到的一些問題:

1、數據同步,在同步數據時會有一個記錄值,logstash會根據這個值來進行數據更新,我這里使用的是updateTime

根據內容修改時間進行更新數據,但是發現logstash記錄的時間不是當地時間,具體時區是哪還沒有發現(后續會補全這一塊)

記錄的時間總比數據庫中的時間要快,所以導致數據總是更新不上。

在這一塊目前做了幾個嘗試:

  1.通過在sql語句中拼接和轉換時間,試圖將數據庫中的時間轉換為logstash記錄中的數據時間

  2.在logstash中增加filter配置

filter {
    ruby {
        code => "event.timestamp.time.localtime"
    }
}

以上方法目前均沒有實現,原因:

  1.sql語句中添加時間函數后會報錯,logstash執行語句時無法識別函數(沒有找到原因)

  2.按照網上找的方法,在logstash配置文件中添加filter后並沒有起作用(沒有找到原因)

這些問題需要進一步深入研究原因

2、logstash鏈接mysql數據庫失敗

  報錯一:[2020-07-25T00:21:08,797][ERROR][logstash.inputs.jdbc ] Unable to connect to database. Tried 1 times {:error_message=>"Java::JavaSql::SQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up."}

    問題分析:

  首先:因為部署環境為內網,防止是因為服務器之間通信問題,對mysql所在服務器做了Telnet測試,端口可以訪問通,本機安裝Navicat鏈接mysql成功,網絡原因排除

  隨后:檢查了多次鏈接字符串,因為mysql密碼中含有“@”,“$”等特殊字符,修改了mysql密碼做嘗試,mysql方面的原因排除

     隨后:更換了jdbc版本,原使用版本為mysql-connector-java-8.0.19.jar,更換為mysql-connector-java-5.x.jar

  最后請教大神,大神查閱資料后給出的建議是出錯原因可能是時區問題,需要在mysql鏈接字符串中添加時區

  將原本字符串

  jdbc:mysql://172.20.21.10:3306/Project?characterEncoding=utf8&autoReconnect=true

  更改為:

  jdbc:mysql://localhost:3306/Project?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC

  然后更改了驅動名 jdbc_driver_class => "com.mysql.jdbc.Driver"

  將其改為:com.mysql.cj.jdbc.Driver

修改后遂成功

在其他服務器沒有出現以上鏈接出錯的問題,具體原因暫未查明,在此記錄

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM