logstash 執行過程
input -->filter -->output
filter 可以對數據進行處理
輸出插件
codec plugin
使用腳本將數據導入到ES
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/db_example" jdbc_user => root jdbc_password => ymruan123 #啟用追蹤,如果為true,則需要指定tracking_column use_column_value => true #指定追蹤的字段, tracking_column => "last_updated" #追蹤字段的類型,目前只有數字(numeric)和時間類型(timestamp),默認是數字類型 tracking_column_type => "numeric" #記錄最后一次運行的結果 record_last_run => true #上面運行結果的保存位置 last_run_metadata_path => "jdbc-position.txt" statement => "SELECT * FROM user where last_updated >:sql_last_value;" schedule => " * * * * * *" } } output { elasticsearch { document_id => "%{id}" document_type => "_doc" index => "users" hosts => ["http://localhost:9200"] } stdout{ codec => rubydebug } }
使用 logstash 執行
logstash -f mysqltoes.conf
使用別名查詢索引
POST /_aliases { "actions": [ { "add": { "index": "users", "alias": "view_users", "filter" : { "term" : { "is_deleted" : false } } } } ] }
創建一個索引別名,過濾掉 只顯示 is_deleted 為未刪除的數據。
通過別名查詢數據
POST view_users/_search { "query": { "term": { "name.keyword": { "value": "Jack" } } } }