示例
以下配置能夠實現從 SQL Server 數據庫中查詢數據,並增量式的把數據庫記錄導入到 ES 中。
1. 查詢的 SQL 語句在 statement_filepath => "/etc/logstash/statement_file.d/my_info.sql" 參數指定的文件中給出。
2. 字段的轉換由 add_field 參數完成。
input { jdbc { jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar" jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "jdbc:sqlserver://localhost:1433;databaseName=test_db" jdbc_user => "sa" jdbc_password => "123" # schedule => 分 時 天 月 年 # schedule => * 22 * * * //will execute at 22:00 every day schedule => "* * * * *" clean_run => false use_column_value => true tracking_column => BUG_ID record_last_run => true last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info" lowercase_column_names => false statement_filepath => "/etc/logstash/statement_file.d/my_info.sql" type => "my_info" add_field => {"[基本信息][客戶名稱]" => "%{客戶名稱}" "[基本信息][BUG_ID]" => "%{BUG_ID}" "[基本信息][責任部門]" => "%{責任部門}" "[基本信息][發現版本]" => "%{發現版本}" "[基本信息][發現日期]" => "%{發現日期}" "[基本信息][關閉日期]" => "%{關閉日期}"
}
}
其中,數據庫查詢操作 SQL 如下(my_info.sql):
SELECT 客戶名稱, BUG_ID, ISNULL(VIP_Level,'') AS VIP_Level, ISNULL(責任部門,'') AS 責任部門, ISNULL(發現版本,'') AS 發現版本, ISNULL(發現日期,'') AS 發現日期, ISNULL(關閉日期,發現日期) AS 關閉日期, ISNULL( CASE TD記錄人備注 WHEN 'NULL' THEN '' ELSE TD記錄人備注 END,'' ) AS TD記錄人備注, From test_bug_db.dbo.BugInfor WHERE BUG_ID > :sql_last_value
重要參數說明
JDBC(Java Data Base Connectivity,Javajava數據庫連接)參數
如果要了解其它數據庫,可以參考我的 http://www.cnblogs.com/licongyu/p/5535833.html
jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar" //jdbc sql server 驅動,各個數據庫都有對應的驅動,需自己下載 jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" //jdbc class 不同數據庫有不同的 class 配置 jdbc_connection_string => "jdbc:sqlserver://200.200.0.18:1433;databaseName=test_db" //配置數據庫連接 ip 和端口,以及數據庫 jdbc_user => //配置數據庫用戶名 jdbc_password => //配置數據庫密碼
Schedule設置
# schedule => 分 時 天 月 年 # schedule => * 22 * * * //will execute at 22:00 every day schedule => "* * * * *"
重要參數設置
//是否記錄上次執行結果, 如果為真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中 record_last_run => true //是否需要記錄某個column 的值,如果 record_last_run 為真,可以自定義我們需要 track 的 column 名稱,此時該參數就要為 true. 否則默認 track 的是 timestamp 的值. use_column_value => true //如果 use_column_value 為真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的.比如:ID. tracking_column => MY_ID //指定文件,來記錄上次執行到的 tracking_column 字段的值
//比如上次數據庫有 10000 條記錄,查詢完后該文件中就會有數字 10000 這樣的記錄,下次執行 SQL 查詢可以從 10001 條處開始.
//我們只需要在 SQL 語句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是該文件中的值(10000). last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info" //是否清除 last_run_metadata_path 的記錄,如果為真那么每次都相當於從頭開始查詢所有的數據庫記錄 clean_run => false //是否將 column 名稱轉小寫 lowercase_column_names => false //存放需要執行的 SQL 語句的文件位置 statement_filepath => "/etc/logstash/statement_file.d/my_info.sql"