目錄
1.ELK 日志收集系統之Supervisord安裝配置
2.ELK 日志收集系統之Supervisord啟動elasticsearch-head
3.ELK 日志收集系統之Kafka配置
4.ELK 架構部署之 Logstash 配置
5.ELK 架構部署之supervisord方式啟動Kibana
6.ELK 日志收集系統之Filebeat配置
1.EFK 日志收集系統之Supervisord安裝配置
0x01 Supervisord 簡單介紹
Supervisor是一個客戶/服務器系統,它可以在類Unix系統中管理控制大量進程。Supervisor使用python開發,有多年歷史,目前很多生產環境下的服務器都在使用Supervisor。
Supervisor的服務器端稱為supervisord,主要負責在啟動自身時啟動管理的子進程,響應客戶端的命令,重啟崩潰或退出的子進程,記錄子進程stdout和stderr輸出,生成和處理子進程生命周期中的事件。可以在一個配置文件中配置相關參數,包括Supervisord自身的狀態,其管理的各個子進程的相關屬性。配置文件一般位於/etc/supervisord.conf。
Supervisor的客戶端稱為supervisorctl,它提供了一個類shell的接口(即命令行)來使用supervisord服務端提供的功能。通過supervisorctl,用戶可以連接到supervisord服務器進程,獲得服務器進程控制的子進程的狀態,啟動和停止子進程,獲得正在運行的進程列表。客戶端通過Unix域套接字或者TCP套接字與服務端進行通信,服務器端具有身份憑證認證機制,可以有效提升安全性。當客戶端和服務器位於同一台機器上時,客戶端與服務器共用同一個配置文件/etc/supervisord.conf,通過不同標簽來區分兩者的配置。
0x02 Supervisord 安裝及主要目錄介紹
安裝方式這里不多做介紹,可參考官網:Supervisor
我這里為了方便,直接 yum
倉庫安裝
# yum -y install supervisor
使用 yum
安裝的 supervisor
,主要關注三個文件:
/etc/supervisord.conf
:supervisord 的主配置文件;/etc/supervisord.d
:用於存放管理進程的配置;/var/log/supervisor
:supervisord 的日志存放目錄,存放關於supervisord的所有日志文件。
0x03 Supervisord 的配置
開啟 Supervisord 的 Web 管理界面,在主配置文件(/etc/supervisord.conf
)下找到如下配置,取消注釋,配置配置訪問IP、端口號(默認端口:9001)、登入賬號、密碼。雖然內網,密碼建議復雜。
[inet_http_server] ; inet (TCP) server disabled by default
port=192.168.7.3:9001 ; (ip_address:port specifier, *:port for all iface)
username=admin ; (default is no username (open server))
password=admin ; (default is no password (open server))
重啟supervisord
systemctl restart supervisord
瀏覽器訪問:http://192.168.7.3:9001
supervisord 配置暫時到此結束,后面會用到supervisord管理如下進程。
進程名 | 介紹 |
---|---|
EFK-elasticsearch-head | elasticsearch 插件 |
EFK-kafka-broker0 | Kafka實例 |
EFK-kafka-broker1 | Kafka實例 |
EFK-kafka-broker2 | Kafka實例 |
EFK-kafka-consumer-mysql-1 | kafka消費者消費MySQL隊列信息 |
EFK-kafka-consumer-mysql-2 | kafka消費者消費MySQL隊列信息 |
EFK-kafka-consumer-nginx-1 | kafka消費者消費nginx隊列信息 |
EFK-kafka-consumer-nginx-2 | kafka消費者消費nginx隊列信息 |
EFK-kafka-consumer-php-1 | kafka消費者消費php隊列信息 |
EFK-kafka-consumer-php-2 | kafka消費者消費php隊列信息 |
EFK-kafka-consumer-php-1 | kafka消費者消費php隊列信息 |
EFK-kafka-consumer-php-2 | kafka消費者消費php隊列信息 |
EFK-kafka-consumer-system-1 | kafka消費者消費system隊列信息 |
EFK-kafka-consumer-system-2 | kafka消費者消費system隊列信息 |
EFK-kafka-manager | kafka的Web工具 |
EFK-kafka-zookeeper | kafka必須連接zookeeper |
EFK-kibana | kibana 進程 |
EFK-logstash-indexer-mysql | logstash轉發mysql消息 |
EFK-logstash-indexer-system | logstash轉發system消息 |
EFK-logstash-indexer-php | logstash轉發php消息 |
EFK-logstash-indexer-nginx | logstash轉發nginx消息 |
后期完成結果如下:
2.EFK 日志收集系統之Supervisord啟動elasticsearch-head
由於在前文中0x04 安裝 Elasticsearch,我們是在前台使用直接運行 npm run start
命令啟動的 elasticsearch-head,這樣有一點不好,開啟此前台應用程序的中端不能關閉。比如可以用 nohup
或&
讓程序在后台運行。但我們還是更偏愛於使用Web界面管理的后台程序,更加直觀明了。因此我們使用 supervisord
管理后台程序,話不多說,直接切入主題。
如果要使用 supervisord 管理后台進程,需要在 /etc/supervisord.d
目錄下,配置進程的相關信息,創建配置文件elasticsearch-head.ini
,我這邊的配置如下:
# 配置進程的名字
[program:EFK-elasticsearch-head]
# 命令運行的目錄
directory=/usr/local/elasticsearch-head
# 進程執行的命令
command=/usr/local/node-v10.15.1-linux-x64/bin/npm run start
# 是否隨supervisor啟動
autostart=true
# 子進程啟動多少秒之后,此時狀態如果是running,則我們認為啟動成功了。默認值為1
startsecs=1
# 標准錯誤日志目錄
stderr_logfile=/var/log/supervisor/elasticsearch-head_stderr.log
# 標准輸出日志目錄
stdout_logfile=/var/log/supervisor/elasticsearch-head_stdout.log
# 進程啟動的用戶
user=elasticsearch
# 把 stderr 重定向到 stdout,默認 false,即把標准錯誤日志寫入到標准輸出日志中,說白了就是將兩個日志文件合並到一個日志文件
redirect_stderr=true
# 標准輸出 stdout 日志文件大小 20MB,默認為50MB
stdout_logfile_maxbytes=20MB
# 標准輸出日志備份數量
stdout_logfile_backups=10
elasticsearch-head.ini
配置文件創建完畢后,可以在命令行中,運行命令:
# supervisorctl update
這條命令會加載 /etc/supervisord.d
目錄下的進程文件:
- 如果有新的進程文件,則會直接啟動該進程;
- 如果僅僅是舊的進程文件被更新了,則會停止該進程,再重新啟動該進程。
# supervisorctl status
EFK-elasticsearch-head RUNNING pid 4107, uptime 8:23:41
supervisord 還有如下幾條命令比較常用:
# 停止某一個進程
supervisorctl stop 進程名字
# 啟動某一個進程
supervisorctl start 進程名字
# 去除某一個進程
supervisorctl remove 進程名字
對於一個新加入的進程,我們需要先執行 supervisorctl update
后,才能在 supervosprctl status
中看到進程的狀態。
如果在命令終端觀察到進程為RUNNING
狀態,則可以瀏覽器管理該進程了,瀏覽器輸入:http://192.168.7.3:9001,如下圖,選擇 Tail -f
,則可以看到/var/log/supervisor/elasticsearch-head_stdout.log
日志中的內容,是不是很方便呢?
最后,我們要查看elasticsearch-head
的端口是否起來:
# netstat -anlp | grep 9100
tcp 0 0 0.0.0.0:9100 0.0.0.0:* LISTEN 7112/grunt
# lsof -i :9100
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
grunt 7112 elasticsearch 18u IPv4 8031920 0t0 TCP *:jetdirect (LISTEN)
瀏覽器訪問 :http://192.168.7.3:9100
3.EFK 日志收集系統之Kafka配置
0x01 Kafka簡單介紹
關於Kafka的具體介紹,可參考中文文檔:Kafka中文網站,以下摘抄於文檔中一些比較重要的概念。
Kafka是一種高性能、低延遲、具備日志存儲、備份和傳播功能的分布式文件系統。有如下三種特性:
- 可以讓你發布和訂閱流式的記錄
- 可以儲存流式的記錄,並且有較好的容錯性。
- 可以在流式記錄產生時就進行處理。
Kafka 可用於兩大類別的應用:
- 構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。 (相當於message queue)
- 構建實時流式應用程序,對這些流數據進行轉換或者影響。 (就是流處理,通過kafka stream topic和topic之間內部進行變化)
Kafka 基本概念:
- Kafka作為一個集群,運行在一台或多台服務器上
- Kafka 通過
topic
對存儲的流數據進行分類 - 每條記錄中包含一個
key
、一個value
和一個timestamp
(時間戳) Topic
就是數據主題,是數據記錄發布的地方,可以用來區分業務系統。Kafka中的 Topic 總是多訂閱者模式,一個 topic 可以區分擁有一個一個或者多個消費者來訂閱它的數據- Kafka 集群保留所有發布的記錄—無論他們是否已被消費—並通過一個可配置的參數——保留期限來控制。舉個例子, 如果保留策略設置為2天,一條記錄發布后兩天內,可以隨時被消費,兩天過后這條記錄會被拋棄並釋放磁盤空間。
- 日志中的 partition(分區)有以下幾個用途。第一,當日志大小超過了單台服務器的限制,允許日志進行擴展。每個單獨的分區都必須受限於主機的文件限制,不過一個主題可能有多個分區,因此可以處理無限量的數據。第二,可以作為並行的單元集。
- 日志的分區partition (分布)在Kafka集群的服務器上。每個服務器在處理數據和請求時,共享這些分區。每一個分區都會在已配置的服務器上進行備份,確保容錯性。
- 每個分區都有一台 server 作為 “leader”,零台或者多台server作為 follwers 。leader server 處理一切對 partition (分區)的讀寫請求,而follwers只需被動的同步leader上的數據。當leader宕機了,followers 中的一台服務器會自動成為新的 leader。每台 server 都會成為某些分區的 leader 和某些分區的 follower,因此集群的負載是平衡的。
- 生產者可以將數據發布到所選擇的topic(主題)中。生產者負責將記錄分配到topic的哪一個 partition(分區)中。
- 消費者使用一個
消費組
名稱來進行標識,發布到topic中的每條記錄被分配給訂閱消費組中的一個消費者實例.消費者實例可以分布在多個進程中或者多個機器上。如果所有的消費者實例在同一消費組中,消息記錄會負載平衡到每一個消費者實例。如果所有的消費者實例在不同的消費組中,每條消息記錄會廣播到所有的消費者進程.。 - 通常情況下,每個 topic 都會有一些消費組,一個消費組對應一個"邏輯訂閱者"。一個消費組由許多消費者實例組成,便於擴展和容錯。
根據EFK日志采集系統的特點,我個人對使用kafka的理解,即怎樣解決日志在Kafka中做分類傳輸的問題:由於我們利用 Filebeat
采集到的日志會有很多不同的日志,比較常見的例如 Web服務器的Nginx的日志、php的日志,數據庫服務器MySQL的日志、慢查詢日志等,還有每一台服務器的系統日志。
所以,我們需要對不同的日志做分類傳到Kafka的中。在Filebeat 中,我們利用 fields
添加一個自定義字段,這里我們會定義一個key
為log_topic
,值根據采集的日志來定,例如我們采集的是是nginx的錯誤日志和訪問日志 /var/log/nginx/error.log
、 /var/log/nginx/access.log
,則可以將 log_topic
的值定義為 nginx
,然后在 filebeat中配置輸出為 output.kafka
,時,可以定義 topic
為 %{[fields.log_topic]}
,這樣就可以實現,將不同的日志分類存儲到 kafka
中。Kafka
中比較重要的一個概念Topic
,即數據主題。我們既然采集到了日志,並傳遞到Kafka中,在Kafka中也不能夠隨便存儲。所以我們根據 filebeat
中自定義的log_topic
字段,來定義Topic
。為了做橫向擴展,一個 topic 可以由多個 partition 組成,遇到瓶頸時,通過增加partition的數量來進行橫向擴容。但是對於每一個partition,需要啟用一個消費者,消費partition中的數據。即如下圖所示:
在上圖中,我們采集了system
、php
、mysql
、nginx
日志,Filebeat
將日志推送到 Kafka
中,並根據不同的日志存儲在不同的 Topic
中。在Kafka中會啟動3個broker
實例,分別為 broker0
、broker1
和broker2
。每個Topic
分為兩個partition
,並且一個Topic
對應一組消費者組,這組消費者組中每一個消費者對應消費一個partition
中的消息隊列。話不多說,開始實戰!
0x01 Kafka 配置之啟動 broker
在EFK之日志收集系統部署中,我們已將Kafka
的安裝包解壓至 /usr/local
目錄下,進入該目錄下,查看有哪些文件內容:
# pwd
/usr/local/kafka_2.11-2.1.0
# ls
bin config libs LICENSE logs NOTICE site-docs
# ls bin/
connect-distributed.sh kafka-console-producer.sh kafka-log-dirs.sh kafka-run-class.sh kafka-streams-application-reset.sh zookeeper-security-migration.sh
connect-standalone.sh kafka-consumer-groups.sh kafka-mirror-maker.sh kafka-topics.sh zookeeper-server-start.sh
kafka-acls.sh kafka-consumer-perf-test.sh kafka-preferred-replica-election.sh kafka-verifiable-consumer.sh zookeeper-server-stop.sh
kafka-broker-api-versions.sh kafka-delegation-tokens.sh kafka-producer-perf-test.sh kafka-verifiable-producer.sh zookeeper-shell.sh
kafka-configs.sh kafka-delete-records.sh kafka-reassign-partitions.sh kafka-server-start.sh trogdor.sh
kafka-console-consumer.sh kafka-dump-log.sh kafka-replica-verification.sh kafka-server-stop.sh windows
如上,在 /usr/local/kafka_2.11-2.1.0/bin
目錄下,可以看到有很多以.sh
結尾的 Shell
腳本,重點關注以下幾個腳本
zookeeper-server-start.sh # zookeeper服務啟動腳本
kafka-topics.sh # topic管理腳本,可創建、刪除、修改topic
kafka-console-consumer.sh # 消費者啟動腳本
kafka-server-start.sh # broker 實例啟動腳本
Kafka
要運行,首先要啟動 zookeeper
。zookeeper
是一個為分布式應用提供一致性服務的軟件
1.啟動 ZooKeeper
服務器
利用 /usr/local/kafka_2.11-2.1.0/bin
下的zookeeper-server-start.sh
腳本啟動 ZooKeeper
(當然我們肯定會有supervisord部署的)
# bin/zookeeper-server-start.sh config/zookeeper.properties
啟動服務后,可以查看 zookeeper
的 2181
端口是否啟動
# netstat -tnlp | grep 2181
tcp6 0 0 :::2181 :::* LISTEN 3850/java
# 查看連接數
# lsof -i:2181
- supervisord 方式部署
zookpeeper
在 /etc/supervisord.d
目錄下,新建 kafka-zookeeper.ini
配置文件,填入如下內容:
[program:EFK-kafka-zookeeper]
directory=/usr/local/kafka_2.11-2.1.0/bin
command=/usr/local/kafka_2.11-2.1.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties
autostart=true
startsecs=1
stderr_logfile=/var/log/supervisor/kafka-zookeeper_stderr.log
stdout_logfile=/var/log/supervisor/kafka-zookeeper_stdout.log
user=root
redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10
然后執行在命令行中執行:
# supervisorctl update
# supervisorctl status
supervisord 方式部署完畢。
0x03 啟動 broker 實例
腳本方式啟動
# bin/kafka-server-start.sh config/server.properties
這里有一點要注意,我們在開啟 broker 實例時,一般都不止一個broker,為了高可用,可定要多啟動幾個broker。而每個 broker 可能配置需求不一樣,所有會多次用到server.properties
配置文件,我們可以根據 broker的id來命令server.properties
文件,在broker中,broker的ID必須是唯一的。在server.properties
中配置。先講講server.properties
配置文件:
# broker 的ID,必須為整數,且在集群中唯一
broker.id=0
# broker的機架位置。 這將在機架感知副本分配中用於容錯。
broker.rack=efk
# broker監聽的端口,如果在同一台服務器上啟動多個broker,應啟用不同的端口
listeners=PLAINTEXT://192.168.7.3:9092
# 服務器用於從接收網絡請求並發送網絡響應的線程數
num.network.threads=3
# 服務器用於處理請求的線程數,可能包括磁盤I/O
num.io.threads=8
# 服務端用來處理socket連接的SO_SNDBUF緩沖大小。
socket.send.buffer.bytes=102400
# 服務端用來處理socket連接的SO_RCVBUFF緩沖大小。
socket.receive.buffer.bytes=102400
# socket請求的最大大小,這是為了防止server跑光內存,不能大於Java堆的大小。
socket.request.max.bytes=104857600
# 保存日志數據的目錄,如果未設置將使用log.dir的配置。可根據broker的id號配置
log.dirs=/tmp/kafka-logs-0
# 每個topic的默認日志分區數
num.partitions=1
# 每個數據目錄,用於啟動時日志恢復和關閉時刷新的線程數
num.recovery.threads.per.data.dir=1
# offset topic的副本數(設置的越大,可用性越高)。
offsets.topic.replication.factor=1
# 事務topic的副本數(設置的越大,可用性越高)。
transaction.state.log.replication.factor=1
# 覆蓋事務topic的min.insync.replicas配置
transaction.state.log.min.isr=1
# 日志刪除的時間閾值(小時為單位)
log.retention.hours=168
# 單個日志段文件最大大小
log.segment.bytes=1073741824
# 日志清理器檢查是否有日志符合刪除的頻率(以毫秒為單位)
log.retention.check.interval.ms=300000
# 連接到 zookeeper 的IP和端口號
zookeeper.connect=192.168.7.3:2181
# 與ZK server建立連接的超時時間,
zookeeper.connection.timeout.ms=6000
# 在執行第一次重新平衡之前,group協調器將等待更多consumer加入group的時間。
group.initial.rebalance.delay.ms=0
在 server.properties
的配置文件中,最核心的三個配置為:
- broker.id
- log.dirs
- zookeeper.connect
請確保 broker.id
的唯一性;log.dirs
的配置標准:kafka-logs-id號
,例如:broker.id
為 0
,則 log.dirs
可設置為 kafka-logs-0
,依次類推。這里我們需要創建3個broker,依次配置 3
個 broker
的 supervisord
進程文件,參考 broker.id=0
的配置即可。
注意:broker的配置文件中還有一個特別重要的參數 max.message.bytes
默認為 1000012
,即 1M
,生產者發送給broker消息的最大字節數。默認值是1000012,也就是1MB。生產者發送的消息超過該設置,會被broker拒絕接收,並且會收到broker的錯誤報告。
即在 Filebeat 中會收到如下報錯信息:
2019-03-05T09:39:33.611+0800 ERROR kafka/client.go:131 Dropping event: no topic could be selected
2019-03-05T09:39:33.611+0800 ERROR kafka/client.go:131 Dropping event: no topic could be selected
2019-03-05T09:39:33.611+0800 ERROR kafka/client.go:131 Dropping event: no topic could be selected
所以解決這個問題有兩種辦法:
- 第一:日志標准化,每個日志文件的大小定義不超過
1M
- 第二:增大``max.message.bytes`的數值
0x04 啟動 comsumer 實例
Kafka中的消費者會主動到broker中拉取指定topic
中指定patition
中的消息。
如果用腳本啟動消費者,如下:
# bin/kafka-console-consumer.sh --bootstrap-server 192.168..7.3:9092,192.168..7.3:9093,192.168..7.3:9094 --topic nginx --from-beginning
- bootstrap-server:為我們啟動的broker實例,多個實例用
,
隔開 - topic:消息主題
- from-beginning:從消息第一條消息開始消費
當然我們會更熱衷於用 supervisord 啟動消費者。此處只舉一個例子,后續的消費者參考此配置即可。
在前面創建topic時,我們的配置文件中 server.properties
配置的參數 num.partitions=1
,所以,每當我們自動創建一個topic的同時,也會為這個topic
創建一個 partition
。因此,對於每一個partition均要分配一個消費者。就拿一個消費system
日志的消費者舉例。
在 /etc/supervisord.d
中創建配置文件 kafka-consumer-system-1.ini
,文件中寫入如下配置:
[program:EFK-kafka-consumer-system-1]
directory=/usr/local/kafka_2.11-2.1.0/bin/
# bootstrap-server 指定broker實例
# topic 指定消費主題
# group 指定消費者歸屬於哪個消費者組,不用提前創建組名,直接指定組名,即創建消費者組
# partition 指定消費者消費 topic 中的哪個partition
# consumer.config 指定消費者的配置文件,每一個消費者一個單獨的配置文件
command=/usr/local/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.3:9092,192.168.7.3:9093,192.168.7.3:9094 --topic system --group system --partition 0 --consumer.config /usr/local/kafka_2.11-2.1.0/config/consumer-system-1.properties
autostart=true
startsecs=1
stderr_logfile=/var/log/supervisor/kafka-consumer-system-1_stderr.log
stdout_logfile=/var/log/supervisor/kafka-consumer-system-1_stdout.log
user=root
redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10
接下來聊聊消費者的配置文件,默認放置於/usr/local/kafka_2.11-2.1.0/config
目錄下的consumer.properties
,為了實現標准化,對每一個消費者的配置文件命名規范,做一個限制:consumer-topic名字-第幾個消費者
。
bootstrap.servers=192.168.7.3:9092,192.168.7.3:9093,192.168.7.3:9094
group.id=system
# 自動將偏移重置為最新偏移
auto.offset.reset=latest
# 會話超時時間
session.timeout.ms=15000
# 消費者在獲取更多記錄之前可以閑置的時間量
max.poll.interval.ms=60000
# 一次調用poll()時返回的最大記錄數。
max.poll.records=400
# 發出請求時傳遞給服務器的id字符串
client.id=system1
# 心跳間隔時間
heartbeat.interval.ms=5000
啟動消費者
# supervisorctl update
在 supervisord
的前端頁面,點擊 EFK-kafka-consumer-system-1
中的 Tail -f
,查看進程啟動的日志,當然,我們還沒有啟動生產者(Filebeat),此時當然沒有消息可以被消費啦!
4.EFK 架構部署之 Logstash 配置
0x01 Logstash 介紹
Logstash 由三個插件組成:
- INPUTS:輸入端
- FILTERS:過濾端
- OUTPUTS:輸出端
如下圖,信息源通過 INPUT
插件傳入 Logstash
,然后我們可以根據我們自己的需求,通過 Filter
插件將過濾出我們想要的日志,例如 message
中包含 error
的日志,則可以在這里配置。過濾完后,再輸入到 OUTPUT
插件中,選擇輸出的目的地。
一個 Logstash的 pipeline
至少需要 INPUT
和 OUTPUT
。
0x02 配置 log4j2.properties
rmp包
安裝的Logstash主要的文件目錄有兩個:
/usr/share/logstash
:logstash的安裝目錄/etc/logstash/
:logstash的主配置文件目錄
在 /usr/share/logstash
下,需要創建一個 config
目錄:
# mkdir -p /usr/share/logstash/config
# cp /etc/logstash/log4j2.properties /usr/share/logstash/config
# chown -R logstash:logstash /usr/share/logstash
如果此步驟沒有執行,在后面啟動 logstash
時,會有提示/usr/share/logstash/config
目錄下,缺少log4j2.properties
文件。
0x02 supervisord 方式部署
我們只需要配置INPUT
和OUTPUT
,FILTER
根據實際需求,匹配過濾。
在 /etc/logstash/conf.d
下創建 indexer-system.conf
文件,配置如下內容:
input {
kafka {
bootstrap_servers => "192.168.7.3:9092,192.168.7.3:9093,192.168.7.3:9094"
group_id => "system"
topics => "system"
auto_offset_reset => "latest"
decorate_events => true
codec => "json"
}
}
output {
elasticsearch {
hosts => ["192.168.7.3:9200"]
# 根據kafka 中 topic 的名字創建索引,並以天為單位
index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
}
}
如果仔細的話,發現我們只接受了 kafka
中 topic
為 system
的日志,並在ES中給system類型的日志創建了索引;而且我們給文件取的名字 indexer-system.conf
也包含一個 system
的名字。
因此,我們需要給每一個 topic
創建一個 indexer
文件,並配置不同的 topic
和 group_id
。
# pwd
/etc/logstash/conf.d
# ls
indexer-mysql.conf indexer-nginx.conf indexer-php.conf indexer-system.conf
如上,我給 topic
為mysql
、nginx
、php
、system
分別創建了一個 indexer
文件
然后,我們用 supervisord 啟動 indexer-system.conf
在 /etc/supervisord.d
目錄下,創建 logstash-indexer-system.ini
的配置文件,輸入一下內容
[program:EFK-logstash-indexer-system]
directory=/etc/logstash
# 啟動logstash時,指定indexer-system.conf配置文件,指定數據存儲目錄indexer-system
command=/usr/bin/logstash -f /etc/logstash/conf.d/indexer-system.conf --path.data=/tmp/indexer-system
startsecs=1
stderr_logfile=/var/log/supervisor/logstash-indexer-system_stderr.log
stdout_logfile=/var/log/supervisor/logstash-indexer-system_stdout.log
user=logstash
redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10
啟動 logstash
# supervisorctl update
配置完畢!
注意:對於每一個logstash
的indexer
,我們都需要指定一個 indexer
的配置文件;並配置一個數據存放的目錄,不需要提前創建該目錄,但是,不同的 indexer
一定要存放在不同的目錄。
5.EFK 架構部署之supervisord方式啟動Kibana
由於kibana的啟動方式直接運行/usr/local/kibana-6.6.0-linux-x86_64/bin
下的 kibana
即可,不過多解釋,直接supervisord部署。
在 /etc/supervisord.d
目錄下,創建 kibana.ini
配置文件:
[program:EFK-kibana]
directory=/usr/local/kibana-6.6.0-linux-x86_64
command=/usr/local/kibana-6.6.0-linux-x86_64/bin/kibana
startsecs=1
stderr_logfile=/var/log/supervisor/kibana_stderr.log
stdout_logfile=/var/log/supervisor/kibana_stdout.log
user=root
redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10
啟動 kibana
# supervisorctl update
瀏覽器訪問:http://http😕/192.168.7.3:5601
6.EFK 日志收集系統之Filebeat配置
Filebeat 是我們的日志采集端,即每台機器上都需要安裝 Filebeat,並配置不同的日志路徑。
在 Filebeat 的主配置文件 /etc/filebeat/filebeat.yml
下配置如下:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/boot.log
- /var/log/messages
- /var/log/secure
fields:
log_topic: system
server_ip: "192.168.7.3"
- type: log
enabled: true
paths:
- /var/log/mysqld.log
fields:
log_topic: mysql
server_ip: "192.168.7.3"
output.kafka:
enabled: true
hosts: ["192.168.7.3:9092","192.168.7.3:9093","192.168.7.3:9094"]
topic: '%{[fields.log_topic]}'
partition.round_robin:
reachable_only: false
codec.json:
pretty: false
escape_html: true
metadata:
retry.max: 3
refresh_frequency: 10m
worker: 1
max_retries: 3
bulk_max_size: 2048
timeout: 30s
broker_timeout: 10s
channel_buffer_size: 256
keep_alive: 0
required_acks: 1
compression: gzip
compression_level: 4
max_message_bytes: 1000000
client_id: beats
close_older: 30m
force_close_files: true
#沒有新日志采集后多長時間關閉文件句柄,默認5分鍾,設置成1分鍾,加快文件句柄關閉;
close_inactive: 1m
#傳輸了3h后荏沒有傳輸完成的話就強行關閉文件句柄,這個配置項是解決以上案例問題的key point;
close_timeout: 3h
##這個配置項也應該配置上,默認值是0表示不清理,不清理的意思是采集過的文件描述在registry文件里永不清理,在運行一段時間后,registry會變大,可能會帶來問題。
clean_inactive: 72h
#設置了clean_inactive后就需要設置ignore_older,且要保證ignore_older < clean_inactive
ignore_older: 70h
# 限制 CPU和內存資源
max_procs: 1
queue.mem.events: 256
queue.mem.flush.min_events: 128
Filebeat默認是將采集的日志輸入到ES中的,所以要注釋掉/etc/filebeat/filebeat.yml
中的 output.elasticsearch
,在 Filebeat
中只允許有一個 output
。
啟動 Filebeat
# systemctl start filebeat
# systemctl enable filebeat
啟動 Filebeat 后,我們可以通過如下命令查看 Filebeat 的日志:
# tail -f /var/log/filebeat/filebeat
配置完畢!