logstash 執行過程
input -->filter -->output
filter 可以對數據進行處理






輸出插件

codec plugin

使用腳本將數據導入到ES
input {
jdbc {
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/db_example"
jdbc_user => root
jdbc_password => ymruan123
#啟用追蹤,如果為true,則需要指定tracking_column
use_column_value => true
#指定追蹤的字段,
tracking_column => "last_updated"
#追蹤字段的類型,目前只有數字(numeric)和時間類型(timestamp),默認是數字類型
tracking_column_type => "numeric"
#記錄最后一次運行的結果
record_last_run => true
#上面運行結果的保存位置
last_run_metadata_path => "jdbc-position.txt"
statement => "SELECT * FROM user where last_updated >:sql_last_value;"
schedule => " * * * * * *"
}
}
output {
elasticsearch {
document_id => "%{id}"
document_type => "_doc"
index => "users"
hosts => ["http://localhost:9200"]
}
stdout{
codec => rubydebug
}
}
使用 logstash 執行
logstash -f mysqltoes.conf
使用別名查詢索引
POST /_aliases
{
"actions": [
{
"add": {
"index": "users",
"alias": "view_users",
"filter" : { "term" : { "is_deleted" : false } }
}
}
]
}
創建一個索引別名,過濾掉 只顯示 is_deleted 為未刪除的數據。
通過別名查詢數據
POST view_users/_search
{
"query": {
"term": {
"name.keyword": {
"value": "Jack"
}
}
}
}
