Logstash介紹
下載安裝
logstash是基於Java的服務,各操作系統安裝Java環境均可使用。
Java
https://www.java.com/zh_CN/
安裝后配置好java環境變量。
logstash
最新版 https://www.elastic.co/downloads/logstash
2.3.4版 https://www.elastic.co/downloads/past-releases/logstash-2-3-4
配置結構
#輸入
Input{
Jdbc{}
}
#加工過濾
Filter{
Json{}
}
#輸出
Output{
elasticsearch{}
}
支持的插件
Input: elasticsearch,file,http_poller,jdbc,log4j,rss,rabbitmq,redis,syslog,tcp,udp…等
Filter: grok,json,mutate,split …等
Output: email,elasticsearch,file,http,mongodb,rabbitmq,redis,stdout,tcp,udp …等
配置說明地址: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
應用示例
#jdbc_demo
input {
jdbc {
#數據庫鏈接設置
jdbc_driver_library => "k:\k\k\sqljdbc_6.0\chs\sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.0.1;user=sa;password=sa;DatabaseName=EStatistic;"
jdbc_user => "sa"
jdbc_password => "sa"
statement_filepath => "../config-demo/sqlscript/jdbc_demo.sql" #sql腳本
schedule => "* * * * *" #執行計划
record_last_run => true #記錄最后運行值
use_column_value => true #
tracking_column => id #要記錄的字段
last_run_metadata_path => "../config-demo/log/jdbc_demo" #記錄的位置
lowercase_column_names => true #將字段名全部改為小寫
clean_run => false
jdbc_fetch_size => 1000 #分頁設置
jdbc_page_size => 1000
jdbc_paging_enabled => true
}
}
filter {
mutate {
#重命名,可以將字段名改掉
rename => {
"id" => "Id"
"bid" => "BId"
"saleconsultantid" => "SaleConsultantId"
"avgreplyduration" => "AvgReplyDuration"
"avgreplyratio" => "AvgReplyRatio"
"avgonlineduration" => "AvgOnlineDuration"
"stattime" => "StatTime"
}
}
}
output {
elasticsearch {
hosts => ["192.168.0.1:9200"] #es 服務地址
index => "jdbc_demo" #索引的名字
document_type => "demoinfo" #類型的名字
workers => 1 #輸出時進程數
document_id=>"%{Id}" #文檔的唯一ID
template => "../config-demo/templates/jdbc_demo.json" #模板的路徑
template_name => "jdbc_demo" #模板的名字
template_overwrite => false ##是否覆蓋已存在的模板
}
# stdout{
# codec => rubydebug
# }
}
索引模板
{
"order": 1,
"template": "jdbc_demo", //模板匹配規則,已經索引名匹配(eg:jdbc_demo-*,可以匹配到,jdbc_demo-1,jdbc_demo-14...)
"settings": {
"index.number_of_shards": 4, //分片數
"number_of_replicas": 1 //每個分配備份數
},
"mappings": {
"_default_": {
"_source": {
"enabled": true
}
},
"demoinfo": { //動態模板 如果映射后,有新的字段添加進來,並且在字段區域沒有映射會按該動態模板匹配映射
"dynamic_templates": [
{
"string_field": {
"match": "*",
"match_mapping_type": "string", //匹配到所有字符串 設置為不分詞
"mapping": {
"index": "not_analyzed",
"type": "string"
}
}
}
], //類型名
"_source": {
"enabled": true
},
"_all": {
"enabled": false
},
"properties": { //字段區域
"Id": {
"type": "long"
},
"BId": {
"type": "integer"
},
"SaleConsultantId": {
"type": "integer"
},
"AvgReplyDuration": {
"type": "integer"
},
"AvgReplyRatio": {
"type": "double"
},
"AvgOnlineDuration": {
"type": "integer"
},
"StatTime": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
}
}
}
},
"aliases": {
"logstashdemo": {} //別名,匹配到該模板的會設置一個別名
}
}
Sqlserver 查詢語句
SELECT [Id] ,[BId] ,[SaleConsultantId] ,[AvgReplyDuration] ,[AvgReplyRatio] ,[AvgOnlineDuration] ,[StatTime] FROM [EStatistic].[dbo].[StatSessionBy7Day] WHERE Id>:sql_last_value --:sql_last_value是記錄的最后的一個值
5.x模板String字段
{"properties": { //字段區域
"NewField": {
"type": "keyword", // keyword 不分詞
"index": false //不建立索引
},
"NewFieldText": {
"type": "text", // text分詞
"index": true // 建立索引
}
}
}
注意事項
-
Jdbc(input)拉取數據使用分頁功能時無法查詢text、ntext 和 image字段。
-
jdbc(input)使用分頁時會將字段全部轉換為大寫。
-
elasticsearch(output)的模板中匹配符,一定要能夠匹配到索引名稱才能生效,並且要避免讓其他索引匹配到,以免影響其它新索引。
-
elasticsearch(output)定義索引名稱必須全小寫(es限制)。
-
以時間字段進行跟蹤時sql查詢語句不能使用Top x限制每次查詢條數,這會導致最后的記錄值並非是最大的值,所拉取的數據可能會出現數據重復拉取(但是在es中不會體現為多條重復數據,只是version字段會>1)或數據丟失。
-
跟蹤主鍵ID時需顯示設置 order by id asc
-
無檢索文本內容需設置“index”: “not_analyzed”

