ELK - 優化 index patterns 和 Kibana 中顯示的多余字段


Demo 跑起來之后,就需要根據具體的負載和日志進行優化了,本次主要是優化在 Kibana 界面中 [Table] 展開的 Patterns,過多的 Patterns 有幾個負面作用:
1)、干擾查看信息
2)、增大索引占用空間
3)、降低 es 的寫入性能

ELK各組件版本:(Windows Server, Linux 下其他大同小異)
1)、Filebeat - 7.3.1
2)、Logstash - 7.3.1
3)、Elasticsearch: 7.3.1
4)、Kibana - 7.3.1

 

以下是優化前的內容,實際需要的僅僅是6個字段,但此處卻有35個字段之多,主要來源是 filebeat 和 logstash 處理時應用了 es 默認模板產生的大量冗余字段。
這里的結果是已經在 logstash 中初步配置了 remove_field,將 "@version"、"message" 、"log_create_time"  移除了,不然整個 Table 展開后更加的臃腫。
@version 和 message 是 es 的動態模板自動創建的,log_create_time 是自定義的字段,用於替換默認模板的 @timestamp

           

 

Kibana -> Setting -> Index Patterns

 

日志樣例:

2019-11-21 08:45:39.656 | ^Warn||ThreadID: 35|Audilog not defined in config.
2019-11-21 08:47:39.012 | *Error|SQLServer||ThreadID: 35|Database connection refused.
2019-11-21 09:42:50.156 | Info|Kafka||ThreadID: 35|Reviced Message:
ID:                                  1
Mode:                                Delay
Message Id:                          1
Map:                                 len=45
Binary Version:                      3.4.6

 以 “|” 為分隔符,將日志分為 6 個 field,可以看到有的只有5段,有的不在一行:
field1 - logtime
field2 - level
field3 - comp // 此字段可能為空
field4 - blank // 此處都是 null
field5 - threadId
field6 - logbody

 

1、先看一下優化后的配置文件

Filebeat 的配置文件,如果要收集一台服務器上的不同程序的日志,可以新建多個配置文件並配置不同的 logstash 端口,再啟動多個進程收集。

如果是輸出到 kafka,則可以在一份配置里配置多個 intput 利用 filed 字段在 output 里送往不同的 topic,而不必啟動多個進程。

  • filebeat.yml
# log files input
filebeat.inputs:
- type:
  enabled: true
  paths:
    - F:\payserverlog\*.log
  multiline.pattern: '[0-9]{4}-[0-9]{2}-[0-9]{2]' // 將不在一行的日志,拼接到以日期開頭的行后
  multiline.negate:  true
  multiline.match: after
  fields:
    nginx: payserver
  scan_frequency: 10s
  max_bytes: 1048576
  tail_files: false
  backoff: 1s
  backoff_factor: 2

# output to logstash
output.logstash:
  hosts: ["172.16.0.11:5146"]

processors:
- drop_fields:
    fields: ["input_type", "log.offset", "host.name", "input.type", "agent.hostname", "agent.type", "ecs.version", "agent.ephemeral_id", "agent.id", "agent.version", "fields.ics", "log.file.path", "log.flags" ]

monitoring:
  enabled: true
elasticsearch: ["http://172.16.0.11:9200"]
# logging logging.level: info logging.to_file: true logging.files: path: E:\ELK\filebeat-7.3.1-windows-x86_64\logs name: filebeat-5146.log interval: 1h keepfiles: 7 logging.json: false

 

Logstash 的配置文件

當單機有多個程序日志需要收集並且輸出端是 Logstash 時,有幾種方式:
1)、使用 if [filed] = 'xxx' 來區分 grok 和 output,此種方式 Logstash 需要做大量的 if 判斷,官方稱之為 Conditional Hell (條件地獄),會嚴重降低 grok 效率 - 不推薦
2)、啟動多個 Logstash 實例,需要配置和管理多個 JVM - 不推薦
3)、使用 Logstash 的 Pipeline , 不必管理多個 JVM,也不必做大量 if 判斷,以下的配置使用的就是該種方式

  •  Config\logstash.yml
pipeline.worker: 10
pipeline.batch.size: 3000
pipeline.batch.delay: 10

http.host: "172.16.0.11"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://172.16.0.11:9200"]

log.level: info

 

  • Config\pipeline.yml
- pipeline: main
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200 path.config: E:\\ELK\\logstash
-7.3.1\\pipeline\\5044-server01-payapi.conf - pipeline: server-01-error
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200
path.config: E:\\ELK\\logstash-7.3.1\\pipeline\\5045-server01-payweb.conf - pipeline: server-02-access
pipeline.workers: 8
pipeline.batch.size: 3000
  pipeline.delay: 200
path.config: E:\\ELK\\logstash-7.3.1\\pipeline\\5046-server02-payapi.conf - pipeline: server-02-error
pipeline.workers: 8
pipeline.batch.size: 3000
pipeline.delay: 200
path.config: E:\\ELK\\logstash
-7.3.1\\pipeline\\5047-server02-payweb.conf

 

  • Pipeline\5044-server01-payapi.conf
input {
  beats {
    port => 5044 // 當配置了Pipeline時,不同的 pipeline 配置文件,此端口不可沖突
     client_inactivity_timeout => 600
  }   
}

filter {
  grok {
    match => {
      "message" => "%{DATA:logtime}\|%{DATA:level}\|%{DATA:comp}\|%{DATA:blank}\|%{DATA:threadId}\|%{GREEDYDATA:logdoby}"
     }
  }

  if "_grokparsefailure" in [tags] { // 某些行按 "|" 分割只有5段,按前面的 grok 會解析失敗,並生成一個值為 "_grokparsefailure" 的 tag,此處重新解析失敗的行
    grok {
      match => {
        "message" => "%{DATA:logtime}\|%{DATA:level}\|%{DATA:blank}\|%{DATA:threadId}\|%{GREEDYDATA:logdoby}"
      }
    }
  }

grok {
  match => {
"message" => "%{TIMESTAMP_ISO8601:log_create_time}" // 將日志的時間按照 TIMESTAMP_ISO8601 解析給臨時變量 log_create_time
}
}

date {
match => ["log_create_time", "yyyy-MM-dd HH:mm:ss.SSS"] // 按時間格式匹配一下
target => "@timestamp" // 將 log_create_time 寫入 @timestamp
}
mutate {
remove_field => "@version"
remove_field => "message"
remove_field => "log_create_time"
remove-field => "tags"
gsub => ["level", "\s", ""] // 移除字段中的空格
gsub => ["comp", "\s", ""]  
}
}

output {
  elasticsearch {
    hosts => ["http://172.16.0.11:9200"]
    index => "payapi-server01-%{+yyyy.MM.dd}"
    manage_template => false // 取消 logstash 自動管理模板功能
    template_name => template_payapi // 映射自定義模板的名字,自定義模板的創建在下方
  }
}

其他的幾個配置文件類似,此處注意幾點:

1)、@timestamp 默認是 logstash 處理日志時的時間,當日志的生成時間和 logstash 的處理時間較為接近時問題不大;但假如你要索引幾個月前的文檔或者日志,此時這個時間差幾乎就不能接受;所以此處新建了一個臨時變量 log_create_time,再使用 date 插件,將其寫入 @timestamp,最后@timestamp 就等於日志的生成時間了,在這個例子里,應該明白 @timestamp、logtime、log_create_time 三者是相同的。

2)、沒有使用 logstash 默認的索引模板,使用的是自定義的索引模板,在 Kibana 的 Console 中新建模板:

PUT /_template/template_payapi
{
  "index_patherns" : "[payapi-*]", // 以 payapi- 開頭的索引均會應用此模板
  "order" : 99, // 模板的優先級
  "settings" : {
    "number_of_shards" : 1, // 索引使用的分片數量
    "number_of_replicas" : 0, // 索引的副本數,當你需要導入大量數據,第一次建立索引時,可以設置為0,提高寫入速度,導入完成后可以動態修改
    "refresh_interval" : "15s" // 向臨時空間刷寫的頻率,類似於硬盤的 fsync
  },
  "mapping" : {
    "dynamic" : "false", // 看下方解釋
    "properties" : {
      "@timestamp" : {
        "type" : "text"
      },
      "logtime" : {
        "type" : "text", // 這里注意,不是所有時間都是 date
        "index" : "false" // true:字段可用於搜索, false: 不能用於搜索
      },
      "level" : {
        "type" : "text",
        "index" : "true"
      },

      "comp" : {
        "type" : "text",
        "index" : "true"
      },

      "blank" : {
        "type" : "text",
        "index" : "false"
      },

      "threadId" : {
        "type" : "text",
        "index" : "true"
      },

      "logbody" : {
        "type" : "text",
        "index" : "ture"
      }

    }
  }
}

 

dynamic 值
說明
true 在索引一個文檔時,如果文檔中有 field 不在 mapping 中,會自動映射類型,添加到 mapping,並索引該字段;
false

在索引一個文檔時,如果文檔中有 field 不在 mapping 中,不會添加到 mapping,也不會索引該字段,

但是會保存下來,在 _source 可以看到該字段,但該字段不能被搜索;

strict 在索引一個文檔時,如果文檔中有 field 不在 mapping 中,logstash 會直接拋出異常,拒絕索引;




 

 

 

 

3)、查看模板,在 Kibana 的 Console 中執行

GET /_template/template_payapi

 

2、最后看下優化之后的效果,僅保留需要的字段

   

 

Kibana -> Setting -> Index Patterns

 僅保留不能被刪除的保留字段和自己需要的字段,同時 stroage 占用和寫入速度也更加的 stable。

 

參考鏈接:
https://blog.csdn.net/shumoyin/article/details/84137178
https://www.jianshu.com/p/dc73ec69c9f7


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM