This series is split into three parts: Filebeat, Logstash, and Elasticsearch. Together they give a quick overview of a complete log-collection setup. This post covers the Logstash piece.
1. Introduction to Logstash
Version: logstash-7.12.0
Logstash is a data-processing tool: you build a pipeline that moves data through a series of stages and finally writes it out somewhere, for example into Elasticsearch.
2. How Logstash works
A Logstash event-processing pipeline has three stages: input → filter → output. Inputs generate events, filters modify them, and outputs ship them elsewhere. Inputs and outputs support codecs, which let you encode or decode data as it enters or leaves the pipeline without needing a separate filter.
See the official documentation: https://www.elastic.co/guide/en/logstash/current/pipeline.html#pipeline
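As a minimal sketch of the three stages in one pipeline file (the file path, field name, and index name here are placeholders, not anything from the setup described later):

```conf
input {
  file {
    path => "/var/log/app.log"   # hypothetical source file
    codec => "json"              # decode each line as JSON on the way in
  }
}
filter {
  mutate {
    add_field => { "pipeline" => "demo" }   # trivial example modification
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]      # placeholder address
    index => "demo-%{+YYYY.MM.dd}"
  }
}
```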
2.1 Input
input: the entry point of the pipeline. Configuring an input feeds data into the Logstash pipeline. Commonly used input plugins include (a small sketch follows the list):
- kafka
- redis
- file
- syslog
- beats
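For instance, a beats input on its conventional port together with a file input (the path is hypothetical):

```conf
input {
  beats {
    port => 5044                      # Filebeat conventionally ships events here
  }
  file {
    path => ["/var/log/nginx/*.log"]  # hypothetical log files
    start_position => "beginning"
  }
}
```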
2.2 Filters
Filters are the intermediate processing devices of a Logstash pipeline. You can combine filters with conditionals so that an action is only applied to events that meet certain criteria (see the sketch after this list). Some useful filters include:
- grok: parse and structure arbitrary text. Grok is currently the best way in Logstash to turn unstructured log data into something structured and queryable. Logstash ships with 120 built-in patterns, so odds are good one already fits your needs!
- mutate: perform general transformations on event fields. You can rename, remove, replace, and modify fields in an event.
- drop: discard an event entirely, e.g. a debug event.
- clone: make a copy of an event, optionally adding or removing fields.
- geoip: add geographical information about an IP address.
- json: parse JSON-formatted data.
- json_encode: serialize data to JSON.
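A sketch of filters combined with a conditional, assuming hypothetical level and service fields on the event:

```conf
filter {
  if [level] == "DEBUG" {
    drop { }                            # discard debug events entirely
  }
  if [service] == "payments" {          # hypothetical field and value
    mutate {
      add_tag => ["payments"]
      rename => { "msg" => "message" }  # normalize a field name
    }
  }
}
```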
2.3 Output
Outputs are the final stage of the Logstash pipeline. An event can pass through multiple outputs, and once all output processing is complete, the event has finished its execution. Some commonly used outputs:
- elasticsearch: send event data to Elasticsearch.
- file: write event data to a file on disk.
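The two can also be combined, e.g. shipping every event to Elasticsearch while keeping a local archive (the address and path are placeholders):

```conf
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]   # placeholder address
    index => "app-%{+YYYY.MM.dd}"
  }
  file {
    path => "/var/log/logstash/archive-%{+YYYY-MM-dd}.log"   # hypothetical archive path
  }
}
```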
3. Containerized deployment of Logstash
For a containerized deployment, take the official image as-is and deploy it with a Kubernetes Deployment resource.
Official image: docker.elastic.co/logstash/logstash (the manifests below use tag 7.12.0).
3.1 ConfigMap reference
In the ConfigMap below, the input uses the topics_pattern option to match a group of topics flexibly with a regular expression (you could also use topics to list a specific set of topics). No filter is applied here; events go straight out to elasticsearch.
The global configuration file:
```yaml
apiVersion: v1
data:
  logstash.yml: |-
    http.host: "0.0.0.0"
    pipeline.workers: 2
    pipeline.batch.size: 250
    pipeline.batch.delay: 50
    xpack.management.enabled: false
kind: ConfigMap
metadata:
  name: logstash-config-global
  namespace: ops-logging
```
The pipeline configuration for the workload:
```yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: logstash-config-a
  namespace: ops-logging
data:
  k8s.conf: |-
    input {
      kafka {
        bootstrap_servers => "10.127.91.90:9092,10.127.91.91:9092,10.127.91.92:9092"
        group_id => "k8s-hw-group"
        client_id => "k8s-hw-client"
        consumer_threads => 1
        auto_offset_reset => latest
        topics_pattern => "k8s-hw.*"
        codec => "json"
      }
    }
    filter {
    }
    output {
      if [k8s][nameSpace] == "test" {
        elasticsearch {
          hosts => ["10.127.91.75:9200", "10.127.91.76:9200", "10.127.91.77:9200", "10.127.91.78:9200", "10.127.91.79:9200", "10.127.91.80:9200", "10.127.91.81:9200"]
          index => "k8s-%{[k8s][k8sName]}-%{[k8s][nameSpace]}-%{+YYYYMMddHH}"
          sniffing => "true"
          timeout => 10
        }
      } else {
        elasticsearch {
          hosts => ["10.127.91.75:9200", "10.127.91.76:9200", "10.127.91.77:9200", "10.127.91.78:9200", "10.127.91.79:9200", "10.127.91.80:9200", "10.127.91.81:9200"]
          index => "k8s-%{[k8s][k8sName]}-%{[k8s][nameSpace]}-%{+YYYYMMdd}"
          sniffing => "true"
          timeout => 10
        }
      }
    }
```
3.1.1 Notes on the configuration options
3.1.1.1 INPUT
- bootstrap_servers: the Kafka broker addresses.
- topics: a fixed list of topics to consume.
- topics_pattern: a custom regular expression for fuzzy-matching a group of topics.
- auto_offset_reset: what to do when Kafka has no initial offset, or the offset is out of range:
  - earliest: consume from the beginning.
  - latest: consume from the newest offset.
  - none: throw an exception to the consumer if no previous offset is found for the consumer group.
  - anything else: throw an exception to the consumer.
- consumer_threads: the number of consumer threads. Ideally you should have as many threads as there are partitions for a perfect balance; more threads than partitions means some threads will sit idle. Say I have 4 partitions: if I run only one replica, this is best set to 4; if I run 4 replicas, set it to 1. A variant using topics is sketched after this list.
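A sketch of the same input consuming an explicit topic list instead of a pattern (the topic names are hypothetical):

```conf
input {
  kafka {
    bootstrap_servers => "10.127.91.90:9092,10.127.91.91:9092,10.127.91.92:9092"
    group_id => "k8s-hw-group"
    topics => ["k8s-hw-app-a", "k8s-hw-app-b"]   # hypothetical explicit topic list
    auto_offset_reset => "earliest"              # replay from the beginning of each partition
    consumer_threads => 4                        # e.g. one thread per partition
    codec => "json"
  }
}
```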
3.1.1.2 OUTPUT
The output uses a conditional to tell apart topics coming from different k8s namespaces. My test namespace produces a large volume of logs, so its index is created per hour; that case is configured separately, while all other namespaces use the default per-day index.
For details see the official documentation: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
3.2 Deployment manifest reference
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: logstash-k8s
  name: logstash-k8s
  namespace: ops-logging
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: logstash-k8s
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: logstash-k8s
    spec:
      containers:
      - args:
        - /usr/share/logstash/bin/logstash -f /usr/share/logstash/conf/k8s.conf
        command:
        - /bin/sh
        - -c
        image: docker.elastic.co/logstash/logstash:7.12.0
        imagePullPolicy: IfNotPresent
        name: logstash-k8s
        resources:
          limits:
            cpu: "4"
            memory: 4G
          requests:
            cpu: "4"
            memory: 4G
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /usr/share/logstash/conf
          name: config-volume
        - mountPath: /usr/share/logstash/config/logstash.yml
          name: logstash-config
          readOnly: true
          subPath: logstash.yml
      - args:
        - -c
        - /opt/bitnami/logstash-exporter/bin/logstash_exporter --logstash.endpoint='http://localhost:9600'
        command:
        - /bin/sh
        image: bitnami/logstash-exporter:latest
        imagePullPolicy: IfNotPresent
        name: logstash-exporter-k8s
        ports:
        - containerPort: 9198
          name: lg-exporter
          protocol: TCP
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        runAsUser: 0
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          items:
          - key: k8s.conf
            path: k8s.conf
          name: logstash-config-a   # must match the ConfigMap defined in 3.1
        name: config-volume
      - configMap:
          defaultMode: 420
          name: logstash-config-global
        name: logstash-config
```
And a Service for the logstash-exporter:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: logstash-exporter-a
  namespace: ops-logging
spec:
  ports:
  - name: http
    port: 9198
    protocol: TCP
    targetPort: 9198
    nodePort: 30003
  selector:
    app: logstash-k8s   # must match the pod labels in the Deployment above
  sessionAffinity: None
  type: NodePort
```
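With the NodePort in place you can sanity-check the exporter from outside the cluster; the node IP below is a placeholder, and /metrics is the usual Prometheus endpoint exposed by the exporter:

```sh
curl http://<node-ip>:30003/metrics   # should list logstash_* Prometheus metrics
```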
The above is about the simplest Logstash deployment there is. If you want to debug interactively, change this:
```yaml
containers:
- args:
  - /usr/share/logstash/bin/logstash -f /usr/share/logstash/conf/k8s.conf
```
to:
```yaml
containers:
- args:
  - sleep 1000000
```
The container then just sleeps, so while debugging we can exec straight into it and run Logstash by hand.
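For example (the pod name suffix is a placeholder):

```sh
kubectl -n ops-logging exec -it logstash-k8s-<pod-suffix> -- /bin/bash
```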
4. Advanced Logstash usage
4.1 The requirement
```
2021-08-01 12:26:04.063 INFO 24 --- [traceId=edda5daxxxxxxxxxcfa3387d48][ xnio-1 task-1] c.g.c.gateway.filter.AutoTestFilter : {"traceId":"edda5da8xxxxxxxxxxxxxxxxxxx387d48","headers":[{"x-forwarded-proto":"http,http","x-tenant-id":"123","x-ca-key":"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637","x-forwarded-port":"80,80","x-forwarded-for":"10.244.2.0","x-ca-client-ip":"10.244.2.0","x-product-code":"xxxxx","authorization":"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899","x-forwarded-host":"gatxxxxxxxxx.gm","x-forwarded-prefix":"/xxxxxx","trace-id":"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48","x-ca-api-id":"1418470181321347075","x-ca-env-code":"TEST"}],"appName":"超級管理員","responseTime":15,"serverName":"test-server","appkey":"a62d54b6bxxxxxxxxxxxxxxxxxxx37","time":"2021-08-01 12:26:04.062","responseStatus":200,"url":"/test/v4/orgs/123/list-children","token":"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899"}
```
This is a very typical log line from a Java application. We first want to format the log, then extract the request body it carries, i.e. this embedded JSON:
```json
{"traceId":"edda5da8xxxxxxxxxxxxxxxxxxx387d48","headers":[{"x-forwarded-proto":"http,http","x-tenant-id":"123","x-ca-key":"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637","x-forwarded-port":"80,80","x-forwarded-for":"10.244.2.0","x-ca-client-ip":"10.244.2.0","x-product-code":"xxxxx","authorization":"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899","x-forwarded-host":"gatxxxxxxxxx.gm","x-forwarded-prefix":"/xxxxxx","trace-id":"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48","x-ca-api-id":"1418470181321347075","x-ca-env-code":"TEST"}],"appName":"超級管理員","responseTime":15,"serverName":"test-server","appkey":"a62d54b6bxxxxxxxxxxxxxxxxxxx37","time":"2021-08-01 12:26:04.062","responseStatus":200,"url":"/test/v4/orgs/123/list-children","token":"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899"}
```
Once extracted, we want to be able to query and aggregate on specific fields in Elasticsearch, so this JSON must be re-parsed with its keys promoted to the top level. The JSON also contains a nested array, and we want the map inside that array lifted into the outer event as well. Finally, a few string values should be converted to integers.
To make debugging easier, a fresh pod is started with the simplest possible config, printing events to the console:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash-debug
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: logstash-debug
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: logstash-debug
    spec:
      containers:
      - args:
        - sleep 1000000000000
        command:
        - /bin/sh
        - -c
        image: docker.elastic.co/logstash/logstash:7.12.0
        imagePullPolicy: IfNotPresent
        name: logstash-debug
        resources:
          limits:
            cpu: "4"
            memory: 4G
          requests:
            cpu: "4"
            memory: 4G
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        runAsUser: 0
      terminationGracePeriodSeconds: 30
```
Once the pod is running, point Logstash at a config file directly:
```conf
# debug.conf
input {
  file {
    path => ["/var/log/test.log"]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
}
output {
  stdout {
    codec => rubydebug
  }
}
```
Start it:
```sh
logstash -f debug.conf
```
隨后將上面的那條日志寫道/var/log/test.log
中
最終控制台輸出結果
{
"host" => "logstash-debug-649dcb789c-n9866",
"path" => "/var/log/test.log",
"@timestamp" => 2021-08-01T06:46:43.292Z,
"@version" => "1",
"message" => "2021-08-01 12:26:04.063 INFO 24 --- [traceId=edda5daxxxxxxxxxcfa3387d48] [ XNIO-1 task-1] c.g.c.gateway.filter.AutoTestFilter : {\"traceId\":\"edda5da8xxxxxxxxxxxxxxxxxxx387d48\",\"headers\":[{\"x-forwarded-proto\":\"http,http\",\"x-tenant-id\":\"123\",\"x-ca-key\":\"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637\",\"x-forwarded-port\":\"80,80\",\"x-forwarded-for\":\"10.244.2.0\",\"x-ca-client-ip\":\"10.244.2.0\",\"x-product-code\":\"xxxxx\",\"authorization\":\"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899\",\"x-forwarded-host\":\"gatxxxxxxxxx.gm\",\"x-forwarded-prefix\":\"/xxxxxx\",\"trace-id\":\"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48\",\"x-ca-api-id\":\"1418470181321347075\",\"x-ca-env-code\":\"TEST\"}],\"appName\":\"超級管理員\",\"responseTime\":15,\"serverName\":\"test-server\",\"appkey\":\"a62d54b6bxxxxxxxxxxxxxxxxxxx37\",\"time\":\"2021-08-01 12:26:04.062\",\"responseStatus\":200,\"url\":\"/test/v4/orgs/123/list-children\",\"token\":\"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899\"}"
}
```
4.2 Parsing the log step by step
Formatting raw logs with Logstash is probably the most common requirement of all. Below, grok in the filter stage does the formatting: taking the log line above as the example, we define a custom log format and ultimately extract the embedded JSON payload, i.e. the same JSON shown in 4.1.
4.2.1 Format the log and extract the part we want
grok reference: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
grok debugging tool: https://grokdebug.herokuapp.com/
After working the pattern out in the debugger, which shows the match result as you type, this is the filter section placed into Logstash:
```conf
filter {
  grok {
    # timestamp, level, pid, traceId, thread, logger class, then the JSON body
    match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?<traceId>traceId=.*)\] \[ (?<Nio>.*)\] (?<filter>[a-z0-9A-Z.]+) : (?<originBody>{".*"}$)'}
  }
}
```
This parses the message field: the pattern matches out the key pieces of the log we care about. The match result:
```
{
"message" => "2021-08-01 12:26:04.063 INFO 24 --- [traceId=edda5daxxxxxxxxxcfa3387d48] [ XNIO-1 task-1] c.g.c.gateway.filter.AutoTestFilter : {\"traceId\":\"edda5da8xxxxxxxxxxxxxxxxxxx387d48\",\"headers\":[{\"x-forwarded-proto\":\"http,http\",\"x-tenant-id\":\"123\",\"x-ca-key\":\"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637\",\"x-forwarded-port\":\"80,80\",\"x-forwarded-for\":\"10.244.2.0\",\"x-ca-client-ip\":\"10.244.2.0\",\"x-product-code\":\"xxxxx\",\"authorization\":\"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899\",\"x-forwarded-host\":\"gatxxxxxxxxx.gm\",\"x-forwarded-prefix\":\"/xxxxxx\",\"trace-id\":\"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48\",\"x-ca-api-id\":\"1418470181321347075\",\"x-ca-env-code\":\"TEST\"}],\"appName\":\"超級管理員\",\"responseTime\":15,\"serverName\":\"test-server\",\"appkey\":\"a62d54b6bxxxxxxxxxxxxxxxxxxx37\",\"time\":\"2021-08-01 12:26:04.062\",\"responseStatus\":200,\"url\":\"/test/v4/orgs/123/list-children\",\"token\":\"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899\"}",
"id" => "24",
"Nio" => " XNIO-1 task-1",
"@timestamp" => 2021-08-01T07:25:09.041Z,
"filter" => "c.g.c.gateway.filter.AutoTestFilter",
"traceId" => "traceId=edda5daxxxxxxxxxcfa3387d48",
"timeFlag" => "2021-08-01 12:26:04.063",
"path" => "/var/log/test.log",
"originBody" => "{\"traceId\":\"edda5da8xxxxxxxxxxxxxxxxxxx387d48\",\"headers\":[{\"x-forwarded-proto\":\"http,http\",\"x-tenant-id\":\"123\",\"x-ca-key\":\"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637\",\"x-forwarded-port\":\"80,80\",\"x-forwarded-for\":\"10.244.2.0\",\"x-ca-client-ip\":\"10.244.2.0\",\"x-product-code\":\"xxxxx\",\"authorization\":\"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899\",\"x-forwarded-host\":\"gatxxxxxxxxx.gm\",\"x-forwarded-prefix\":\"/xxxxxx\",\"trace-id\":\"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48\",\"x-ca-api-id\":\"1418470181321347075\",\"x-ca-env-code\":\"TEST\"}],\"appName\":\"超級管理員\",\"responseTime\":15,\"serverName\":\"test-server\",\"appkey\":\"a62d54b6bxxxxxxxxxxxxxxxxxxx37\",\"time\":\"2021-08-01 12:26:04.062\",\"responseStatus\":200,\"url\":\"/test/v4/orgs/123/list-children\",\"token\":\"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899\"}",
"@version" => "1",
"host" => "logstash-debug-649dcb789c-n9866",
"logLevel" => "INFO"
}
```
4.2.2 Remove the unneeded fields
After that step there is a new field named originBody. That is the only part we actually want; none of the other fields are needed, so we delete them with remove_field in the mutate filter. For details on that option see the official docs: https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-remove_field
```conf
filter {
  grok {
    match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?<traceId>traceId=.*)\] \[ (?<Nio>.*)\] (?<filter>[a-z0-9A-Z.]+) : (?<originBody>{".*"}$)'}
  }
  mutate {
    # new: drop everything grok extracted except originBody
    remove_field => ["message", "timeFlag", "logLevel", "id", "traceId", "Nio", "filter"]
  }
}
```
This round removes the message field and the rest; the result:
```
{
"path" => "/var/log/test.log",
"originBody" => "{\"traceId\":\"edda5da8xxxxxxxxxxxxxxxxxxx387d48\",\"headers\":[{\"x-forwarded-proto\":\"http,http\",\"x-tenant-id\":\"123\",\"x-ca-key\":\"a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637\",\"x-forwarded-port\":\"80,80\",\"x-forwarded-for\":\"10.244.2.0\",\"x-ca-client-ip\":\"10.244.2.0\",\"x-product-code\":\"xxxxx\",\"authorization\":\"bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899\",\"x-forwarded-host\":\"gatxxxxxxxxx.gm\",\"x-forwarded-prefix\":\"/xxxxxx\",\"trace-id\":\"edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48\",\"x-ca-api-id\":\"1418470181321347075\",\"x-ca-env-code\":\"TEST\"}],\"appName\":\"超級管理員\",\"responseTime\":15,\"serverName\":\"test-server\",\"appkey\":\"a62d54b6bxxxxxxxxxxxxxxxxxxx37\",\"time\":\"2021-08-01 12:26:04.062\",\"responseStatus\":200,\"url\":\"/test/v4/orgs/123/list-children\",\"token\":\"bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899\"}",
"@version" => "1",
"@timestamp" => 2021-08-01T07:30:17.548Z,
"host" => "logstash-debug-649dcb789c-n9866",
}
4.2.3 Parse the extracted JSON
Next we want to promote the fields inside the originBody JSON to the top level of the event. For this we use the json filter, which parses JSON-typed data. Two options matter here:
- source: the field to parse; here that is originBody.
- target: where to store the parsed result. If unset, the parsed fields are written to the top level. That is exactly what we want, so we leave it unset (a contrasting sketch follows this list).
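Purely to illustrate target, a hypothetical variant that would nest the parsed keys under a body field instead:

```conf
filter {
  json {
    source => "originBody"
    target => "body"   # parsed keys land under [body] rather than at the top level
  }
}
```

The actual filter chain for this step, with target left unset: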
```conf
filter {
  grok {
    match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?<traceId>traceId=.*)\] \[ (?<Nio>.*)\] (?<filter>[a-z0-9A-Z.]+) : (?<originBody>{".*"}$)'}
  }
  json {
    # new: parse originBody and write its keys to the top level
    source => "originBody"
  }
  mutate {
    remove_field => ["message", "timeFlag", "logLevel", "id", "traceId", "Nio", "filter", "originBody"]
  }
}
```
The result:
```
{
"@version" => "1",
"serverName" => "test-server",
"time" => "2021-08-01 12:26:04.062",
"appkey" => "a62d54b6bxxxxxxxxxxxxxxxxxxx37",
"responseStatus" => 200,
"url" => "/test/v4/orgs/123/list-children",
"headers" => [
[0] {
"x-tenant-id" => "123",
"x-ca-env-code" => "TEST",
"x-ca-key" => "a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637",
"authorization" => "bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899",
"x-product-code" => "xxxxx",
"x-ca-client-ip" => "10.244.2.0",
"x-forwarded-host" => "gatxxxxxxxxx.gm",
"x-forwarded-prefix" => "/xxxxxx",
"x-forwarded-for" => "10.244.2.0",
"x-ca-api-id" => "1418470181321347075",
"x-forwarded-proto" => "http,http",
"trace-id" => "edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48",
"x-forwarded-port" => "80,80"
}
],
"host" => "logstash-debug-649dcb789c-n9866",
"responseTime" => 15,
"token" => "bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899",
"appName" => "超級管理員",
"path" => "/var/log/test.log",
"@timestamp" => 2021-08-01T07:50:26.403Z
}
```
4.2.4 Flatten the headers array
At this point almost all the data we want is in place, but headers is still an array whose element is a map. We need to lift that map out of the array and into the outer event, which is what the split filter does. It is simple to use; see the official docs: https://www.elastic.co/guide/en/logstash/current/plugins-filters-split.html A minimal standalone sketch follows.
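split clones the event once per array element and replaces the array with that element; a sketch on a hypothetical event:

```conf
# Given one event:  { "headers" => [ {"k" => "v1"}, {"k" => "v2"} ] }
# split emits two:  { "headers" => {"k" => "v1"} }  and  { "headers" => {"k" => "v2"} }
filter {
  split {
    field => "headers"
  }
}
```

Since our headers array has exactly one element, the map simply replaces the array. The full filter chain for this step: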
```conf
filter {
  grok {
    match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?<traceId>traceId=.*)\] \[ (?<Nio>.*)\] (?<filter>[a-z0-9A-Z.]+) : (?<originBody>{".*"}$)'}
  }
  json {
    source => "originBody"
  }
  split {
    # new: one event per element of headers, the element replacing the array
    field => "headers"
  }
  mutate {
    remove_field => ["message", "timeFlag", "logLevel", "id", "traceId", "Nio", "filter", "originBody"]
  }
}
```
After this step, the result:
```
{
"appName" => "超級管理員",
"serverName" => "test-server",
"@version" => "1",
"url" => "/test/v4/orgs/123/list-children",
"time" => "2021-08-01 12:26:04.062",
"token" => "bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899",
"@timestamp" => 2021-08-01T07:55:01.353Z,
"appkey" => "a62d54b6bxxxxxxxxxxxxxxxxxxx37",
"path" => "/var/log/test.log",
"responseTime" => 15,
"responseStatus" => 200,
"headers" => {
"x-forwarded-proto" => "http,http",
"x-product-code" => "xxxxx",
"x-ca-client-ip" => "10.244.2.0",
"authorization" => "bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899",
"x-ca-key" => "a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637",
"x-forwarded-for" => "10.244.2.0",
"trace-id" => "edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48",
"x-forwarded-host" => "gatxxxxxxxxx.gm",
"x-forwarded-prefix" => "/xxxxxx",
"x-forwarded-port" => "80,80",
"x-tenant-id" => "123",
"x-ca-env-code" => "TEST",
"x-ca-api-id" => "1418470181321347075"
},
"host" => "logstash-debug-649dcb789c-n9866"
}
```
4.2.5 Convert data types
That satisfies the structural requirements. The final step is converting some fields from strings to integers, using convert in the mutate filter:
```conf
filter {
  grok {
    match => {"message" => '%{TIMESTAMP_ISO8601:timeFlag} %{LOGLEVEL:logLevel} %{NUMBER:id} --- \[(?<traceId>traceId=.*)\] \[ (?<Nio>.*)\] (?<filter>[a-z0-9A-Z.]+) : (?<originBody>{".*"}$)'}
  }
  json {
    source => "originBody"
  }
  split {
    field => "headers"
  }
  mutate {
    remove_field => ["message", "timeFlag", "logLevel", "id", "traceId", "Nio", "filter", "originBody"]
    # new: force integer types for the numeric fields
    convert => {
      "responseStatus" => "integer"
      "responseTime" => "integer"
    }
  }
}
```
The final result:
```
{
"appName" => "超級管理員",
"token" => "bearer 0ed29c72-0d68-4e13-a3f3-c77e2d971899",
"responseTime" => 15,
"path" => "/var/log/test.log",
"headers" => {
"x-forwarded-host" => "gatxxxxxxxxx.gm",
"trace-id" => "edda5da8278xxxxxxxxxxxxxxxxxxx49cfa3387d48",
"x-ca-key" => "a62d5xxxxxxxxxxxxxxxxxxxxxxxxb1cff8637",
"x-forwarded-prefix" => "/xxxxxx",
"x-ca-api-id" => "1418470181321347075",
"x-ca-client-ip" => "10.244.2.0",
"x-forwarded-for" => "10.244.2.0",
"x-forwarded-port" => "80,80",
"authorization" => "bearer 0ed29xxxxxxxxxxxxxxxxxxxxxxxxx71899",
"x-ca-env-code" => "TEST",
"x-forwarded-proto" => "http,http",
"x-tenant-id" => "123",
"x-product-code" => "xxxxx"
},
"appkey" => "a62d54b6bxxxxxxxxxxxxxxxxxxx37",
"time" => "2021-08-01 12:26:04.062",
"@version" => "1",
"responseStatus" => 200,
"serverName" => "test-server",
"url" => "/test/v4/orgs/123/list-children",
"@timestamp" => 2021-08-01T07:57:54.071Z,
"host" => "logstash-debug-649dcb789c-n9866"
}
```
And with that, the job is done.
5. Summary
This post covered only one way of processing logs with Logstash, using its built-in plugins, which is enough for most day-to-day needs. When extra logic is required, you can also process events with custom ruby code blocks; the next post will cover log processing combined with ruby.