Beats、Logstash與Kibana


今日內容:
1) JAVA API 操作 ES 集群
2) ES的架構原理
3) ES的 sql操作
4) Beats基本概念及其使用
5) logstash基本概念及其使用
6) kibana基本概念及其使用

1) JAVA API 操作 ES 集群 : 根據關鍵詞查詢 分頁查詢(淺分頁 和 深分頁) 高亮展示數據

 

2) 構建索引庫的時候, 除了可以指定 mapping信息以外, 還可以指定 分片和副本
PUT /job_idx_shard
{
"mappings": {
"properties": {
"id": { "type": "long", "store": true },
"area": { "type": "keyword", "store": true },
"exp": { "type": "keyword", "store": true },
"edu": { "type": "keyword", "store": true },
"salary": { "type": "keyword", "store": true },
"job_type": { "type": "keyword", "store": true },
"cmp": { "type": "keyword", "store": true },
"pv": { "type": "keyword", "store": true },
"title": { "type": "text", "store": true },
"jd": { "type": "text"}

}
},
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
}
}


3) beats基本介紹:
本質上是一個數據發送器, 用於將本地服務器中數據發生到logstash或者elasticSearch

fileBeats 主要的作用 監控本地服務器中日志數據, 將監控的數據采集下來, 然后發生到 logstash或者elasticSearch

fileBeats的組成:
input : 輸入 用於監控指定的文件

output: 輸出 用於將數據輸出指定的目的的

4) logstash基本介紹:
數據采集工具, 主要用於將各個數據源中數據進行采集, 對數據進行處理, 將處理后數據, 發生到目的地

Flume : 數據采集的工具, 跟logstash是同類型軟件 都是數據采集的工具

flume和 logstash的對比:
flume 是一款通用的采集工具 , 關注數據的傳輸, 更加純粹的數據傳輸(采集)工具
logstash 更多時候是和beats 以及 elasticSearch組合使用 , 更多關注數據的預處理
logstash 和 fileBeats的對比:
logstash 和filebeat都具有日志收集功能,Filebeat更輕量,占用資源更少
logstash基於JVM運行, 占用資源比較大
logstash可以基於filter對數據進行過濾處理工作, 而fileBeats更多關注數據的傳遞工作

在生產中, 一般關於 elastic Stack的使用:
首先在需要進行采集的服務器上掛在beat組件, 然后beats組件數據數據采集到后, 發生到logstash,
然后logstash通過filter對進行處理, 處理后, 將數據傳遞到elasticSearch中, 后續基於kibana對
數據進行探索對數據進行圖標展示


可能出現的問題:
1) 安裝好fileBeats(logstash | kibana)后, 開始執行, 然后使用ctrl+C 無法退出問題: finalShell
如何解決:
ctrl +z 但是此操作可以回到命令行, 但是不會將 fileBeats關閉

此時如果直接在此啟動 fileBeats 回報一個文件以及被占用錯誤: data path ....

處理的方式:
ps -ef | grep filebeat 找到進程ID 直接 kill -9 殺死即可
2) 路徑上的問題: 在配置文件中, 路徑指向不對
解決方案: 看到任何路徑 都去驗證一下是否正常
3) kibana無法啟動問題, 即使在 kibana根目錄下啟動, 依然無法啟動
懷疑 配置文件修改錯誤: kibana.yml
server.host: "node1.itcast.cn" 沒有修改, 或者改了以后前面的#沒刪除

 

 

第二章 Beats、Logstash與Kibana

學習目標

l 能夠安裝部署FileBeat、Logstash和Kibana

l 能夠使用FileBeat采集數據

l 能夠使用Logstash采集、解析數據

l 能夠使用Kibana進行數據探索和可視化

1.  Beats

Beats是一個開放源代碼的數據發送器。我們可以把Beats作為一種代理安裝在我們的服務器上,這樣就可以比較方便地將數據發送到Elasticsearch或者Logstash中。Elastic Stack提供了多種類型的Beats組件。

審計數據

AuditBeat

日志文件

FileBeat

雲數據

FunctionBeat

可用性數據

HeartBeat

系統日志

JournalBeat

指標數據

MetricBeat

網絡流量數據

PacketBeat

Windows事件日志

Winlogbeat

 

 

Beats可以直接將數據發送到Elasticsearch或者發送到Logstash,基於Logstash可以進一步地對數據進行處理,然后將處理后的數據存入到Elasticsearch,最后使用Kibana進行數據可視化。

1.1  FileBeat簡介  阿善看到

FileBeat專門用於轉發和收集日志數據的輕量級采集工具。它可以為作為代理安裝在服務器上,FileBeat監視指定路徑的日志文件,收集日志數據,並將收集到的日志轉發到Elasticsearch或者Logstash。

1.2  FileBeat的工作原理

啟動FileBeat時,會啟動一個或者多個輸入(Input),這些Input監控指定的日志數據位置。FileBeat會針對每一個文件啟動一個Harvester(收割機)。Harvester讀取每一個文件的日志,將新的日志發送到libbeat,libbeat將數據收集到一起,並將數據發送給輸出(Output)。

 

 

 

 

1.3  安裝FileBeat

安裝FileBeat只需要將FileBeat Linux安裝包上傳到Linux系統,並將壓縮包解壓到系統就可以了。FileBeat官方下載地址:https://www.elastic.co/cn/downloads/past-releases/filebeat-7-6-1

 

上傳FileBeat安裝到Linux,並解壓。

tar -xvzf filebeat-7.6.1-linux-x86_64.tar.gz

1.4  使用FileBeat采集Kafka日志到Elasticsearch

1.4.1  需求分析

在資料中有一個kafka_server.log.tar.gz壓縮包,里面包含了很多的Kafka服務器日志,現在我們為了通過在Elasticsearch中快速查詢這些日志,定位問題。我們需要用FileBeats將日志數據上傳到Elasticsearch中。

 

問題:

l 首先,我們要指定FileBeat采集哪些Kafka日志,因為FileBeats中必須知道采集存放在哪兒的日志,才能進行采集。

l 其次,采集到這些數據后,還需要指定FileBeats將采集到的日志輸出到Elasticsearch,那么Elasticsearch的地址也必須指定。

1.4.2  配置FileBeats

FileBeats配置文件主要分為兩個部分。

  1. inputs
  2. output

從名字就能看出來,一個是用來輸入數據的,一個是用來輸出數據的。

1.4.2.1  input配置

filebeat.inputs:

- type: log

  enabled: true

  paths:

    - /var/log/*.log

    #- c:\programdata\elasticsearch\logs\*

在FileBeats中,可以讀取一個或多個數據源。

 

 

 

 

1.4.2.2  output配置

 

 

 

 

默認FileBeat會將日志數據放入到名稱為:filebeat-%filebeat版本號%-yyyy.MM.dd 的索引中。

 

PS:

FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置選項。

1.4.3  配置文件

  1. 創建配置文件

cd /export/server/es/filebeat-7.6.1-linux-x86_64

vim filebeat_kafka_log.yml

  1. 復制一下到配置文件中

filebeat.inputs:

- type: log

  enabled: true

  paths:

    - /export/server/es/data/kafka/server.log.*

 

output.elasticsearch:

    hosts: ["node1.itcast.cn:9200", "node2.itcast.cn:9200", "node3.itcast.cn:9200"]

1.4.4  運行FileBeat

  1. 運行FileBeat

./filebeat  -c filebeat_kafka_log.yml -e

  1. 將日志數據上傳到/var/kafka/log,並解壓

mkdir -p /export/server/es/data/kafka/

tar -xvzf kafka_server.log.tar.gz

 

注意: 文件權限的報錯

如果在啟動fileBeat的時候, 報了一個配置文件權限的錯誤, 請修改其權限為 -rw-r--r--

1.4.5  查詢數據

  1. 查看索引信息

GET /_cat/indices?v

    {

        "health": "green",

        "status": "open",

        "index": "filebeat-7.6.1-2020.05.29-000001",

        "uuid": "dplqB_hTQq2XeSk6S4tccQ",

        "pri": "1",

        "rep": "1",

        "docs.count": "213780",

        "docs.deleted": "0",

        "store.size": "71.9mb",

        "pri.store.size": "35.8mb"

    }

 

  1. 查詢索引庫中的數據

GET /filebeat-7.6.1-2020.05.29-000001/_search

            {

                "_index": "filebeat-7.6.1-2020.05.29-000001",

                "_type": "_doc",

                "_id": "-72pX3IBjTeClvZff0CB",

                "_score": 1,

                "_source": {

                    "@timestamp": "2020-05-29T09:00:40.041Z",

                    "log": {

                        "offset": 55433,

                        "file": {

                            "path": "/var/kafka/log/server.log.2020-05-02-16"

                        }

                    },

                    "message": "[2020-05-02 16:01:30,682] INFO Socket connection established, initiating session, client: /192.168.88.100:46762, server: node1.itcast.cn/192.168.88.100:2181 (org.apache.zookeeper.ClientCnxn)",

                    "input": {

                        "type": "log"

                    },

                    "ecs": {

                        "version": "1.4.0"

                    },

                    "host": {

                        "name": "node1.itcast.cn"

                    },

                    "agent": {

                        "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

                        "version": "7.6.1",

                        "type": "filebeat",

                        "ephemeral_id": "b8fbf7ab-bc37-46dd-86c7-fa7d74d36f63",

                        "hostname": "node1.itcast.cn"

                    }

                }

            }

FileBeat自動給我們添加了一些關於日志、采集類型、Host各種字段。

1.4.6  解決一個日志涉及到多行問題

我們在日常日志的處理中,經常會碰到日志中出現異常的情況。類似下面的情況:

[2020-04-30 14:00:05,725] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error when sending leader epoch request for Map(test_10m-2 -> (currentLeaderEpoch=Optional[161], leaderEpoch=158)) (kafka.server.ReplicaFetcherThread)

java.io.IOException: Connection to node2.itcast.cn:9092 (id: 1 rack: null) failed.

        at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)

        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102)

        at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)

        at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)

        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)

        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

[2020-04-30 14:00:05,725] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test_10m-2 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)

[2020-04-30 14:00:08,731] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Connection to node 1 (node2.itcast.cn/192.168.88.101:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

在FileBeat中,Harvest是逐行讀取日志文件的。但上述的日志會出現一條日志,跨多行的情況。有異常信息時,肯定會出現多行。我們先來看一下,如果默認不處理這種情況會出現什么問題。

1.4.6.1  導入錯誤日志
  1. 在/export/server/es/data/kafka/中創建名為server.log.2020-09-10的日志文件
  2. 將資料中的err.txt日志文本貼入到該文件中

 

觀察FileBeat,發現FileBeat已經針對該日志文件啟動了Harvester,並讀取到數據數據。

2020-05-29T19:11:01.236+0800    INFO    log/harvester.go:297    Harvester started for file: /var/kafka/log/server.log.2020-09-10

 

  1. 在Elasticsearch檢索該文件。

GET /filebeat-7.6.1-2020.05.29-000001/_search

{

    "query": {

        "match": {

            "log.file.path": "/var/kafka/log/server.log.2020-09-10"

        }

    }

}

 

我們發現,原本是一條日志中的異常信息,都被作為一條單獨的消息來處理了~

 

這明顯是不符合我們的預期的,我們想要的是將所有的異常消息合並到一條日志中。那針對這種情況該如何處理呢?

1.4.6.2  問題分析

每條日志都是有統一格式的開頭的,就拿Kafka的日志消息來說,[2020-04-30 14:00:05,725]這是一個統一的格式,如果不是以這樣的形式開頭,說明這一行肯定是屬於某一條日志,而不是獨立的一條日志。所以,我們可以通過日志的開頭來判斷某一行是否為新的一條日志。

1.4.6.3  FileBeat多行配置選項

在FileBeat的配置中,專門有一個解決一條日志跨多行問題的配置。主要為以下三個配置:

multiline.pattern: ^\[

multiline.negate: false

multiline.match: after

multiline.pattern表示能夠匹配一條日志的模式,默認配置的是以[開頭的才認為是一條新的日志。

multiline.negate:配置該模式是否生效,默認為false。

multiline.match:表示是否將未匹配到的行追加到上一日志,還是追加到下一個日志。

1.4.6.4  重新配置FileBeat
  1. 修改filebeat.yml,並添加以下內容

filebeat.inputs:

- type: log

  enabled: true

  paths:

    - /var/kafka/log/server.log.*

  multiline.pattern: '^\['

  multiline.negate: true

  multiline.match: after

 

output.elasticsearch:

    hosts: ["node1.itcast.cn:9200", "node2.itcast.cn:9200", "node3.itcast.cn:9200"]

  1. 修改「注冊表」/data.json,將server.log.2020-09-10對應的offset設置為0

cd /export/server/es/filebeat-7.6.1-linux-x86_64/data/registry/filebeat

vim data.json

  1. 刪除之前創建的文檔

// 刪除指定文件的文檔

POST /filebeat-7.6.1-2020.05.29-000001/_delete_by_query

{

    "query": {

        "match": {

            "log.file.path": "/var/kafka/log/server.log.2020-09-10"

        }

    }

}

 

  1. 重新啟動FileBeat

./filebeat -e

 

 

 

1.5  FileBeat是如何工作的

FileBeat主要由input和harvesters(收割機)組成。這兩個組件協同工作,並將數據發送到指定的輸出。

1.5.1  input和harvester

1.5.1.1  inputs(輸入)

input是負責管理Harvesters和查找所有要讀取的文件的組件。如果輸入類型是 log,input組件會查找磁盤上與路徑描述的所有文件,並為每個文件啟動一個Harvester,每個輸入都獨立地運行。

1.5.1.2  Harvesters(收割機)

Harvesters負責讀取單個文件的內容,它負責打開/關閉文件,並逐行讀取每個文件的內容,並將讀取到的內容發送給輸出,每個文件都會啟動一個Harvester。但Harvester運行時,文件將處於打開狀態。如果文件在讀取時,被移除或者重命名,FileBeat將繼續讀取該文件。

1.5.2  FileBeats如何保持文件狀態

FileBeat保存每個文件的狀態,並定時將狀態信息保存在磁盤的「注冊表」文件中,該狀態記錄Harvester讀取的最后一次偏移量,並確保發送所有的日志數據。如果輸出(Elasticsearch或者Logstash)無法訪問,FileBeat會記錄成功發送的最后一行,並在輸出(Elasticsearch或者Logstash)可用時,繼續讀取文件發送數據。在運行FileBeat時,每個input的狀態信息也會保存在內存中,重新啟動FileBeat時,會從「注冊表」文件中讀取數據來重新構建狀態。

 

在/export/server/es/filebeat-7.6.1-linux-x86_64/data目錄中有一個Registry文件夾,里面有一個data.json,該文件中記錄了Harvester讀取日志的offset。

 

 

 

 

2.  Logstash

2.1  簡介

Logstash是一個開源的數據采集引擎。它可以動態地將不同來源的數據統一采集,並按照指定的數據格式進行處理后,將數據加載到其他的目的地。最開始,Logstash主要是針對日志采集,但后來Logstash開發了大量豐富的插件,所以,它可以做更多的海量數據的采集。

它可以處理各種類型的日志數據,例如:Apache的web log、Java的log4j日志數據,或者是系統、網絡、防火牆的日志等等。它也可以很容易的和Elastic Stack的Beats組件整合,也可以很方便的和關系型數據庫、NoSQL數據庫、Kafka、RabbitMQ等整合。

 

 

 

 

2.1.1  經典架構

 

 

 

 

2.1.2  對比Flume

  1. Apache Flume是一個通用型的數據采集平台,它通過配置source、channel、sink來實現數據的采集,支持的平台也非常多。而Logstash結合Elastic Stack的其他組件配合使用,開發、應用都會簡單很多
  2. Logstash比較關注數據的預處理,而Flume跟偏重數據的傳輸,幾乎沒有太多的數據解析預處理,僅僅是數據的產生,封裝成Event然后傳輸。

2.1.3  對比FileBeat

l logstash是jvm跑的,資源消耗比較大

l 而FileBeat是基於golang編寫的,功能較少但資源消耗也比較小,更輕量級

l logstash 和filebeat都具有日志收集功能,Filebeat更輕量,占用資源更少

l logstash 具有filter功能,能過濾分析日志

l 一般結構都是filebeat采集日志,然后發送到消息隊列,redis,kafka中然后logstash去獲取,利用filter功能過濾分析,然后存儲到elasticsearch中

2.2  安裝Logstash

  1. 切換到itcast用戶
  2. 下載Logstash

https://www.elastic.co/cn/downloads/past-releases/logstash-7-6-1

  1. 解壓Logstash到指定目錄
  2. 運行測試

cd /export/server/es/logstash-7.6.1/

bin/logstash -e 'input { stdin { } } output { stdout {} }'

等待一會,讓Logstash啟動完畢。

Sending Logstash logs to /export/server/es/logstash-7.6.1/logs which is now configured via log4j2.properties

[2020-05-28T16:31:44,159][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified

[2020-05-28T16:31:44,264][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.6.1"}

[2020-05-28T16:31:45,631][INFO ][org.reflections.Reflections] Reflections took 37 ms to scan 1 urls, producing 20 keys and 40 values

[2020-05-28T16:31:46,532][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.RubyArray) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.

[2020-05-28T16:31:46,560][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x3ccbc15b run>"}

[2020-05-28T16:31:47,268][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}

The stdin plugin is now waiting for input:

[2020-05-28T16:31:47,348][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

[2020-05-28T16:31:47,550][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

然后,隨便在控制台中輸入內容,等待Logstash的輸出。

{

"host" => "node1.itcast.cn",

"message" => "hello world",

"@version" => "1",

"@timestamp" => 2020-05-28T08:32:31.007Z

}

 

ps:

-e選項表示,直接把配置放在命令中,這樣可以有效快速進行測試

2.3  采集Apache Web服務器日志

2.3.1  需求

Apache的Web Server會產生大量日志,當我們想要對這些日志檢索分析。就需要先把這些日志導入到Elasticsearch中。此處,我們就可以使用Logstash來實現日志的采集。

 

打開這個文件,如下圖所示。我們發現,是一個純文本格式的日志。如下圖所示:

 

 

 

 

這個日志其實由一個個的字段拼接而成,參考以下表格。

字段名

說明

client IP

瀏覽器端IP

timestamp

請求的時間戳

method

請求方式(GET/POST)

uri

請求的鏈接地址

status

服務器端響應狀態

length

響應的數據長度

reference

從哪個URL跳轉而來

browser

瀏覽器

 

因為最終我們需要將這些日志數據存儲在Elasticsearch中,而Elasticsearch是有模式(schema)的,而不是一個大文本存儲所有的消息,而是需要將字段一個個的保存在Elasticsearch中。所以,我們需要在Logstash中,提前將數據解析好,將日志文本行解析成一個個的字段,然后再將字段保存到Elasticsearch中。

2.3.2  准備日志數據

將Apache服務器日志上傳到 /export/server/es/data/apache/ 目錄

 mkdir -p /export/server/es/data/apache/

2.3.3  使用FileBeats將日志發送到Logstash

在使用Logstash進行數據解析之前,我們需要使用FileBeat將采集到的數據發送到Logstash。之前,我們使用的FileBeat是通過FileBeat的Harvester組件監控日志文件,然后將日志以一定的格式保存到Elasticsearch中,而現在我們需要配置FileBeats將數據發送到Logstash。FileBeat這一端配置以下即可:

#----------------------------- Logstash output ---------------------------------

#output.logstash:

  # Boolean flag to enable or disable the output module.

  #enabled: true

 

  # The Logstash hosts

  #hosts: ["localhost:5044"]

 

hosts配置的是Logstash監聽的IP地址/機器名以及端口號。

114.113.220.255

准備FileBeat配置文件

cd /export/server/es/filebeat-7.6.1-linux-x86_64

vim filebeat-logstash.yml

因為Apache的web log日志都是以IP地址開頭的,所以我們需要修改下匹配字段。

filebeat.inputs:

- type: log

  enabled: true

  paths:

    - /var/apache/log/access.*

  multiline.pattern: '^\d+\.\d+\.\d+\.\d+ '

  multiline.negate: true

  multiline.match: after

 

output.logstash:

  enabled: true

  hosts: ["node1.itcast.cn:5044"]

 

啟動FileBeat,並指定使用新的配置文件

./filebeat -e -c filebeat-logstash.yml

 

FileBeat將嘗試建立與Logstash監聽的IP和端口號進行連接。但此時,我們並沒有開啟並配置Logstash,所以FileBeat是無法連接到Logstash的。

2020-06-01T11:28:47.585+0800    ERROR   pipeline/output.go:100  Failed to connect to backoff(async(tcp://node1.itcast.cn:5044)): dial tcp 192.168.88.100:5044: connect: connection refused

2.3.4  配置Logstash接收FileBeat數據並打印

Logstash的配置文件和FileBeat類似,它也需要有一個input、和output。基本格式如下:

# #號表示添加注釋

# input表示要接收的數據

input {

}

 

# file表示對接收到的數據進行過濾處理

filter {

 

}

 

# output表示將數據輸出到其他位置

output {

}

 

配置從FileBeat接收數據

cd /export/server/es/logstash-7.6.1/config

vim filebeat-print.conf

input {

  beats {

    port => 5044

  }

}

 

output {

  stdout {

    codec => rubydebug

  }

}

 

測試logstash配置是否正確

bin/logstash -f config/filebeat-print.conf --config.test_and_exit

[2020-06-01T11:46:33,940][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

啟動logstash

bin/logstash -f config/filebeat-print.conf --config.reload.automatic

reload.automatic:修改配置文件時自動重新加載

測試

創建一個access.log.1文件,使用cat test >> access.log.1往日志文件中追加內容。

test文件中只保存一條日志:

[root@node1 log]# cat test

235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

 

當我們啟動Logstash之后,就可以發現Logstash會打印出來從FileBeat接收到的數據:

{

           "log" => {

          "file" => {

            "path" => "/var/apache/log/access.log.1"

        },

        "offset" => 825

    },

         "input" => {

        "type" => "log"

    },

         "agent" => {

        "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",

             "version" => "7.6.1",

                "type" => "filebeat",

                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

            "hostname" => "node1.itcast.cn"

    },

    "@timestamp" => 2020-06-01T09:07:55.236Z,

           "ecs" => {

        "version" => "1.4.0"

    },

          "host" => {

        "name" => "node1.itcast.cn"

    },

          "tags" => [

        [0] "beats_input_codec_plain_applied"

    ],

       "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /itcast.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",

      "@version" => "1"

}

2.3.5  Logstash輸出數據到Elasticsearch

通過控制台,我們發現Logstash input接收到的數據沒有經過任何處理就發送給了output組件。而其實我們需要將數據輸出到Elasticsearch。所以,我們修改Logstash的output配置。配置輸出Elasticsearch只需要配置以下就可以了:

output {

    elasticsearch {

        hosts => [ "localhost:9200" ]

    }}

 

操作步驟:

  1. 重新拷貝一份配置文件

cp filebeat-print.conf filebeat-es.conf

  1. 將output修改為Elasticsearch

input {

  beats {

    port => 5044

  }

}

 

output {

 elasticsearch {

   hosts => [ "node1.itcast.cn:9200","node2.itcast.cn:9200","node3.itcast.cn:9200"]

 }

}

  1. 重新啟動Logstash

bin/logstash -f config/filebeat-es.conf --config.reload.automatic

  1. 追加一條日志到監控的文件中,並查看Elasticsearch中的索引、文檔

cat test >> access.log.1

// 查看索引數據

GET /_cat/indices?v

我們在Elasticsearch中發現一個以logstash開頭的索引。

    {

        "health": "green",

        "status": "open",

        "index": "logstash-2020.06.01-000001",

        "uuid": "147Uwl1LRb-HMFERUyNEBw",

        "pri": "1",

        "rep": "1",

        "docs.count": "2",

        "docs.deleted": "0",

        "store.size": "44.8kb",

        "pri.store.size": "22.4kb"

    }

// 查看索引庫的數據

GET /logstash-2020.06.01-000001/_search?format=txt

{

    "from": 0,

    "size": 1

}

我們可以獲取到以下數據:

                    "@timestamp": "2020-06-01T09:38:00.402Z",

                    "tags": [

                        "beats_input_codec_plain_applied"

                    ],

                    "host": {

                        "name": "node1.itcast.cn"

                    },

                    "@version": "1",

                    "log": {

                        "file": {

                            "path": "/var/apache/log/access.log.1"

                        },

                        "offset": 1343

                    },

                    "agent": {

                        "version": "7.6.1",

                        "ephemeral_id": "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",

                        "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

                        "hostname": "node1.itcast.cn",

                        "type": "filebeat"

                    },

                    "input": {

                        "type": "log"

                    },

                    "message": "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /itcast.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",

                    "ecs": {

                        "version": "1.4.0"

                    }

 

從輸出返回結果,我們可以看到,日志確實已經保存到了Elasticsearch中,而且我們看到消息數據是封裝在名為message中的,其他的數據也封裝在一個個的字段中。我們其實更想要把消息解析成一個個的字段。例如:IP字段、時間、請求方式、請求URL、響應結果,這樣。

2.3.6  Logstash過濾器

在Logstash中可以配置過濾器Filter對采集到的數據進行中間處理,在Logstash中,有大量的插件供我們使用。參考官網:

https://www.elastic.co/guide/en/logstash/7.6/filter-plugins.html

此處,我們重點來講解Grok插件。

2.3.6.1  查看Logstash已經安裝的插件

bin/logstash-plugin list

2.3.6.2  Grok插件

Grok是一種將非結構化日志解析為結構化的插件。這個工具非常適合用來解析系統日志、Web服務器日志、MySQL或者是任意其他的日志格式。

Grok官網:https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-grok.html

2.3.6.3  Grok語法

Grok是通過模式匹配的方式來識別日志中的數據,可以把Grok插件簡單理解為升級版本的正則表達式。它擁有更多的模式,默認,Logstash擁有120個模式。如果這些模式不滿足我們解析日志的需求,我們可以直接使用正則表達式來進行匹配。

官網:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

 

grok模式的語法是:%{SYNTAX:SEMANTIC}

SYNTAX指的是Grok模式名稱,SEMANTIC是給模式匹配到的文本字段名。例如:

%{NUMBER:duration} %{IP:client}

duration表示:匹配一個數字,client表示匹配一個IP地址。

默認在Grok中,所有匹配到的的數據類型都是字符串,如果要轉換成int類型(目前只支持int和float),可以這樣:%{NUMBER:duration:int} %{IP:client}

 

以下是常用的Grok模式:

NUMBER

匹配數字(包含:小數)

INT

匹配整形數字

POSINT

匹配正整數

WORD

匹配單詞

DATA

匹配所有字符

IP

匹配IP地址

PATH

匹配路徑

2.3.6.4  用法

 

 

 

 

    filter {

      grok {

        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }

      }

    }

2.3.7  匹配日志中的IP、日期並打印

235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

我們使用IP就可以把前面的IP字段匹配出來,使用HTTPDATE可以將后面的日期匹配出來。

 

配置Grok過濾插件

  1. 配置Logstash

input {

    beats {

        port => 5044

    }

}

 

filter {

    grok {

        match => {

            "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\]"

        }

    }   

}

 

output {

    stdout {

        codec => rubydebug

    }

}

  1. 啟動Logstash

bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic

{

           "log" => {

        "offset" => 1861,

          "file" => {

            "path" => "/var/apache/log/access.log.1"

        }

    },

         "input" => {

        "type" => "log"

    },

          "tags" => [

        [0] "beats_input_codec_plain_applied"

    ],

          "date" => "15/Apr/2015:00:27:19 +0849",

           "ecs" => {

        "version" => "1.4.0"

    },

    "@timestamp" => 2020-06-01T11:02:05.809Z,

       "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /itcast.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",

          "host" => {

        "name" => "node1.itcast.cn"

    },

            "ip" => "235.9.200.242",

         "agent" => {

            "hostname" => "node1.itcast.cn",

             "version" => "7.6.1",

        "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",

                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

                "type" => "filebeat"

    },

      "@version" => "1"

}

 

我們看到,經過Grok過濾器插件處理之后,我們已經獲取到了ip和date兩個字段。接下來,我們就可以繼續解析其他的字段。

2.3.8  解析所有字段

將日志解析成以下字段:

字段名

說明

client IP

瀏覽器端IP

timestamp

請求的時間戳

method

請求方式(GET/POST)

uri

請求的鏈接地址

status

服務器端響應狀態

length

響應的數據長度

reference

從哪個URL跳轉而來

browser

瀏覽器

 

  1. 修改Logstash配置文件

input {

    beats {

        port => 5044

    }

}

 

filter {

    grok {

        match => {

            "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status} %{INT:length} \"%{DATA:reference}\" \"%{DATA:browser}\""

        }

    }   

}

 

output {

    stdout {

        codec => rubydebug

    }

}

  1. 測試並啟動Logstash

我們可以看到,8個字段都已經成功解析。

{

     "reference" => "www.baidu.com",

      "@version" => "1",

           "ecs" => {

        "version" => "1.4.0"

    },

    "@timestamp" => 2020-06-02T03:30:10.048Z,

            "ip" => "235.9.200.241",

        "method" => "POST",

           "uri" => "/itcast.cn/bigdata.html",

         "agent" => {

                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

        "ephemeral_id" => "734ae9d8-bcdc-4be6-8f97-34387fcde972",

             "version" => "7.6.1",

            "hostname" => "node1.itcast.cn",

                "type" => "filebeat"

    },

        "length" => "45",

        "status" => "200",

           "log" => {

          "file" => {

            "path" => "/var/apache/log/access.log"

        },

        "offset" => 1

    },

         "input" => {

        "type" => "log"

    },

          "host" => {

        "name" => "node1.itcast.cn"

    },

          "tags" => [

        [0] "beats_input_codec_plain_applied"

    ],

       "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",

          "date" => "15/Apr/2015:00:27:19 +0849",

       "message" => "235.9.200.241 - - [15/Apr/2015:00:27:19 +0849] \"POST /itcast.cn/bigdata.html HTTP/1.1\" 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900\""

}

 

2.3.9  將數據輸出到Elasticsearch

到目前為止,我們已經通過了Grok Filter可以將日志消息解析成一個一個的字段,那現在我們需要將這些字段保存到Elasticsearch中。我們看到了Logstash的輸出中,有大量的字段,但如果我們只需要保存我們需要的8個,該如何處理呢?而且,如果我們需要將日期的格式進行轉換,我們又該如何處理呢?

2.3.9.1  過濾出來需要的字段

要過濾出來我們需要的字段。我們需要使用mutate插件。mutate插件主要是作用在字段上,例如:它可以對字段進行重命名、刪除、替換或者修改結構。

 

 

 

 

官方文檔:https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-mutate.html

例如,mutate插件可以支持以下常用操作

 

 

 

 

 

配置:

注意:此處為了方便進行類型的處理,將status、length指定為int類型。

input {

    beats {

        port => 5044

    }

}

 

filter {

    grok {

        match => {

            "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\""

        }

    }

    mutate {

        enable_metric => "false"

        remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]

    }

}

 

output {

    stdout {

        codec => rubydebug

    }

}

2.3.9.2  轉換日期格式

要將日期格式進行轉換,我們可以使用Date插件來實現。該插件專門用來解析字段中的日期,官方說明文檔:https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-date.html

 

用法如下:

 

 

 

 

 

將date字段轉換為「年月日 時分秒」格式。默認字段經過date插件處理后,會輸出到@timestamp字段,所以,我們可以通過修改target屬性來重新定義輸出字段。

 

Logstash配置修改為如下:

input {

    beats {

        port => 5044

    }

}

 

filter {

    grok {

        match => {

            "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\""

        }

    }

    mutate {

        enable_metric => "false"

        remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]

    }

    date {

        match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]

        target => "date"

    }

}

 

output {

    stdout {

        codec => rubydebug

    }

}

 

啟動Logstash:

bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic

 

{

       "status" => "200",

    "reference" => "www.baidu.com",

       "method" => "POST",

      "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",

           "ip" => "235.9.200.241",

       "length" => "45",

          "uri" => "/itcast.cn/bigdata.html",

         "date" => 2015-04-14T15:38:19.000Z

}

2.3.9.3  輸出到Elasticsearch指定索引

我們可以通過    

elasticsearch {

        hosts => ["node1.itcast.cn:9200" ,"node2.itcast.cn:9200" ,"node3.itcast.cn:9200"]

        index => "xxx"

}

index來指定索引名稱,默認輸出的index名稱為:logstash-%{+yyyy.MM.dd}。但注意,要在index中使用時間格式化,filter的輸出必須包含 @timestamp字段,否則將無法解析日期。

 

input {

    beats {

        port => 5044

    }

}

 

filter {

    grok {

        match => {

            "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\""

        }

    }

    mutate {

        enable_metric => "false"

        remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]

    }

    date {

        match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]

        target => "date"

    }

}

 

output {

    stdout {

        codec => rubydebug

    }

    elasticsearch {

        hosts => ["node1.itcast.cn:9200" ,"node2.itcast.cn:9200" ,"node3.itcast.cn:9200"]

        index => "apache_web_log_%{+YYYY-MM}"

    }

}

 

 

啟動Logstash

bin/logstash -f config/filebeat-apache-weblog.conf --config.test_and_exit

bin/logstash -f config/filebeat-apache-weblog.conf --config.reload.automatic

 

注意:

l index名稱中,不能出現大寫字符

3.  Kibana

3.1  簡介

 

 

 

 

通過上面的這張圖就可以看到,Kibana可以用來展示豐富的圖表。

l Kibana是一個開源的數據分析和可視化平台,使用Kibana可以用來搜索Elasticsearch中的數據,構建漂亮的可視化圖形、以及制作一些好看的儀表盤

l Kibana是用來管理Elastic stack組件的可視化平台。例如:使用Kibana可以進行一些安全設置、用戶角色設置、對Elasticsearch進行快照等等

l Kibana提供統一的訪問入口,不管是日志分析、還是查找文檔,Kibana提供了一個使用這些功能的統一訪問入口

l Kibana使用的是Elasticsearch數據源,Elasticsearch是存儲和處理數據的引擎,而Kibana就是基於Elasticsearch之上的可視化平台

 

 

 

 

主要功能:

l 探索和查詢Elasticsearch中的數據

 

 

 

 

l 可視化與分析

 

 

 

 

3.2  安裝Kibana

在Linux下安裝Kibana,可以使用Elastic stack提供 tar.gz壓縮包。官方下載地址:

https://www.elastic.co/cn/downloads/past-releases/kibana-7-6-1

 

  1. 解壓Kibana gz壓縮包

tar -xzf kibana-7.6.1-linux-x86_64.tar.gz

  1. 進入到Kibana目錄

cd kibana-7.6.1-linux-x86_64/

  1. 配置Kibana: config/kibana.yml

server.host: "node1.itcast.cn"

# The Kibana server's name.  This is used for display purposes.

server.name: "itcast-kibana"

 

# The URLs of the Elasticsearch instances to use for all your queries.

elasticsearch.hosts: ["http://node1.itcast.cn:9200","http://node2.itcast.cn:9200","http://node3.itcast.cn:9200"]

  1. 運行Kibana

./bin/kibana

3.2.1  查看Kibana狀態

輸入以下網址,可以查看到Kibana的運行狀態:

http://node1.itcast.cn:5601/status

 

 

 

 

3.2.2  查看Elasticsearch的狀態

點擊

 

 

 按鈕,再點擊 「Index Management」,可以查看到Elasticsearch集群中的索引狀態。

 

 

 

 

 

點擊索引的名字,可以進一步查看索引更多的信息。

 

 

 

 

點擊「Manage」按鈕,還可以用來管理索引。

 

 

 

 

3.3  添加Elasticsearch數據源

3.3.1  Kibana索引模式

在開始使用Kibana之前,我們需要指定想要對哪些Elasticsearch索引進行處理、分析。在Kibana中,可以通過定義索引模式(Index Patterns)來對應匹配Elasticsearch索引。在第一次訪問Kibana的時候,系統會提示我們定義一個索引模式。或者我們可以通過點擊按鈕,再點擊Kibana下方的Index Patterns,來創建索引模式。參考下圖:

 

 

 

 

 

  1. 定義索引模式,用於匹配哪些Elasticsearch中的索引。點擊「Next step」

 

 

 

 

  1. 選擇用於進行時間過濾的字段

 

 

 

 

  1. 點擊「Create Index Pattern」按鈕,創建索引模式。創建索引模式成功后,可以看到顯示了該索引模式對應的字段。里面描述了哪些可以用於搜索、哪些可以用來進行聚合計算等。

 

 

 

 

3.4  探索數據(Discovery)

通過Kibana中的Discovery組件,我們可以快速地進行數據的檢索、查詢。

3.4.1  使用探索數據功能

點擊按鈕可以打開Discovery頁面。

 

 

 

 

我們發現沒有展示任何的數據。但我們之前已經把數據導入到Elasticsearch中了。

 

 

 

 

Kibana提示,讓我們擴大我們的查詢的時間范圍。

 

 

 

 

默認Kibana是展示最近15分鍾的數據。我們把時間范圍調得更長一些,就可以看到數據了。

 

 

 

 

將時間范圍選擇為1年范圍內的,我們就可以查看到Elasticsearch中的數據了。

 

 

 

 

3.4.2  導入更多的Apache Web日志數據

  1. 將資料中的 access.log 文件上傳到Linux
  2. 將access.log移動到/var/apache/log,並重命名為access.log.2

mv access.log /var/apache/log/access.log.2

  1. 啟動FileBeat

./filebeat -e -c filebeat-logstash.yml

  1. 啟動Logstash

bin/logstash -f config/filebeat-es.conf --config.reload.automatic

3.4.3  基於時間過濾查詢

3.4.3.1  選擇時間范圍

 

 

 

 

3.4.3.2  指定查詢某天的數據

查詢2020年5月6日的所有日志數據。

 

 

 

 

3.4.3.3  從直方圖

 

上選擇日期更細粒度范圍

如果要選擇查看某一天的日志,上面這種方式會有一些麻煩,我們有更快更容易的方式。

 

 

 

 

 

 

 

 

 

 

3.4.4  使用Kibana搜索數據

在Kibana的Discovery組件中,可以在查詢欄中輸入搜索條件。默認情況下,可以使用Kibana內置的標准查詢語言,來進行快速查詢。還有一種是遺留的基於Lucene的查詢語法目前暫時可用,這種查詢語法也可以使用基於JSON的Elasticsearch DSL也是可用的。當我們在Discovery搜索數據時,對應的直方圖、文檔列表都會隨即更新。默認情況下,優先展示最新的文檔,按照時間倒序排序的。

3.4.4.1  Kibana查詢語言(KQL)

在7.0中,Kibana上線了新的查詢語言。這種語言簡潔、易用,有利於快速查詢。

查詢語法:

「字段:值」,如果值是字符串,可以用雙引號括起來。

查詢包含zhihu的請求

*zhihu*

查詢頁面不存在的請求

status : 404

查詢請求成功和不存在的請求

status: (404 or 200)

查詢方式為POST請求,並請求成功的日志

status: 200 and method: post

查詢方式為GET成功的請求,並且響應數據大於512的日志

status: 200 and method: get and length > 512

查詢請求成功的且URL為「/itcast.cn開頭的日志

uri: "\/itcast.cn\/*"

注意:因為/為特殊字符,需要使用反斜杠進行轉義

3.4.4.2  過濾字段

Kibana的Discovery組件提供各種各樣的篩選器,這樣可以篩選出來我們關注的數據上。例如:我們只想查詢404的請求URI。

 

 

 

 

 

 

指定過濾出來404以及請求的URI、從哪兒跳轉來的日志

 

 

 

 

 

 

將查詢保存下來,方便下次直接查看

 

 

 

 

 

 

下次直接點擊Open就可以直接打開之前保存的日志了

 

 

 

 

 

3.5  數據可視化(Visualize)

Kibana中的Visualize可以基於Elasticsearch中的索引進行數據可視化,然后將這些可視化圖表添加到儀表盤中。

3.5.1  數據可視化的類型

l Lens

n 通過簡單地拖拽數據字段,快速構建基本的可視化

l 常用的可視化對象

n 線形圖(Line)、面積圖(Area)、條形圖(Bar):可以用這些帶X/Y坐標的圖形來進行不同分類的比較

n 餅圖(Pie):可以用餅圖來展示占比

n 數據表(Data Table):以數據表格的形式展示

n 指標(Metrics):以數字的方式展示

n 目標和進度:顯示帶有進度指標的數字

n 標簽雲/文字雲(Tag Cloud):以文字雲方式展示標簽,文字的大小與其重要性相關

l Timelion

n 從多個時間序列數據集來展示數據

l 地圖

n 展示地理位置數據

l 熱圖

n 在矩陣的單元格展示數據

 

 

 

 

 

l 儀表盤工具

n Markdown部件:顯示一些MD格式的說明

n 控件:在儀表盤中添加一些可以用來交互的組件

l Vega

3.5.2  以餅圖展示404與200的占比

效果圖:

 

 

 

 

 

操作步驟:

  1. 創建可視化

 

 

 

 

 

  1. 選擇要進行可視化圖形類型,此處我們選擇Pie(餅圖類型)

 

 

 

 

 

  1. 選擇數據源

 

 

 

 

 

  1. 添加分桶、分片(還記得嗎?我們在Elasticsearch進行分組聚合都是以分桶方式進行的,可以把它理解為分組)

 

 

 

 

 

  1. 配置分桶以及指標計算方式

 

 

 

 

 

  1. 點擊藍色播放按鈕執行。

 

 

 

 

 

  1. 保存圖形(取名為:apache_log@404_200)

3.5.3  以條形圖方式展示2020年5月每日請求數

效果如下:

 

 

 

 

 

 

開發步驟:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

我們還可以修改圖形的樣式,例如:以曲線、面積圖的方式展示。

 

 

 

 

 

 

3.5.4  以TSVB可視化不同訪問來源的數據

TSVB是一個基於時間序列的數據可視化工具,它可以使用Elasticsearch聚合的所有功能。使用TSVB,我們可以輕松地完成任意聚合方式來展示復雜的數據。它可以讓我們快速制作效果的圖表:

  1. 基於時間序列的圖形展示

 

 

 

 

 

  1. 展示指標數據

 

 

 

 

 

  1. TopN

 

 

 

 

 

  1. 類似油量表的展示

 

 

 

 

 

  1. Markdown自定義數據展示

 

 

 

 

 

  1. 以表格方式展示數據

 

 

 

 

 

操作步驟:

  1. 創建TSVB可視化對象

 

 

 

 

 

  1. 配置Time Series數據源分組條件

 

 

 

 

 

 

 

 

 

 

  1. 配置Metric

 

 

 

 

 

 

  1. TopN

 

 

 

 

 

3.5.5  制作用戶選擇請求方式、響應字節大小控制組件

3.5.5.1  控制組件

在Kibana中,我們可以使用控件來控制圖表的展示。例如:提供一個下列列表,供查看圖表的用戶只展示比較關注的數據。我們可以添加兩個類型的控制組件:

  1. 選項列表

l 根據一個或多個指定選項來篩選內容。例如:我們先篩選某個城市的數據,就可以通過選項列表來選擇該城市

  1. 范圍選擇滑塊

l 篩選出來指定范圍的數據。例如:我們篩選某個價格區間的商品等。

 

3.5.5.2  Kibana開發

 

 

 

 

 

 

 

 

 

 

 

 

 

3.6  制作Dashboard

接下來,我們把前面的幾個圖形放到一個看板中。這樣,我們就可以在一個看板中,瀏覽各類數據了。

 

 

 

 

 

  1. 點擊第三個組件圖標,並創建一個新的Dashboard。

 

 

 

 

 

  1. 點擊Edit編輯Dashboard。

 

 

 

 

 

  1. 依次添加我們之前制作好的圖表。

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM