增量更新
input { jdbc { jdbc_driver_library => "D:\tools\mysql\mysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/canyin?characterEncoding=UTF-8&useSSL=false" jdbc_user => "root" jdbc_password => "228151" statement => "SELECT * FROM goods" jdbc_paging_enabled => "true" jdbc_page_size => "50000" schedule => "* * * * *" type => "foods" record_last_run => true last_run_metadata_path => "" clean_run => false } } filter { json { source => "message" remove_field => ["message"] } } output { stdout { codec => rubydebug } elasticsearch { hosts => "127.0.0.1:9200" index => "goods" document_type=>"foods" document_id=>"%{id}" } }
增量更新的參數介紹:
input { stdin { } jdbc { # 數據庫地址 端口 數據庫名 jdbc_connection_string => "jdbc:mysql://localhost:3306/shen" # 數據庫用戶名 jdbc_user => "root" # 數據庫密碼 jdbc_password => "rootroot" # mysql java驅動地址 jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.43-bin.jar" # 驅動類的名稱 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" #是否記錄上次運行的結果 record_last_run => true #記錄上次運行結果的文件位置 last_run_metadata_path => "" #是否使用數據庫某一列的值, use_column_value => true tracking_column => "id" #numeric或者timestamp tracking_column_type => "numeric" #如果為true則會清除 last_run_metadata_path 的記錄,即重新開始同步數據 clean_run => false #sql_last_value根據tracking類型,默認為0或者1970-1-1 statement => "SELECT * FROM TABLE WHERE id > :last_sql_value" # sql 語句文件,對於復雜的查詢,可以放在文件中。 # statement_filepath => "filename.sql" # 設置監聽間隔,語法與Linux系統Cron相同 schedule => "* * * * *" } } output { stdout { codec => json_lines } elasticsearch { hosts => "localhost:9200" index => "contacts" document_type => "contact" document_id => "%{id}" } }
chedule現在設置成每分鍾都執行一次,是為了方便觀察行為。statefile這一句是一定要加的。$metrics.lastexecutionstart就是這個腳本的關鍵所在了,這個指的是上一次腳本執行的時間,可以通過比較這個時間和數據庫里的字段來判斷是否要更新。
參考文獻:http://www.cnblogs.com/cocowool/p/mysql_data_to_elasticsearch_via_logstash.html
http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_logstash.html
配置Logstash https://www.elastic.co/guide/en/logstash/current/configuration.html#