利用ingest node所提供的Pipeline幫我們對數據進行處理。
在Elasticsearch中的配置文件elasticsearch.yml文件中配置:node.ingest: true
ingest node提供了在對文檔建立索引之前對其進行預處理的功能:
- 接收節點攔截索引或批量API請求
- 運用轉換(transformation)
- 將文檔傳遞回索引或批量API
什么是pipeline呢?
- 一個pipleline就是一套處理器:
- 一個processor就像是Logstash里的一個filter擁有對通過管道(pipeline)的文檔的讀寫權限.
那么Elastic到底提供了哪些processor呢?我們可以參閱Elastic的官方文檔,我們可以看到許多的pocessors可以被利用。
地址:https://www.elastic.co/guide/en/elasticsearch/reference/7.5/ingest-processors.html
定義一個Pipleline
使用PUT命令配合Ingest API來操作。它是存在於cluster state里的。
PUT _ingest/pipeline/my-pipeline-id
{
"description": "DESCRIPTION",
"processors": [
{
...
}
],
"on_failure": [
{
...
}
]
}
這里my-pipleline-id是我們自己命令的在該cluster唯一標識是的pipleline ID。在里面,我們可以定義我們喜歡的processors數組。在處理
失敗后,我們也可以定義相應的processors來完成。
例子
來使用Filebeat來讀取一個log文件,並使用processors對這個log的數據進行處理。
log文件中每一條的數據是這樣的格式:
20.168.183.41 - - [11/Sep/2019:00:00:05 +0000] "GET /category/health HTTP/1.1" 200 132 "/item/software/623" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
配置Filebeat
創建一個叫做filebeat_processor.yml文件:
filebeat.inputs:
- type: log
enabled: true
fields:
apache: true
paths:
- /data/nginx-access.log # 根據實際情況而定
output.elasticsearch:
hosts: ["localhost:9200"]
pipeline: "my_pipeline_id" # 下一步創建的pipeline ID
使用了一個叫做my_pipleline_id的pipeline。它的定義如下:
PUT _ingest/pipeline/my_pipeline_id
{
"description": "Drop ECS field and add one new field",
"processors": [
{
"remove": {
"field": "ecs"
},
"set": {
"field": "added_field",
"value": 0
}
}
]
}
定義了兩個processor: remove及set。一個是刪除一個叫做ecs的項,另外一個是添加一個叫做added_field的項,並把它的值設置為0。
在正常的情況下,如果在我們的配置文件中沒有定義那個pipleline的情況下,那么他們的結果是:
{
"_index" : "filebeat-7.3.0-2019.09.11-000001",
"_type" : "_doc",
"_id" : "637VIG0BJD_DqHjgqvC5",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2019-09-11T14:58:55.902Z",
"message" : """144.228.123.71 - - [11/Sep/2019:01:52:35 +0000] "GET /category/games HTTP/1.1" 200 117 "/search/?c=Books+Software" "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"""",
"input" : {
"type" : "log"
},
"fields" : {
"apache" : true
},
"ecs" : {
"version" : "1.0.1"
},
"host" : {
"name" : "localhost"
},
"agent" : {
"hostname" : "localhost",
"id" : "c88813ba-fdea-4a98-a0be-468fb53566f3",
"version" : "7.3.0",
"type" : "filebeat",
"ephemeral_id" : "ec3328d6-f7f0-4134-a2b6-8ff0c5141cc5"
},
"log" : {
"offset" : 300352,
"file" : {
"path" : "/data/nginx-access.log"
}
}
}
}
運行Filebeat
在Filebeat的安裝目錄,運行如下的命令:
./filebeat -c filebeat_processor.yml
查看效果
在Kibana中可以通過如下的命令來查看,
GET _cat/indices?v
看到了一個已經生產的以filebeat為開頭的文件名。我們可以通過如下的命令來查看它的數據:
GET filebeat-7.4.2/_search
其中的一個文檔的soure是這樣的:
"_source" : {
"agent" : {
"hostname" : "localhost",
"id" : "45832d40-b664-466b-a523-3bc58890ea50",
"type" : "filebeat",
"ephemeral_id" : "dbbba131-9c33-4e82-a00a-9e8e09d3e799",
"version" : "7.4.2"
},
"log" : {
"file" : {
"path" : "/data/nginx-access.log"
},
"offset" : 11497
},
"message" : """164.51.31.185 - - [11/Sep/2019:00:04:15 +0000] "GET /item/giftcards/232 HTTP/1.1" 200 130 "/category/electronics" "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"""",
"input" : {
"type" : "log"
},
"@timestamp" : "2019-11-23T13:11:57.478Z",
"host" : {
"name" : "localhost"
},
"fields" : {
"apache" : true
},
"added_field" : 0
}
顯然ecs這個field已經不見了,而另外一個叫做added_field新的field被成功添加進來了。這個說明我們的pipleline是起作用的。