Build a separate index for each value of a field in the input JSON. Register and login log entries are generated in a loop and saved to the testlog file; the results look like this:
{"method":"register","user_id":2933,"user_name":"name_91","level":27,"login_time":1470179550}
{"method":"login","user_id":1247,"user_name":"name_979","level":1,"register_time":1470179550}
{"method":"register","user_id":2896,"user_name":"name_1972","level":17,"login_time":1470179550}
{"method":"login","user_id":2411,"user_name":"name_2719","level":1,"register_time":1470179550}
{"method":"register","user_id":1588,"user_name":"name_1484","level":4,"login_time":1470179550}
{"method":"login","user_id":2507,"user_name":"name_1190","level":1,"register_time":1470179550}
{"method":"register","user_id":2382,"user_name":"name_234","level":21,"login_time":1470179550}
{"method":"login","user_id":1208,"user_name":"name_443","level":1,"register_time":1470179550}
{"method":"register","user_id":1331,"user_name":"name_1297","level":3,"login_time":1470179550}
{"method":"login","user_id":2809,"user_name":"name_743","level":1,"register_time":1470179550}
Create a configuration file under the logstash directory:
vim config/json.conf
input {
  file {
    path => "/home/bona/logstash-2.3.4/testlog"
    start_position => "beginning"
    codec => "json"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.68.135:9200"]
    index => "data_%{method}"
  }
}
The key point is the index setting: %{method} is substituted with the method field of each log event.
The logs above therefore produce two indexes, data_register and data_login. Note that index names must be entirely lowercase.
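As a rough Python sketch (my illustration, not part of the original post), the index name is derived from each event by substituting the method field into data_%{method}, lowercased as Elasticsearch requires:

```python
import json

# Two sample events in the same shape as the testlog lines above.
lines = [
    '{"method":"register","user_id":2933,"user_name":"name_91","level":27,"login_time":1470179550}',
    '{"method":"login","user_id":1247,"user_name":"name_979","level":1,"register_time":1470179550}',
]

def index_for(event):
    # Mirrors index => "data_%{method}"; index names must be lowercase.
    return ("data_%s" % event["method"]).lower()

for line in lines:
    print(index_for(json.loads(line)))  # → data_register, then data_login
```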
Here is an example.
Raw data:
{"countnum":2,"checktime":"2017-05-23 16:59:32"}
{"countnum":2,"checktime":"2017-05-23 16:59:32"}
1. No field type conversion involved: the following logstash filter configuration is enough
if [type] == "onlinecount" {
  json {
    source => "message"
  }
}
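The effect of this filter, stated as plain code (my sketch, not from the post): parse the message string as JSON and merge its keys into the event.

```python
import json

def apply_json_filter(event):
    # Rough analogue of json { source => "message" }: parse the message
    # string and merge the resulting keys into the event.
    event.update(json.loads(event["message"]))
    return event

event = {"type": "onlinecount",
         "message": '{"countnum":2,"checktime":"2017-05-23 16:59:32"}'}
apply_json_filter(event)
print(event["countnum"], event["checktime"])  # → 2 2017-05-23 16:59:32
```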
2. With field type conversion
logstash filter
if [type] == "onlinecount" {
  mutate {
    split => ["message", ","]
    add_field => {
      "coutnum" => "%{[message][0]}"
    }
    add_field => {
      "checktime" => "%{[message][1]}"
    }
    remove_field => ["message"]
  }
  # A json filter only accepts a single source/target pair,
  # so each extracted field gets its own json block.
  json {
    source => "coutnum"
    #convert => { "coutnum" => "integer" }
    target => "coutnum"
  }
  json {
    source => "checktime"
    target => "checktime"
  }
}
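For comparison, a hedged Python sketch of the type-conversion case (names are mine): rather than splitting the message on commas, parse the JSON first and coerce the types afterwards, which is what mutate's convert option accomplishes.

```python
import json
from datetime import datetime

def parse_onlinecount(message):
    doc = json.loads(message)
    doc["countnum"] = int(doc["countnum"])  # like convert => { ... => "integer" }
    # Parse the timestamp string into a real datetime value.
    doc["checktime"] = datetime.strptime(doc["checktime"], "%Y-%m-%d %H:%M:%S")
    return doc

doc = parse_onlinecount('{"countnum":2,"checktime":"2017-05-23 16:59:32"}')
print(doc["countnum"], doc["checktime"])
```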
Kafka data: {
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}\n
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_33"}\n
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_34"}\n
....
}
For performance reasons, the Kafka team merges multiple raw log lines into a single message (individual lines separated by newlines). The messages I read from Kafka must therefore be split back into individual events before being written to ES, otherwise the data is inaccurate. How should this be handled?
Solved. I initially went down a wrong path: the following approach still left everything in one event
filter {
  mutate {
    split => ["message", "
"]
  }
}
The correct solution
filter {
  split {
    field => "message"
  }
}
One small remaining question: the split filter's terminator defaults to \n, but why does the following version fail to split, while omitting terminator works? (The config string "\\n" is a literal backslash followed by n, not a newline character, so it never matches the actual line breaks.)
filter {
  split {
    field => "message"
    terminator => "\\n"
  }
}
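The terminator pitfall can be reproduced outside Logstash (my illustration): "\\n" is the two-character sequence backslash + n, which never occurs in the data, whereas the default terminator is the real newline character.

```python
# A Kafka-style batch: several JSON documents joined by real newlines.
batch = ('{"cluster":"qy_api_v2_pool","http_versioncode":"Android_32"}\n'
         '{"cluster":"qy_api_v2_pool","http_versioncode":"Android_33"}\n'
         '{"cluster":"qy_api_v2_pool","http_versioncode":"Android_34"}')

events_ok = batch.split("\n")     # real newline: three separate events
events_bad = batch.split("\\n")   # literal backslash + n: nothing to split on

print(len(events_ok), len(events_bad))  # → 3 1
```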
Given the following JSON:
{
"name":"zhangsan",
"friends":
{
"friend1":"lisi",
"friend2":"wangwu",
"msg":["haha","yaya"]
}
}
We want to parse it into:
{
"name":"zhangsan",
"friend1":"lisi",
"friend2":"wangwu",
"msg":["haha","yaya"]
}
logstash.conf
input
{
    stdin
    {
        codec => json
    }
}
filter
{
    mutate
    {
        add_field => { "@friends" => "%{friends}" } # first create a new field and assign friends to it
    }
    json
    {
        source => "@friends" # then parse it
        remove_field => [ "@friends","friends" ] # remove the fields that are no longer needed; this line is optional
    }
}
output
{
    stdout { }
}
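Stated as plain code (a sketch of mine, not the author's exact mechanism): flattening simply lifts the keys of friends one level up and drops the wrapper.

```python
import json

def flatten(doc, key="friends"):
    # Lift the nested object's keys to the top level, then drop the wrapper.
    doc.update(doc.pop(key, {}))
    return doc

doc = json.loads('{"name":"zhangsan","friends":'
                 '{"friend1":"lisi","friend2":"wangwu","msg":["haha","yaya"]}}')
print(flatten(doc))
```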
---------------------
Author: 姚賢賢
Source: CSDN
Original: https://blog.csdn.net/u011311291/article/details/86743642
Copyright notice: this is the blogger's original work; please include a link to the post when republishing.
Because our event-tracking logs are nested JSON, the nested JSON must be expanded before all fields can be analyzed statistically.
- The log format is as follows:
2019-01-22 19:25:58 172.17.12.177 /statistics/EventAgent appkey=yiche&enc=0<ype=view&yc_log={"uuid":"73B333EB-EC87-4F9F-867B-A9BF38CBEBB2","mac":"02:00:00:00:00:00","uid":-1,"idfa":"2BFD67CF-ED60-4CF6-BA6E-FC0B18FDDDF8","osv":"iOS11.4.1","fac":"apple","mdl":"iPhone SE","req_id":"360C8C43-73AC-4429-9E43-2C08F4C1C425","itime":1548156351820,"os":"2","sn_id":"6B937D83-BFB2-4C22-85A8-5B3E82D9D0F1","dvid":"3676b52dc155e1eec3ca514f38736fd6","aptkn":"4fb9b2bffb808515aa0e9a5f5b17d826769e432f63d5cf87f7fb5ce4d67ef9f1","cha":"App Store","idfv":"B1EAD56F-E456-4FF2-A3C2-9A8FA0693C22","nt":4,"lg_vl":{"pfrom":"shouye","ptitle":"shouye"},"av":"10.3.3"} 218.15.255.124 200
- The initial Logstash configuration was:
input {
  file {
    path => ["/data/test_logstash.log"]
    type => ["nginx_log"]
    start_position => "beginning"
  }
}
filter {
  if [type] =~ "nginx_log" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:create_time} %{IP:server_ip} %{URIPATH:uri} %{GREEDYDATA:args} %{IP:client_ip} %{NUMBER:status}" }
    }
    urldecode {
      field => args
    }
    kv {
      source => "args"
      field_split => "&"
      remove_field => [ "args","@timestamp","message","path","@version","path","host" ]
    }
    json {
      source => "yc_log"
      remove_field => [ "yc_log" ]
    }
  }
}
output {
  stdout { codec => rubydebug }
}
Running Logstash with this configuration produces:
{
    "server_ip" => "172.17.12.177",
    "cha" => "App Store",
    "mdl" => "iPhone SE",
    "type" => "nginx_log",
    "mac" => "02:00:00:00:00:00",
    "ptitle" => "shouye",
    "appkey" => "yiche",
    "idfv" => "B1EAD56F-E456-4FF2-A3C2-9A8FA0693C22",
    "sn_id" => "6B937D83-BFB2-4C22-85A8-5B3E82D9D0F1",
    "aptkn" => "4fb9b2bffb808515aa0e9a5f5b17d826769e432f63d5cf87f7fb5ce4d67ef9f1",
    "av" => "10.3.3",
    "os" => "2",
    "idfa" => "2BFD67CF-ED60-4CF6-BA6E-FC0B18FDDDF8",
    "uid" => -1,
    "uuid" => "73B333EB-EC87-4F9F-867B-A9BF38CBEBB2",
    "req_id" => "360C8C43-73AC-4429-9E43-2C08F4C1C425",
    "status" => "200",
    "uri" => "/statistics/EventAgent",
    "enc" => "0",
    "ltype" => "view",
    "lg_vl" => {
        "ptitle" => "shouye",
        "pfrom" => "shouye"
    },
    "nt" => 4,
    "pfrom" => "shouye",
    "itime" => 1548156351820,
    "client_ip" => "218.15.255.124",
    "create_time" => "2019-01-22 19:25:58",
    "dvid" => "3676b52dc155e1eec3ca514f38736fd6",
    "fac" => "apple",
    "lg_value" => "{\"pfrom\":\"shouye\",\"ptitle\":\"shouye\"}",
    "osv" => "iOS11.4.1"
}
As you can see, the lg_vl field is still in JSON form and has not been flattened. Adding
json { source => "lg_vl" }
directly to the configuration throws a JsonParseException, because lg_vl has already been parsed into an object rather than a JSON string.
- The correct approach
input {
  file {
    path => ["/data/test_logstash.log"]
    type => ["nginx_log"]
    start_position => "beginning"
  }
}
filter {
  if [type] =~ "nginx_log" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:create_time} %{IP:server_ip} %{URIPATH:uri} %{GREEDYDATA:args} %{IP:client_ip} %{NUMBER:status}" }
    }
    urldecode {
      field => args
    }
    kv {
      source => "args"
      field_split => "&"
      remove_field => [ "args","@timestamp","message","path","@version","path","host" ]
    }
    json {
      source => "yc_log"
      remove_field => [ "yc_log" ]
    }
    mutate {
      add_field => { "lg_value" => "%{lg_vl}" }
    }
    json {
      source => "lg_value"
      remove_field => [ "lg_vl","lg_value" ]
    }
  }
}
output {
  stdout { codec => rubydebug }
}
After parsing the outer JSON, add a field lg_value and assign the content of lg_vl to it; then run a separate json parse on lg_value. The parsed result is:
{
    "type" => "nginx_log",
    "nt" => 4,
    "dvid" => "3676b52dc155e1eec3ca514f38736fd6",
    "os" => "2",
    "fac" => "apple",
    "ltype" => "view",
    "client_ip" => "218.15.255.124",
    "itime" => 1548156351820,
    "mac" => "02:00:00:00:00:00",
    "idfa" => "2BFD67CF-ED60-4CF6-BA6E-FC0B18FDDDF8",
    "uri" => "/statistics/EventAgent",
    "aptkn" => "4fb9b2bffb808515aa0e9a5f5b17d826769e432f63d5cf87f7fb5ce4d67ef9f1",
    "sn_id" => "6B937D83-BFB2-4C22-85A8-5B3E82D9D0F1",
    "create_time" => "2019-01-22 19:25:58",
    "osv" => "iOS11.4.1",
    "req_id" => "360C8C43-73AC-4429-9E43-2C08F4C1C425",
    "ptitle" => "shouye",
    "av" => "10.3.3",
    "server_ip" => "172.17.12.177",
    "pfrom" => "shouye",
    "enc" => "0",
    "mdl" => "iPhone SE",
    "cha" => "App Store",
    "idfv" => "B1EAD56F-E456-4FF2-A3C2-9A8FA0693C22",
    "uid" => -1,
    "uuid" => "73B333EB-EC87-4F9F-867B-A9BF38CBEBB2",
    "appkey" => "yiche",
    "status" => "200"
}
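In Python terms (my sketch, not a claim about Logstash internals), the add_field plus second json filter amounts to re-serializing the already-parsed lg_vl object, parsing it again, and merging the keys upward:

```python
import json

event = {"ltype": "view", "lg_vl": {"pfrom": "shouye", "ptitle": "shouye"}}

# Analogue of add_field { "lg_value" => "%{lg_vl}" } followed by
# json { source => "lg_value" }: stringify the parsed object, parse
# it again, merge the result, and drop the intermediate fields.
lg_value = json.dumps(event.pop("lg_vl"))
event.update(json.loads(lg_value))
print(event)
```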
Perfect!
Author: 神秘的寇先森
Link: https://www.jianshu.com/p/de06284e1484
Source: 簡書 (Jianshu)
Copyright belongs to the author; for any form of reproduction, please contact the author for authorization and cite the source.
Logstash: replacing strings, parsing JSON data, converting data types, and extracting log timestamps
In some cases a log file is JSON-like text but uses single quotes, in the format below. From this log data we need to extract the correct fields and field types.
{'usdCnyRate': '6.728', 'futureIndex': '463.36', 'timestamp': '1532933162361'}
{'usdCnyRate': '6.728', 'futureIndex': '463.378', 'timestamp': '1532933222335'}
{'usdCnyRate': '6.728', 'futureIndex': '463.38', 'timestamp': '1532933348347'}
{'usdCnyRate': '6.728', 'futureIndex': '463.252', 'timestamp': '1532933366866'}
{'usdCnyRate': '6.728', 'futureIndex': '463.31', 'timestamp': '1532933372350'}
{'usdCnyRate': '6.728', 'futureIndex': '463.046', 'timestamp': '1532933426899'}
{'usdCnyRate': '6.728', 'futureIndex': '462.806', 'timestamp': '1532933432346'}
{'usdCnyRate': '6.728', 'futureIndex': '462.956', 'timestamp': '1532933438353'}
{'usdCnyRate': '6.728', 'futureIndex': '462.954', 'timestamp': '1532933456796'}
{'usdCnyRate': '6.728', 'futureIndex': '462.856', 'timestamp': '1532933492411'}
{'usdCnyRate': '6.728', 'futureIndex': '462.776', 'timestamp': '1532933564378'}
{'usdCnyRate': '6.728', 'futureIndex': '462.628', 'timestamp': '1532933576849'}
{'usdCnyRate': '6.728', 'futureIndex': '462.612', 'timestamp': '1532933588338'}
{'usdCnyRate': '6.728', 'futureIndex': '462.718', 'timestamp': '1532933636808'}
If we feed this straight to the Logstash JSON filter plugin as if it were JSON, it fails with:
[WARN ] 2018-07-31 10:20:12.708 [Ruby-0-Thread-5@[main]>worker1: :1] json - Error parsing json {:source=>"message", :raw=>"{'usdCnyRate': '6.728', 'futureIndex': '462.134', 'timestamp': '1532933714371'}", :exception=>#<LogStash::Json::ParserError: Unexpected character (''' (code 39)): was expecting double-quote to start field name at [Source: (byte[])"{'usdCnyRate': '6.728', 'futureIndex': '462.134', 'timestamp': '1532933714371'}"; line: 1, column: 3]>}
The simple fix here is to replace the single quotes with double quotes, using logstash mutate gsub.
Note the gsub block in the configuration below, which does the string replacement, and the json block, which parses the result. We also need to convert usdCnyRate and futureIndex to float (the mutate convert block), and convert timestamp to a time value stored in a new logdate field (the date block). The latter uses the
logstash date filter plugin
input {
  file {
    path => "/usr/share/logstash/wb.cond/test.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  mutate {
    gsub => [ "message", "'", '"' ]
  }
  json {
    source => "message"
  }
  mutate {
    convert => {
      "usdCnyRate" => "float"
      "futureIndex" => "float"
    }
  }
  date {
    match => [ "timestamp", "UNIX_MS" ]
    target => "logdate"
  }
}
output {
  stdout { codec => rubydebug }
}
With this configuration, the fields and types of the log file are parsed correctly:
{
    "message" => "{\"usdCnyRate\": \"6.728\", \"futureIndex\": \"463.378\", \"timestamp\": \"1532933222335\"}",
    "@timestamp" => 2018-07-31T10:48:48.600Z,
    "host" => "logstashvm0",
    "path" => "/usr/share/logstash/wb.cond/test.log",
    "@version" => "1",
    "logdate" => 2018-07-30T06:47:02.335Z,
    "usdCnyRate" => 6.728,
    "timestamp" => "1532933222335",
    "futureIndex" => 463.378
}
{
    "message" => "{\"usdCnyRate\": \"6.728\", \"futureIndex\": \"463.252\", \"timestamp\": \"1532933366866\"}",
    "@timestamp" => 2018-07-31T10:48:48.602Z,
    "host" => "logstashvm0",
    "path" => "/usr/share/logstash/wb.cond/test.log",
    "@version" => "1",
    "logdate" => 2018-07-30T06:49:26.866Z,
    "usdCnyRate" => 6.728,
    "timestamp" => "1532933366866",
    "futureIndex" => 463.252
}
{
    "message" => "{\"usdCnyRate\": \"6.728\", \"futureIndex\": \"463.31\", \"timestamp\": \"1532933372350\"}",
    "@timestamp" => 2018-07-31T10:48:48.602Z,
    "host" => "logstashvm0",
    "path" => "/usr/share/logstash/wb.cond/test.log",
    "@version" => "1",
    "logdate" => 2018-07-30T06:49:32.350Z,
    "usdCnyRate" => 6.728,
    "timestamp" => "1532933372350",
    "futureIndex" => 463.31
}
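The whole pipeline (gsub, json parse, float conversion, UNIX_MS date) can be approximated in Python; the names and details here are my own sketch, not a drop-in replacement for the Logstash config.

```python
import json
from datetime import datetime, timezone

def parse_line(line):
    line = line.replace("'", '"')                 # gsub: single -> double quotes
    doc = json.loads(line)                        # json filter
    doc["usdCnyRate"] = float(doc["usdCnyRate"])  # convert => "float"
    doc["futureIndex"] = float(doc["futureIndex"])
    # date filter with UNIX_MS: milliseconds since the epoch -> logdate
    doc["logdate"] = datetime.fromtimestamp(int(doc["timestamp"]) / 1000.0,
                                            tz=timezone.utc)
    return doc

doc = parse_line("{'usdCnyRate': '6.728', 'futureIndex': '463.378', 'timestamp': '1532933222335'}")
print(doc["usdCnyRate"], doc["futureIndex"], doc["logdate"].isoformat())
```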

