今日內容:
1) JAVA API 操作 ES 集群
2) ES的架構原理
3) ES的 sql操作
4) Beats基本概念及其使用
5) logstash基本概念及其使用
6) kibana基本概念及其使用
1) JAVA API 操作 ES 集群 : 根據關鍵詞查詢 分頁查詢(淺分頁 和 深分頁) 高亮展示數據
2) 構建索引庫的時候, 除了可以指定 mapping信息以外, 還可以指定 分片和副本
PUT /job_idx_shard
{
"mappings": {
"properties": {
"id": { "type": "long", "store": true },
"area": { "type": "keyword", "store": true },
"exp": { "type": "keyword", "store": true },
"edu": { "type": "keyword", "store": true },
"salary": { "type": "keyword", "store": true },
"job_type": { "type": "keyword", "store": true },
"cmp": { "type": "keyword", "store": true },
"pv": { "type": "keyword", "store": true },
"title": { "type": "text", "store": true },
"jd": { "type": "text"}
}
},
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
}
}
3) beats基本介紹:
本質上是一個數據發送器, 用於將本地服務器中數據發生到logstash或者elasticSearch
fileBeats 主要的作用 監控本地服務器中日志數據, 將監控的數據采集下來, 然后發生到 logstash或者elasticSearch
fileBeats的組成:
input : 輸入 用於監控指定的文件
output: 輸出 用於將數據輸出指定的目的的
4) logstash基本介紹:
數據采集工具, 主要用於將各個數據源中數據進行采集, 對數據進行處理, 將處理后數據, 發生到目的地
Flume : 數據采集的工具, 跟logstash是同類型軟件 都是數據采集的工具
flume和 logstash的對比:
flume 是一款通用的采集工具 , 關注數據的傳輸, 更加純粹的數據傳輸(采集)工具
logstash 更多時候是和beats 以及 elasticSearch組合使用 , 更多關注數據的預處理
logstash 和 fileBeats的對比:
logstash 和filebeat都具有日志收集功能,Filebeat更輕量,占用資源更少
logstash基於JVM運行, 占用資源比較大
logstash可以基於filter對數據進行過濾處理工作, 而fileBeats更多關注數據的傳遞工作
在生產中, 一般關於 elastic Stack的使用:
首先在需要進行采集的服務器上掛在beat組件, 然后beats組件數據數據采集到后, 發生到logstash,
然后logstash通過filter對進行處理, 處理后, 將數據傳遞到elasticSearch中, 后續基於kibana對
數據進行探索對數據進行圖標展示
可能出現的問題:
1) 安裝好fileBeats(logstash | kibana)后, 開始執行, 然后使用ctrl+C 無法退出問題: finalShell
如何解決:
ctrl +z 但是此操作可以回到命令行, 但是不會將 fileBeats關閉
此時如果直接在此啟動 fileBeats 回報一個文件以及被占用錯誤: data path ....
處理的方式:
ps -ef | grep filebeat 找到進程ID 直接 kill -9 殺死即可
2) 路徑上的問題: 在配置文件中, 路徑指向不對
解決方案: 看到任何路徑 都去驗證一下是否正常
3) kibana無法啟動問題, 即使在 kibana根目錄下啟動, 依然無法啟動
懷疑 配置文件修改錯誤: kibana.yml
server.host: "node1.itcast.cn" 沒有修改, 或者改了以后前面的#沒刪除
第二章 Beats、Logstash與Kibana
學習目標
l 能夠安裝部署FileBeat、Logstash和Kibana
l 能夠使用FileBeat采集數據
l 能夠使用Logstash采集、解析數據
l 能夠使用Kibana進行數據探索和可視化
1. Beats
Beats是一個開放源代碼的數據發送器。我們可以把Beats作為一種代理安裝在我們的服務器上,這樣就可以比較方便地將數據發送到Elasticsearch或者Logstash中。Elastic Stack提供了多種類型的Beats組件。
審計數據 |
AuditBeat |
日志文件 |
FileBeat |
雲數據 |
FunctionBeat |
可用性數據 |
HeartBeat |
系統日志 |
JournalBeat |
指標數據 |
MetricBeat |
網絡流量數據 |
PacketBeat |
Windows事件日志 |
Winlogbeat |
Beats可以直接將數據發送到Elasticsearch或者發送到Logstash,基於Logstash可以進一步地對數據進行處理,然后將處理后的數據存入到Elasticsearch,最后使用Kibana進行數據可視化。
1.1 FileBeat簡介 阿善看到
FileBeat專門用於轉發和收集日志數據的輕量級采集工具。它可以為作為代理安裝在服務器上,FileBeat監視指定路徑的日志文件,收集日志數據,並將收集到的日志轉發到Elasticsearch或者Logstash。
1.2 FileBeat的工作原理
啟動FileBeat時,會啟動一個或者多個輸入(Input),這些Input監控指定的日志數據位置。FileBeat會針對每一個文件啟動一個Harvester(收割機)。Harvester讀取每一個文件的日志,將新的日志發送到libbeat,libbeat將數據收集到一起,並將數據發送給輸出(Output)。
1.3 安裝FileBeat
安裝FileBeat只需要將FileBeat Linux安裝包上傳到Linux系統,並將壓縮包解壓到系統就可以了。FileBeat官方下載地址:https://www.elastic.co/cn/downloads/past-releases/filebeat-7-6-1
上傳FileBeat安裝到Linux,並解壓。
tar -xvzf filebeat-7.6.1-linux-x86_64.tar.gz
1.4 使用FileBeat采集Kafka日志到Elasticsearch
1.4.1 需求分析
在資料中有一個kafka_server.log.tar.gz壓縮包,里面包含了很多的Kafka服務器日志,現在我們為了通過在Elasticsearch中快速查詢這些日志,定位問題。我們需要用FileBeats將日志數據上傳到Elasticsearch中。
問題:
l 首先,我們要指定FileBeat采集哪些Kafka日志,因為FileBeats中必須知道采集存放在哪兒的日志,才能進行采集。
l 其次,采集到這些數據后,還需要指定FileBeats將采集到的日志輸出到Elasticsearch,那么Elasticsearch的地址也必須指定。
1.4.2 配置FileBeats
FileBeats配置文件主要分為兩個部分。
- inputs
- output
從名字就能看出來,一個是用來輸入數據的,一個是用來輸出數據的。
1.4.2.1 input配置
filebeat.inputs: - type: log enabled: true paths: - /var/log/*.log #- c:\programdata\elasticsearch\logs\* |
在FileBeats中,可以讀取一個或多個數據源。
1.4.2.2 output配置
默認FileBeat會將日志數據放入到名稱為:filebeat-%filebeat版本號%-yyyy.MM.dd 的索引中。
PS:
FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置選項。
1.4.3 配置文件
- 創建配置文件
cd /export/server/es/filebeat-7.6.1-linux-x86_64 vim filebeat_kafka_log.yml |
- 復制一下到配置文件中
filebeat.inputs: - type: log enabled: true paths: - /export/server/es/data/kafka/server.log.*
output.elasticsearch: hosts: ["node1.itcast.cn:9200", "node2.itcast.cn:9200", "node3.itcast.cn:9200"] |
1.4.4 運行FileBeat
- 運行FileBeat
./filebeat -c filebeat_kafka_log.yml -e
- 將日志數據上傳到/var/kafka/log,並解壓
mkdir -p /export/server/es/data/kafka/
tar -xvzf kafka_server.log.tar.gz
注意: 文件權限的報錯
如果在啟動fileBeat的時候, 報了一個配置文件權限的錯誤, 請修改其權限為 -rw-r--r--
1.4.5 查詢數據
- 查看索引信息
GET /_cat/indices?v
{ "health": "green", "status": "open", "index": "filebeat-7.6.1-2020.05.29-000001", "uuid": "dplqB_hTQq2XeSk6S4tccQ", "pri": "1", "rep": "1", "docs.count": "213780", "docs.deleted": "0", "store.size": "71.9mb", "pri.store.size": "35.8mb" } |
- 查詢索引庫中的數據
GET /filebeat-7.6.1-2020.05.29-000001/_search
{ "_index": "filebeat-7.6.1-2020.05.29-000001", "_type": "_doc", "_id": "-72pX3IBjTeClvZff0CB", "_score": 1, "_source": { "@timestamp": "2020-05-29T09:00:40.041Z", "log": { "offset": 55433, "file": { "path": "/var/kafka/log/server.log.2020-05-02-16" } }, "message": "[2020-05-02 16:01:30,682] INFO Socket connection established, initiating session, client: /192.168.88.100:46762, server: node1.itcast.cn/192.168.88.100:2181 (org.apache.zookeeper.ClientCnxn)", "input": { "type": "log" }, "ecs": { "version": "1.4.0" }, "host": { "name": "node1.itcast.cn" }, "agent": { "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64", "version": "7.6.1", "type": "filebeat", "ephemeral_id": "b8fbf7ab-bc37-46dd-86c7-fa7d74d36f63", "hostname": "node1.itcast.cn" } } } |
FileBeat自動給我們添加了一些關於日志、采集類型、Host各種字段。
1.4.6 解決一個日志涉及到多行問題
我們在日常日志的處理中,經常會碰到日志中出現異常的情況。類似下面的情況:
[2020-04-30 14:00:05,725] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error when sending leader epoch request for Map(test_10m-2 -> (currentLeaderEpoch=Optional[161], leaderEpoch=158)) (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to node2.itcast.cn:9092 (id: 1 rack: null) failed. at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102) at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310) at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208) at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [2020-04-30 14:00:05,725] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test_10m-2 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread) [2020-04-30 14:00:08,731] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Connection to node 1 (node2.itcast.cn/192.168.88.101:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) |
在FileBeat中,Harvest是逐行讀取日志文件的。但上述的日志會出現一條日志,跨多行的情況。有異常信息時,肯定會出現多行。我們先來看一下,如果默認不處理這種情況會出現什么問題。
1.4.6.1 導入錯誤日志
- 在/export/server/es/data/kafka/中創建名為server.log.2020-09-10的日志文件
- 將資料中的err.txt日志文本貼入到該文件中
觀察FileBeat,發現FileBeat已經針對該日志文件啟動了Harvester,並讀取到數據數據。
2020-05-29T19:11:01.236+0800 INFO log/harvester.go:297 Harvester started for file: /var/kafka/log/server.log.2020-09-10
- 在Elasticsearch檢索該文件。
GET /filebeat-7.6.1-2020.05.29-000001/_search { "query": { "match": { "log.file.path": "/var/kafka/log/server.log.2020-09-10" } } } |
我們發現,原本是一條日志中的異常信息,都被作為一條單獨的消息來處理了~
這明顯是不符合我們的預期的,我們想要的是將所有的異常消息合並到一條日志中。那針對這種情況該如何處理呢?
1.4.6.2 問題分析
每條日志都是有統一格式的開頭的,就拿Kafka的日志消息來說,[2020-04-30 14:00:05,725]這是一個統一的格式,如果不是以這樣的形式開頭,說明這一行肯定是屬於某一條日志,而不是獨立的一條日志。所以,我們可以通過日志的開頭來判斷某一行是否為新的一條日志。
1.4.6.3 FileBeat多行配置選項
在FileBeat的配置中,專門有一個解決一條日志跨多行問題的配置。主要為以下三個配置:
multiline.pattern: ^\[ multiline.negate: false multiline.match: after |
multiline.pattern表示能夠匹配一條日志的模式,默認配置的是以[開頭的才認為是一條新的日志。
multiline.negate:配置該模式是否生效,默認為false。
multiline.match:表示是否將未匹配到的行追加到上一日志,還是追加到下一個日志。
1.4.6.4 重新配置FileBeat
- 修改filebeat.yml,並添加以下內容
filebeat.inputs: - type: log enabled: true paths: - /var/kafka/log/server.log.* multiline.pattern: '^\[' multiline.negate: true multiline.match: after
output.elasticsearch: hosts: ["node1.itcast.cn:9200", "node2.itcast.cn:9200", "node3.itcast.cn:9200"] |
- 修改「注冊表」/data.json,將server.log.2020-09-10對應的offset設置為0
cd /export/server/es/filebeat-7.6.1-linux-x86_64/data/registry/filebeat
vim data.json
- 刪除之前創建的文檔
// 刪除指定文件的文檔 POST /filebeat-7.6.1-2020.05.29-000001/_delete_by_query { "query": { "match": { "log.file.path": "/var/kafka/log/server.log.2020-09-10" } } } |
- 重新啟動FileBeat
./filebeat -e
1.5 FileBeat是如何工作的
FileBeat主要由input和harvesters(收割機)組成。這兩個組件協同工作,並將數據發送到指定的輸出。
1.5.1 input和harvester
1.5.1.1 inputs(輸入)
input是負責管理Harvesters和查找所有要讀取的文件的組件。如果輸入類型是 log,input組件會查找磁盤上與路徑描述的所有文件,並為每個文件啟動一個Harvester,每個輸入都獨立地運行。
1.5.1.2 Harvesters(收割機)
Harvesters負責讀取單個文件的內容,它負責打開/關閉文件,並逐行讀取每個文件的內容,並將讀取到的內容發送給輸出,每個文件都會啟動一個Harvester。但Harvester運行時,文件將處於打開狀態。如果文件在讀取時,被移除或者重命名,FileBeat將繼續讀取該文件。
1.5.2 FileBeats如何保持文件狀態
FileBeat保存每個文件的狀態,並定時將狀態信息保存在磁盤的「注冊表」文件中,該狀態記錄Harvester讀取的最后一次偏移量,並確保發送所有的日志數據。如果輸出(Elasticsearch或者Logstash)無法訪問,FileBeat會記錄成功發送的最后一行,並在輸出(Elasticsearch或者Logstash)可用時,繼續讀取文件發送數據。在運行FileBeat時,每個input的狀態信息也會保存在內存中,重新啟動FileBeat時,會從「注冊表」文件中讀取數據來重新構建狀態。
在/export/server/es/filebeat-7.6.1-linux-x86_64/data目錄中有一個Registry文件夾,里面有一個data.json,該文件中記錄了Harvester讀取日志的offset。
2. Logstash
2.1 簡介
Logstash是一個開源的數據采集引擎。它可以動態地將不同來源的數據統一采集,並按照指定的數據格式進行處理后,將數據加載到其他的目的地。最開始,Logstash主要是針對日志采集,但后來Logstash開發了大量豐富的插件,所以,它可以做更多的海量數據的采集。
它可以處理各種類型的日志數據,例如:Apache的web log、Java的log4j日志數據,或者是系統、網絡、防火牆的日志等等。它也可以很容易的和Elastic Stack的Beats組件整合,也可以很方便的和關系型數據庫、NoSQL數據庫、Kafka、RabbitMQ等整合。
2.1.1 經典架構
2.1.2 對比Flume
- Apache Flume是一個通用型的數據采集平台,它通過配置source、channel、sink來實現數據的采集,支持的平台也非常多。而Logstash結合Elastic Stack的其他組件配合使用,開發、應用都會簡單很多
- Logstash比較關注數據的預處理,而Flume跟偏重數據的傳輸,幾乎沒有太多的數據解析預處理,僅僅是數據的產生,封裝成Event然后傳輸。
2.1.3 對比FileBeat
l logstash是jvm跑的,資源消耗比較大
l 而FileBeat是基於golang編寫的,功能較少但資源消耗也比較小,更輕量級
l logstash 和filebeat都具有日志收集功能,Filebeat更輕量,占用資源更少
l logstash 具有filter功能,能過濾分析日志
l 一般結構都是filebeat采集日志,然后發送到消息隊列,redis,kafka中然后logstash去獲取,利用filter功能過濾分析,然后存儲到elasticsearch中
2.2 安裝Logstash
- 切換到itcast用戶
- 下載Logstash
https://www.elastic.co/cn/downloads/past-releases/logstash-7-6-1
- 解壓Logstash到指定目錄
- 運行測試
cd /export/server/es/logstash-7.6.1/ bin/logstash -e 'input { stdin { } } output { stdout {} }' |
等待一會,讓Logstash啟動完畢。
Sending Logstash logs to /export/server/es/logstash-7.6.1/logs which is now configured via log4j2.properties [2020-05-28T16:31:44,159][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified [2020-05-28T16:31:44,264][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.6.1"} [2020-05-28T16:31:45,631][INFO ][org.reflections.Reflections] Reflections took 37 ms to scan 1 urls, producing 20 keys and 40 values [2020-05-28T16:31:46,532][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.RubyArray) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team. [2020-05-28T16:31:46,560][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x3ccbc15b run>"} [2020-05-28T16:31:47,268][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"} The stdin plugin is now waiting for input: [2020-05-28T16:31:47,348][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2020-05-28T16:31:47,550][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} |
然后,隨便在控制台中輸入內容,等待Logstash的輸出。
{ "host" => "node1.itcast.cn", "message" => "hello world", "@version" => "1", "@timestamp" => 2020-05-28T08:32:31.007Z } |
ps:
-e選項表示,直接把配置放在命令中,這樣可以有效快速進行測試
2.3 采集Apache Web服務器日志
2.3.1 需求
Apache的Web Server會產生大量日志,當我們想要對這些日志檢索分析。就需要先把這些日志導入到Elasticsearch中。此處,我們就可以使用Logstash來實現日志的采集。
打開這個文件,如下圖所示。我們發現,是一個純文本格式的日志。如下圖所示:
這個日志其實由一個個的字段拼接而成,參考以下表格。
字段名 |
說明 |
client IP |
瀏覽器端IP |
timestamp |
請求的時間戳 |
method |
請求方式(GET/POST) |
uri |
請求的鏈接地址 |
status |
服務器端響應狀態 |
length |
響應的數據長度 |
reference |
從哪個URL跳轉而來 |
browser |
瀏覽器 |
因為最終我們需要將這些日志數據存儲在Elasticsearch中,而Elasticsearch是有模式(schema)的,而不是一個大文本存儲所有的消息,而是需要將字段一個個的保存在Elasticsearch中。所以,我們需要在Logstash中,提前將數據解析好,將日志文本行解析成一個個的字段,然后再將字段保存到Elasticsearch中。
2.3.2 准備日志數據
將Apache服務器日志上傳到 /export/server/es/data/apache/ 目錄
mkdir -p /export/server/es/data/apache/
2.3.3 使用FileBeats將日志發送到Logstash
在使用Logstash進行數據解析之前,我們需要使用FileBeat將采集到的數據發送到Logstash。之前,我們使用的FileBeat是通過FileBeat的Harvester組件監控日志文件,然后將日志以一定的格式保存到Elasticsearch中,而現在我們需要配置FileBeats將數據發送到Logstash。FileBeat這一端配置以下即可:
#----------------------------- Logstash output --------------------------------- #output.logstash: # Boolean flag to enable or disable the output module. #enabled: true
# The Logstash hosts #hosts: ["localhost:5044"] |
hosts配置的是Logstash監聽的IP地址/機器名以及端口號。
114.113.220.255
准備FileBeat配置文件
cd /export/server/es/filebeat-7.6.1-linux-x86_64
vim filebeat-logstash.yml
因為Apache的web log日志都是以IP地址開頭的,所以我們需要修改下匹配字段。
filebeat.inputs: - type: log enabled: true paths: - /var/apache/log/access.* multiline.pattern: '^\d+\.\d+\.\d+\.\d+ ' multiline.negate: true multiline.match: after
output.logstash: enabled: true hosts: ["node1.itcast.cn:5044"] |
啟動FileBeat,並指定使用新的配置文件
./filebeat -e -c filebeat-logstash.yml
FileBeat將嘗試建立與Logstash監聽的IP和端口號進行連接。但此時,我們並沒有開啟並配置Logstash,所以FileBeat是無法連接到Logstash的。
2020-06-01T11:28:47.585+0800 ERROR pipeline/output.go:100 Failed to connect to backoff(async(tcp://node1.itcast.cn:5044)): dial tcp 192.168.88.100:5044: connect: connection refused |
2.3.4 配置Logstash接收FileBeat數據並打印
Logstash的配置文件和FileBeat類似,它也需要有一個input、和output。基本格式如下:
# #號表示添加注釋 # input表示要接收的數據 input { }
# file表示對接收到的數據進行過濾處理 filter {
}
# output表示將數據輸出到其他位置 output { } |
配置從FileBeat接收數據
cd /export/server/es/logstash-7.6.1/config
vim filebeat-print.conf
input { beats { port => 5044 } }
output { stdout { codec => rubydebug } } |
測試logstash配置是否正確
bin/logstash -f config/filebeat-print.conf --config.test_and_exit
[2020-06-01T11:46:33,940][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash |
啟動logstash
bin/logstash -f config/filebeat-print.conf --config.reload.automatic
reload.automatic:修改配置文件時自動重新加載
測試
創建一個access.log.1文件,使用cat test >> access.log.1往日志文件中追加內容。
test文件中只保存一條日志:
[root@node1 log]# cat test 235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249 |
當我們啟動Logstash之后,就可以發現Logstash會打印出來從FileBeat接收到的數據:
{ "log" => { "file" => { "path" => "/var/apache/log/access.log.1" }, "offset" => 825 }, "input" => { "type" => "log" }, "agent" => { "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05", "version" => "7.6.1", "type" => "filebeat", "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64", "hostname" => "node1.itcast.cn" }, "@timestamp" => 2020-06-01T09:07:55.236Z, "ecs" => { "version" => "1.4.0" }, "host" => { "name" => "node1.itcast.cn" }, "tags" => [ [0] "beats_input_codec_plain_applied" ], "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /itcast.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249", "@version" => "1" } |
2.3.5 Logstash輸出數據到Elasticsearch
通過控制台,我們發現Logstash input接收到的數據沒有經過任何處理就發送給了output組件。而其實我們需要將數據輸出到Elasticsearch。所以,我們修改Logstash的output配置。配置輸出Elasticsearch只需要配置以下就可以了:
output { elasticsearch { hosts => [ "localhost:9200" ] }} |
操作步驟:
- 重新拷貝一份配置文件
cp filebeat-print.conf filebeat-es.conf
- 將output修改為Elasticsearch
input { beats { port => 5044 } }
output { elasticsearch { hosts => [ "node1.itcast.cn:9200","node2.itcast.cn:9200","node3.itcast.cn:9200"] } } |
- 重新啟動Logstash
bin/logstash -f config/filebeat-es.conf --config.reload.automatic
- 追加一條日志到監控的文件中,並查看Elasticsearch中的索引、文檔
cat test >> access.log.1
// 查看索引數據
GET /_cat/indices?v
我們在Elasticsearch中發現一個以logstash開頭的索引。
{ "health": "green", "status": "open", "index": "logstash-2020.06.01-000001", "uuid": "147Uwl1LRb-HMFERUyNEBw", "pri": "1", "rep": "1", "docs.count": "2", "docs.deleted": "0", "store.size": "44.8kb", "pri.store.size": "22.4kb" } |
// 查看索引庫的數據
GET /logstash-2020.06.01-000001/_search?format=txt
{
"from": 0,
"size": 1
}
我們可以獲取到以下數據:
"@timestamp": "2020-06-01T09:38:00.402Z", "tags": [ "beats_input_codec_plain_applied" ], "host": { "name": "node1.itcast.cn" }, "@version": "1", "log": { "file": { "path": "/var/apache/log/access.log.1" }, "offset": 1343 }, "agent": { "version": "7.6.1", "ephemeral_id": "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05", "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64", "hostname": "node1.itcast.cn", "type": "filebeat" }, "input": { "type": "log" }, "message": "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /itcast.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249", "ecs": { "version": "1.4.0" }
|
從輸出返回結果,我們可以看到,日志確實已經保存到了Elasticsearch中,而且我們看到消息數據是封裝在名為message中的,其他的數據也封裝在一個個的字段中。我們其實更想要把消息解析成一個個的字段。例如:IP字段、時間、請求方式、請求URL、響應結果,這樣。
2.3.6 Logstash過濾器
在Logstash中可以配置過濾器Filter對采集到的數據進行中間處理,在Logstash中,有大量的插件供我們使用。參考官網:
https://www.elastic.co/guide/en/logstash/7.6/filter-plugins.html
此處,我們重點來講解Grok插件。
2.3.6.1 查看Logstash已經安裝的插件
bin/logstash-plugin list
2.3.6.2 Grok插件
Grok是一種將非結構化日志解析為結構化的插件。這個工具非常適合用來解析系統日志、Web服務器日志、MySQL或者是任意其他的日志格式。
Grok官網:https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-grok.html
2.3.6.3 Grok語法
Grok是通過模式匹配的方式來識別日志中的數據,可以把Grok插件簡單理解為升級版本的正則表達式。它擁有更多的模式,默認,Logstash擁有120個模式。如果這些模式不滿足我們解析日志的需求,我們可以直接使用正則表達式來進行匹配。
官網:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
grok模式的語法是:%{SYNTAX:SEMANTIC}
SYNTAX指的是Grok模式名稱,SEMANTIC是給模式匹配到的文本字段名。例如:
%{NUMBER:duration} %{IP:client} duration表示:匹配一個數字,client表示匹配一個IP地址。 |
默認在Grok中,所有匹配到的的數據類型都是字符串,如果要轉換成int類型(目前只支持int和float),可以這樣:%{NUMBER:duration:int} %{IP:client}
以下是常用的Grok模式:
NUMBER |
匹配數字(包含:小數) |
INT |
匹配整形數字 |
POSINT |
匹配正整數 |
WORD |
匹配單詞 |
DATA |
匹配所有字符 |
IP |
匹配IP地址 |
PATH |
匹配路徑 |
2.3.6.4 用法
filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } } |
2.3.7 匹配日志中的IP、日期並打印
235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249 |
我們使用IP就可以把前面的IP字段匹配出來,使用HTTPDATE可以將后面的日期匹配出來。
配置Grok過濾插件
- 配置Logstash
input { beats { port => 5044 } }
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\]" } } }
output { stdout { codec => rubydebug } } |
- 啟動Logstash
bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
{ "log" => { "offset" => 1861, "file" => { "path" => "/var/apache/log/access.log.1" } }, "input" => { "type" => "log" }, "tags" => [ [0] "beats_input_codec_plain_applied" ], "date" => "15/Apr/2015:00:27:19 +0849", "ecs" => { "version" => "1.4.0" }, "@timestamp" => 2020-06-01T11:02:05.809Z, "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /itcast.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249", "host" => { "name" => "node1.itcast.cn" }, "ip" => "235.9.200.242", "agent" => { "hostname" => "node1.itcast.cn", "version" => "7.6.1", "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05", "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64", "type" => "filebeat" }, "@version" => "1" } |
我們看到,經過Grok過濾器插件處理之后,我們已經獲取到了ip和date兩個字段。接下來,我們就可以繼續解析其他的字段。
2.3.8 解析所有字段
將日志解析成以下字段:
字段名 |
說明 |
client IP |
瀏覽器端IP |
timestamp |
請求的時間戳 |
method |
請求方式(GET/POST) |
uri |
請求的鏈接地址 |
status |
服務器端響應狀態 |
length |
響應的數據長度 |
reference |
從哪個URL跳轉而來 |
browser |
瀏覽器 |
- 修改Logstash配置文件
input { beats { port => 5044 } }
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status} %{INT:length} \"%{DATA:reference}\" \"%{DATA:browser}\"" } } }
output { stdout { codec => rubydebug } } |
- 測試並啟動Logstash
我們可以看到,8個字段都已經成功解析。
{ "reference" => "www.baidu.com", "@version" => "1", "ecs" => { "version" => "1.4.0" }, "@timestamp" => 2020-06-02T03:30:10.048Z, "ip" => "235.9.200.241", "method" => "POST", "uri" => "/itcast.cn/bigdata.html", "agent" => { "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64", "ephemeral_id" => "734ae9d8-bcdc-4be6-8f97-34387fcde972", "version" => "7.6.1", "hostname" => "node1.itcast.cn", "type" => "filebeat" }, "length" => "45", "status" => "200", "log" => { "file" => { "path" => "/var/apache/log/access.log" }, "offset" => 1 }, "input" => { "type" => "log" }, "host" => { "name" => "node1.itcast.cn" }, "tags" => [ [0] "beats_input_codec_plain_applied" ], "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900", "date" => "15/Apr/2015:00:27:19 +0849", "message" => "235.9.200.241 - - [15/Apr/2015:00:27:19 +0849] \"POST /itcast.cn/bigdata.html HTTP/1.1\" 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900\"" } |
2.3.9 將數據輸出到Elasticsearch
到目前為止,我們已經通過了Grok Filter可以將日志消息解析成一個一個的字段,那現在我們需要將這些字段保存到Elasticsearch中。我們看到了Logstash的輸出中,有大量的字段,但如果我們只需要保存我們需要的8個,該如何處理呢?而且,如果我們需要將日期的格式進行轉換,我們又該如何處理呢?
2.3.9.1 過濾出來需要的字段
要過濾出來我們需要的字段。我們需要使用mutate插件。mutate插件主要是作用在字段上,例如:它可以對字段進行重命名、刪除、替換或者修改結構。
官方文檔:https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-mutate.html
例如,mutate插件可以支持以下常用操作
配置:
注意:此處為了方便進行類型的處理,將status、length指定為int類型。
input { beats { port => 5044 } }
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" } } mutate { enable_metric => "false" remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"] } }
output { stdout { codec => rubydebug } } |
2.3.9.2 轉換日期格式
要將日期格式進行轉換,我們可以使用Date插件來實現。該插件專門用來解析字段中的日期,官方說明文檔:https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-date.html
用法如下:
將date字段轉換為「年月日 時分秒」格式。默認字段經過date插件處理后,會輸出到@timestamp字段,所以,我們可以通過修改target屬性來重新定義輸出字段。
Logstash配置修改為如下:
input { beats { port => 5044 } }
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" } } mutate { enable_metric => "false" remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"] } date { match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"] target => "date" } }
output { stdout { codec => rubydebug } } |
啟動Logstash:
bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
{ "status" => "200", "reference" => "www.baidu.com", "method" => "POST", "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900", "ip" => "235.9.200.241", "length" => "45", "uri" => "/itcast.cn/bigdata.html", "date" => 2015-04-14T15:38:19.000Z } |
2.3.9.3 輸出到Elasticsearch指定索引
我們可以通過
elasticsearch {
hosts => ["node1.itcast.cn:9200" ,"node2.itcast.cn:9200" ,"node3.itcast.cn:9200"]
index => "xxx"
}
index來指定索引名稱,默認輸出的index名稱為:logstash-%{+yyyy.MM.dd}。但注意,要在index中使用時間格式化,filter的輸出必須包含 @timestamp字段,否則將無法解析日期。
input { beats { port => 5044 } }
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" } } mutate { enable_metric => "false" remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"] } date { match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"] target => "date" } }
output { stdout { codec => rubydebug } elasticsearch { hosts => ["node1.itcast.cn:9200" ,"node2.itcast.cn:9200" ,"node3.itcast.cn:9200"] index => "apache_web_log_%{+YYYY-MM}" } } |
啟動Logstash
bin/logstash -f config/filebeat-apache-weblog.conf --config.test_and_exit
bin/logstash -f config/filebeat-apache-weblog.conf --config.reload.automatic
注意:
l index名稱中,不能出現大寫字符
3. Kibana
3.1 簡介
通過上面的這張圖就可以看到,Kibana可以用來展示豐富的圖表。
l Kibana是一個開源的數據分析和可視化平台,使用Kibana可以用來搜索Elasticsearch中的數據,構建漂亮的可視化圖形、以及制作一些好看的儀表盤
l Kibana是用來管理Elastic stack組件的可視化平台。例如:使用Kibana可以進行一些安全設置、用戶角色設置、對Elasticsearch進行快照等等
l Kibana提供統一的訪問入口,不管是日志分析、還是查找文檔,Kibana提供了一個使用這些功能的統一訪問入口
l Kibana使用的是Elasticsearch數據源,Elasticsearch是存儲和處理數據的引擎,而Kibana就是基於Elasticsearch之上的可視化平台
主要功能:
l 探索和查詢Elasticsearch中的數據
l 可視化與分析
3.2 安裝Kibana
在Linux下安裝Kibana,可以使用Elastic stack提供 tar.gz壓縮包。官方下載地址:
https://www.elastic.co/cn/downloads/past-releases/kibana-7-6-1
- 解壓Kibana gz壓縮包
tar -xzf kibana-7.6.1-linux-x86_64.tar.gz
- 進入到Kibana目錄
cd kibana-7.6.1-linux-x86_64/
- 配置Kibana: config/kibana.yml
server.host: "node1.itcast.cn" # The Kibana server's name. This is used for display purposes. server.name: "itcast-kibana"
# The URLs of the Elasticsearch instances to use for all your queries. elasticsearch.hosts: ["http://node1.itcast.cn:9200","http://node2.itcast.cn:9200","http://node3.itcast.cn:9200"] |
- 運行Kibana
./bin/kibana
3.2.1 查看Kibana狀態
輸入以下網址,可以查看到Kibana的運行狀態:
http://node1.itcast.cn:5601/status
3.2.2 查看Elasticsearch的狀態
點擊
按鈕,再點擊 「Index Management」,可以查看到Elasticsearch集群中的索引狀態。
點擊索引的名字,可以進一步查看索引更多的信息。
點擊「Manage」按鈕,還可以用來管理索引。
3.3 添加Elasticsearch數據源
3.3.1 Kibana索引模式
在開始使用Kibana之前,我們需要指定想要對哪些Elasticsearch索引進行處理、分析。在Kibana中,可以通過定義索引模式(Index Patterns)來對應匹配Elasticsearch索引。在第一次訪問Kibana的時候,系統會提示我們定義一個索引模式。或者我們可以通過點擊按鈕,再點擊Kibana下方的Index Patterns,來創建索引模式。參考下圖:
- 定義索引模式,用於匹配哪些Elasticsearch中的索引。點擊「Next step」
- 選擇用於進行時間過濾的字段
- 點擊「Create Index Pattern」按鈕,創建索引模式。創建索引模式成功后,可以看到顯示了該索引模式對應的字段。里面描述了哪些可以用於搜索、哪些可以用來進行聚合計算等。
3.4 探索數據(Discovery)
通過Kibana中的Discovery組件,我們可以快速地進行數據的檢索、查詢。
3.4.1 使用探索數據功能
點擊按鈕可以打開Discovery頁面。
我們發現沒有展示任何的數據。但我們之前已經把數據導入到Elasticsearch中了。
Kibana提示,讓我們擴大我們的查詢的時間范圍。
默認Kibana是展示最近15分鍾的數據。我們把時間范圍調得更長一些,就可以看到數據了。
將時間范圍選擇為1年范圍內的,我們就可以查看到Elasticsearch中的數據了。
3.4.2 導入更多的Apache Web日志數據
- 將資料中的 access.log 文件上傳到Linux
- 將access.log移動到/var/apache/log,並重命名為access.log.2
mv access.log /var/apache/log/access.log.2
- 啟動FileBeat
./filebeat -e -c filebeat-logstash.yml
- 啟動Logstash
bin/logstash -f config/filebeat-es.conf --config.reload.automatic
3.4.3 基於時間過濾查詢
3.4.3.1 選擇時間范圍
3.4.3.2 指定查詢某天的數據
查詢2020年5月6日的所有日志數據。
3.4.3.3 從直方圖
上選擇日期更細粒度范圍
如果要選擇查看某一天的日志,上面這種方式會有一些麻煩,我們有更快更容易的方式。
3.4.4 使用Kibana搜索數據
在Kibana的Discovery組件中,可以在查詢欄中輸入搜索條件。默認情況下,可以使用Kibana內置的標准查詢語言,來進行快速查詢。還有一種是遺留的基於Lucene的查詢語法目前暫時可用,這種查詢語法也可以使用基於JSON的Elasticsearch DSL也是可用的。當我們在Discovery搜索數據時,對應的直方圖、文檔列表都會隨即更新。默認情況下,優先展示最新的文檔,按照時間倒序排序的。
3.4.4.1 Kibana查詢語言(KQL)
在7.0中,Kibana上線了新的查詢語言。這種語言簡潔、易用,有利於快速查詢。
查詢語法:
「字段:值」,如果值是字符串,可以用雙引號括起來。
查詢包含zhihu的請求
*zhihu*
查詢頁面不存在的請求
status : 404
查詢請求成功和不存在的請求
status: (404 or 200)
查詢方式為POST請求,並請求成功的日志
status: 200 and method: post
查詢方式為GET成功的請求,並且響應數據大於512的日志
status: 200 and method: get and length > 512
查詢請求成功的且URL為「/itcast.cn」開頭的日志
uri: "\/itcast.cn\/*"
注意:因為/為特殊字符,需要使用反斜杠進行轉義
3.4.4.2 過濾字段
Kibana的Discovery組件提供各種各樣的篩選器,這樣可以篩選出來我們關注的數據上。例如:我們只想查詢404的請求URI。
指定過濾出來404以及請求的URI、從哪兒跳轉來的日志
將查詢保存下來,方便下次直接查看
下次直接點擊Open就可以直接打開之前保存的日志了
3.5 數據可視化(Visualize)
Kibana中的Visualize可以基於Elasticsearch中的索引進行數據可視化,然后將這些可視化圖表添加到儀表盤中。
3.5.1 數據可視化的類型
l Lens
n 通過簡單地拖拽數據字段,快速構建基本的可視化
l 常用的可視化對象
n 線形圖(Line)、面積圖(Area)、條形圖(Bar):可以用這些帶X/Y坐標的圖形來進行不同分類的比較
n 餅圖(Pie):可以用餅圖來展示占比
n 數據表(Data Table):以數據表格的形式展示
n 指標(Metrics):以數字的方式展示
n 目標和進度:顯示帶有進度指標的數字
n 標簽雲/文字雲(Tag Cloud):以文字雲方式展示標簽,文字的大小與其重要性相關
l Timelion
n 從多個時間序列數據集來展示數據
l 地圖
n 展示地理位置數據
l 熱圖
n 在矩陣的單元格展示數據
l 儀表盤工具
n Markdown部件:顯示一些MD格式的說明
n 控件:在儀表盤中添加一些可以用來交互的組件
l Vega
3.5.2 以餅圖展示404與200的占比
效果圖:
操作步驟:
- 創建可視化
- 選擇要進行可視化圖形類型,此處我們選擇Pie(餅圖類型)
- 選擇數據源
- 添加分桶、分片(還記得嗎?我們在Elasticsearch進行分組聚合都是以分桶方式進行的,可以把它理解為分組)
- 配置分桶以及指標計算方式
- 點擊藍色播放按鈕執行。
- 保存圖形(取名為:apache_log@404_200)
3.5.3 以條形圖方式展示2020年5月每日請求數
效果如下:
開發步驟:
我們還可以修改圖形的樣式,例如:以曲線、面積圖的方式展示。
3.5.4 以TSVB可視化不同訪問來源的數據
TSVB是一個基於時間序列的數據可視化工具,它可以使用Elasticsearch聚合的所有功能。使用TSVB,我們可以輕松地完成任意聚合方式來展示復雜的數據。它可以讓我們快速制作效果的圖表:
- 基於時間序列的圖形展示
- 展示指標數據
- TopN
- 類似油量表的展示
- Markdown自定義數據展示
- 以表格方式展示數據
操作步驟:
- 創建TSVB可視化對象
- 配置Time Series數據源分組條件
- 配置Metric
- TopN
3.5.5 制作用戶選擇請求方式、響應字節大小控制組件
3.5.5.1 控制組件
在Kibana中,我們可以使用控件來控制圖表的展示。例如:提供一個下列列表,供查看圖表的用戶只展示比較關注的數據。我們可以添加兩個類型的控制組件:
- 選項列表
l 根據一個或多個指定選項來篩選內容。例如:我們先篩選某個城市的數據,就可以通過選項列表來選擇該城市
- 范圍選擇滑塊
l 篩選出來指定范圍的數據。例如:我們篩選某個價格區間的商品等。
3.5.5.2 Kibana開發
3.6 制作Dashboard
接下來,我們把前面的幾個圖形放到一個看板中。這樣,我們就可以在一個看板中,瀏覽各類數據了。
- 點擊第三個組件圖標,並創建一個新的Dashboard。
- 點擊Edit編輯Dashboard。
- 依次添加我們之前制作好的圖表。