ELK之日志收集系統配置


目錄

1.ELK 日志收集系統之Supervisord安裝配置
2.ELK 日志收集系統之Supervisord啟動elasticsearch-head
3.ELK 日志收集系統之Kafka配置
4.ELK 架構部署之 Logstash 配置
5.ELK 架構部署之supervisord方式啟動Kibana
6.ELK 日志收集系統之Filebeat配置


1.EFK 日志收集系統之Supervisord安裝配置

0x01 Supervisord 簡單介紹

      Supervisor是一個客戶/服務器系統,它可以在類Unix系統中管理控制大量進程。Supervisor使用python開發,有多年歷史,目前很多生產環境下的服務器都在使用Supervisor。

      Supervisor的服務器端稱為supervisord,主要負責在啟動自身時啟動管理的子進程,響應客戶端的命令,重啟崩潰或退出的子進程,記錄子進程stdout和stderr輸出,生成和處理子進程生命周期中的事件。可以在一個配置文件中配置相關參數,包括Supervisord自身的狀態,其管理的各個子進程的相關屬性。配置文件一般位於/etc/supervisord.conf。

      Supervisor的客戶端稱為supervisorctl,它提供了一個類shell的接口(即命令行)來使用supervisord服務端提供的功能。通過supervisorctl,用戶可以連接到supervisord服務器進程,獲得服務器進程控制的子進程的狀態,啟動和停止子進程,獲得正在運行的進程列表。客戶端通過Unix域套接字或者TCP套接字與服務端進行通信,服務器端具有身份憑證認證機制,可以有效提升安全性。當客戶端和服務器位於同一台機器上時,客戶端與服務器共用同一個配置文件/etc/supervisord.conf,通過不同標簽來區分兩者的配置。

0x02 Supervisord 安裝及主要目錄介紹

      安裝方式這里不多做介紹,可參考官網:Supervisor

我這里為了方便,直接 yum 倉庫安裝

# yum -y install supervisor

使用 yum 安裝的 supervisor,主要關注三個文件:

  • /etc/supervisord.conf:supervisord 的主配置文件;
  • /etc/supervisord.d:用於存放管理進程的配置;
  • /var/log/supervisor:supervisord 的日志存放目錄,存放關於supervisord的所有日志文件。

0x03 Supervisord 的配置

開啟 Supervisord 的 Web 管理界面,在主配置文件(/etc/supervisord.conf)下找到如下配置,取消注釋,配置配置訪問IP、端口號(默認端口:9001)、登入賬號、密碼。雖然內網,密碼建議復雜

[inet_http_server]         ; inet (TCP) server disabled by default
port=192.168.7.3:9001        ; (ip_address:port specifier, *:port for all iface)
username=admin              ; (default is no username (open server))
password=admin               ; (default is no password (open server))

重啟supervisord

systemctl restart supervisord 

瀏覽器訪問:http://192.168.7.3:9001

supervisord 配置暫時到此結束,后面會用到supervisord管理如下進程。

進程名 介紹
EFK-elasticsearch-head elasticsearch 插件
EFK-kafka-broker0 Kafka實例
EFK-kafka-broker1 Kafka實例
EFK-kafka-broker2 Kafka實例
EFK-kafka-consumer-mysql-1 kafka消費者消費MySQL隊列信息
EFK-kafka-consumer-mysql-2 kafka消費者消費MySQL隊列信息
EFK-kafka-consumer-nginx-1 kafka消費者消費nginx隊列信息
EFK-kafka-consumer-nginx-2 kafka消費者消費nginx隊列信息
EFK-kafka-consumer-php-1 kafka消費者消費php隊列信息
EFK-kafka-consumer-php-2 kafka消費者消費php隊列信息
EFK-kafka-consumer-php-1 kafka消費者消費php隊列信息
EFK-kafka-consumer-php-2 kafka消費者消費php隊列信息
EFK-kafka-consumer-system-1 kafka消費者消費system隊列信息
EFK-kafka-consumer-system-2 kafka消費者消費system隊列信息
EFK-kafka-manager kafka的Web工具
EFK-kafka-zookeeper kafka必須連接zookeeper
EFK-kibana kibana 進程
EFK-logstash-indexer-mysql logstash轉發mysql消息
EFK-logstash-indexer-system logstash轉發system消息
EFK-logstash-indexer-php logstash轉發php消息
EFK-logstash-indexer-nginx logstash轉發nginx消息

后期完成結果如下:


2.EFK 日志收集系統之Supervisord啟動elasticsearch-head

由於在前文中0x04 安裝 Elasticsearch,我們是在前台使用直接運行 npm run start 命令啟動的 elasticsearch-head,這樣有一點不好,開啟此前台應用程序的中端不能關閉。比如可以用 nohup& 讓程序在后台運行。但我們還是更偏愛於使用Web界面管理的后台程序,更加直觀明了。因此我們使用 supervisord 管理后台程序,話不多說,直接切入主題。

如果要使用 supervisord 管理后台進程,需要在 /etc/supervisord.d 目錄下,配置進程的相關信息,創建配置文件elasticsearch-head.ini ,我這邊的配置如下:

# 配置進程的名字
[program:EFK-elasticsearch-head]

# 命令運行的目錄
directory=/usr/local/elasticsearch-head

# 進程執行的命令
command=/usr/local/node-v10.15.1-linux-x64/bin/npm run start
# 是否隨supervisor啟動
autostart=true

# 子進程啟動多少秒之后,此時狀態如果是running,則我們認為啟動成功了。默認值為1
startsecs=1

# 標准錯誤日志目錄
stderr_logfile=/var/log/supervisor/elasticsearch-head_stderr.log
# 標准輸出日志目錄
stdout_logfile=/var/log/supervisor/elasticsearch-head_stdout.log

# 進程啟動的用戶
user=elasticsearch

# 把 stderr 重定向到 stdout,默認 false,即把標准錯誤日志寫入到標准輸出日志中,說白了就是將兩個日志文件合並到一個日志文件
redirect_stderr=true

# 標准輸出 stdout 日志文件大小 20MB,默認為50MB
stdout_logfile_maxbytes=20MB

# 標准輸出日志備份數量
stdout_logfile_backups=10

elasticsearch-head.ini 配置文件創建完畢后,可以在命令行中,運行命令:

# supervisorctl update

這條命令會加載 /etc/supervisord.d 目錄下的進程文件:

  • 如果有新的進程文件,則會直接啟動該進程;
  • 如果僅僅是舊的進程文件被更新了,則會停止該進程,再重新啟動該進程。
# supervisorctl status
EFK-elasticsearch-head           RUNNING   pid 4107, uptime 8:23:41

supervisord 還有如下幾條命令比較常用:

# 停止某一個進程
supervisorctl stop 進程名字
# 啟動某一個進程
supervisorctl start 進程名字
# 去除某一個進程
supervisorctl remove 進程名字

對於一個新加入的進程,我們需要先執行 supervisorctl update 后,才能在 supervosprctl status 中看到進程的狀態。

如果在命令終端觀察到進程為RUNNING狀態,則可以瀏覽器管理該進程了,瀏覽器輸入:http://192.168.7.3:9001,如下圖,選擇 Tail -f,則可以看到/var/log/supervisor/elasticsearch-head_stdout.log日志中的內容,是不是很方便呢?

最后,我們要查看elasticsearch-head的端口是否起來:

# netstat -anlp | grep 9100
tcp        0      0 0.0.0.0:9100            0.0.0.0:*               LISTEN      7112/grunt 
# lsof -i :9100
COMMAND  PID          USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
grunt   7112 elasticsearch   18u  IPv4 8031920      0t0  TCP *:jetdirect (LISTEN)

瀏覽器訪問 :http://192.168.7.3:9100


3.EFK 日志收集系統之Kafka配置

0x01 Kafka簡單介紹

關於Kafka的具體介紹,可參考中文文檔:Kafka中文網站,以下摘抄於文檔中一些比較重要的概念。

Kafka是一種高性能、低延遲、具備日志存儲、備份和傳播功能的分布式文件系統。有如下三種特性:

  • 可以讓你發布和訂閱流式的記錄
  • 可以儲存流式的記錄,並且有較好的容錯性。
  • 可以在流式記錄產生時就進行處理。

Kafka 可用於兩大類別的應用:

  • 構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。 (相當於message queue)
  • 構建實時流式應用程序,對這些流數據進行轉換或者影響。 (就是流處理,通過kafka stream topic和topic之間內部進行變化)

Kafka 基本概念:

  • Kafka作為一個集群,運行在一台或多台服務器上
  • Kafka 通過 topic 對存儲的流數據進行分類
  • 每條記錄中包含一個 key、一個 value和一個timestamp(時間戳)
  • Topic 就是數據主題,是數據記錄發布的地方,可以用來區分業務系統。Kafka中的 Topic 總是多訂閱者模式,一個 topic 可以區分擁有一個一個或者多個消費者來訂閱它的數據
  • Kafka 集群保留所有發布的記錄—無論他們是否已被消費—並通過一個可配置的參數——保留期限來控制。舉個例子, 如果保留策略設置為2天,一條記錄發布后兩天內,可以隨時被消費,兩天過后這條記錄會被拋棄並釋放磁盤空間。
  • 日志中的 partition(分區)有以下幾個用途。第一,當日志大小超過了單台服務器的限制,允許日志進行擴展。每個單獨的分區都必須受限於主機的文件限制,不過一個主題可能有多個分區,因此可以處理無限量的數據。第二,可以作為並行的單元集。
  • 日志的分區partition (分布)在Kafka集群的服務器上。每個服務器在處理數據和請求時,共享這些分區。每一個分區都會在已配置的服務器上進行備份,確保容錯性。
  • 每個分區都有一台 server 作為 “leader”,零台或者多台server作為 follwers 。leader server 處理一切對 partition (分區)的讀寫請求,而follwers只需被動的同步leader上的數據。當leader宕機了,followers 中的一台服務器會自動成為新的 leader。每台 server 都會成為某些分區的 leader 和某些分區的 follower,因此集群的負載是平衡的。
  • 生產者可以將數據發布到所選擇的topic(主題)中。生產者負責將記錄分配到topic的哪一個 partition(分區)中。
  • 消費者使用一個 消費組 名稱來進行標識,發布到topic中的每條記錄被分配給訂閱消費組中的一個消費者實例.消費者實例可以分布在多個進程中或者多個機器上。如果所有的消費者實例在同一消費組中,消息記錄會負載平衡到每一個消費者實例。如果所有的消費者實例在不同的消費組中,每條消息記錄會廣播到所有的消費者進程.。
  • 通常情況下,每個 topic 都會有一些消費組,一個消費組對應一個"邏輯訂閱者"。一個消費組由許多消費者實例組成,便於擴展和容錯。

根據EFK日志采集系統的特點,我個人對使用kafka的理解,即怎樣解決日志在Kafka中做分類傳輸的問題:由於我們利用 Filebeat 采集到的日志會有很多不同的日志,比較常見的例如 Web服務器的Nginx的日志、php的日志,數據庫服務器MySQL的日志、慢查詢日志等,還有每一台服務器的系統日志。
所以,我們需要對不同的日志做分類傳到Kafka的中。在Filebeat 中,我們利用 fields 添加一個自定義字段,這里我們會定義一個keylog_topic,值根據采集的日志來定,例如我們采集的是是nginx的錯誤日志和訪問日志 /var/log/nginx/error.log/var/log/nginx/access.log ,則可以將 log_topic 的值定義為 nginx,然后在 filebeat中配置輸出為 output.kafka,時,可以定義 topic%{[fields.log_topic]},這樣就可以實現,將不同的日志分類存儲到 kafka 中。Kafka中比較重要的一個概念Topic,即數據主題。我們既然采集到了日志,並傳遞到Kafka中,在Kafka中也不能夠隨便存儲。所以我們根據 filebeat 中自定義的log_topic字段,來定義Topic。為了做橫向擴展,一個 topic 可以由多個 partition 組成,遇到瓶頸時,通過增加partition的數量來進行橫向擴容。但是對於每一個partition,需要啟用一個消費者,消費partition中的數據。即如下圖所示:

在上圖中,我們采集了systemphpmysqlnginx日志,Filebeat 將日志推送到 Kafka中,並根據不同的日志存儲在不同的 Topic 中。在Kafka中會啟動3個broker實例,分別為 broker0broker1broker2。每個Topic分為兩個partition,並且一個Topic對應一組消費者組,這組消費者組中每一個消費者對應消費一個partition中的消息隊列。話不多說,開始實戰!

0x01 Kafka 配置之啟動 broker

EFK之日志收集系統部署中,我們已將Kafka的安裝包解壓至 /usr/local 目錄下,進入該目錄下,查看有哪些文件內容:

# pwd
/usr/local/kafka_2.11-2.1.0
# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
# ls bin/
connect-distributed.sh        kafka-console-producer.sh    kafka-log-dirs.sh                    kafka-run-class.sh       kafka-streams-application-reset.sh  zookeeper-security-migration.sh
connect-standalone.sh         kafka-consumer-groups.sh     kafka-mirror-maker.sh                kafka-topics.sh                     zookeeper-server-start.sh
kafka-acls.sh                 kafka-consumer-perf-test.sh  kafka-preferred-replica-election.sh  kafka-verifiable-consumer.sh        zookeeper-server-stop.sh
kafka-broker-api-versions.sh  kafka-delegation-tokens.sh   kafka-producer-perf-test.sh          kafka-verifiable-producer.sh        zookeeper-shell.sh
kafka-configs.sh              kafka-delete-records.sh      kafka-reassign-partitions.sh         kafka-server-start.sh    trogdor.sh
kafka-console-consumer.sh     kafka-dump-log.sh            kafka-replica-verification.sh        kafka-server-stop.sh     windows

如上,在 /usr/local/kafka_2.11-2.1.0/bin 目錄下,可以看到有很多以.sh結尾的 Shell 腳本,重點關注以下幾個腳本

zookeeper-server-start.sh                   # zookeeper服務啟動腳本
kafka-topics.sh                                    # topic管理腳本,可創建、刪除、修改topic
kafka-console-consumer.sh                # 消費者啟動腳本
kafka-server-start.sh                            # broker 實例啟動腳本

Kafka要運行,首先要啟動 zookeeperzookeeper是一個為分布式應用提供一致性服務的軟件

1.啟動 ZooKeeper 服務器

利用 /usr/local/kafka_2.11-2.1.0/bin下的zookeeper-server-start.sh腳本啟動 ZooKeeper(當然我們肯定會有supervisord部署的)

# bin/zookeeper-server-start.sh config/zookeeper.properties

啟動服務后,可以查看 zookeeper2181 端口是否啟動

# netstat -tnlp | grep 2181
tcp6       0      0 :::2181                 :::*                    LISTEN      3850/java 
# 查看連接數
# lsof -i:2181
  1. supervisord 方式部署 zookpeeper

/etc/supervisord.d 目錄下,新建 kafka-zookeeper.ini 配置文件,填入如下內容:

[program:EFK-kafka-zookeeper]

directory=/usr/local/kafka_2.11-2.1.0/bin

command=/usr/local/kafka_2.11-2.1.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties

autostart=true
startsecs=1

stderr_logfile=/var/log/supervisor/kafka-zookeeper_stderr.log
stdout_logfile=/var/log/supervisor/kafka-zookeeper_stdout.log

user=root
redirect_stderr=true

stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10

然后執行在命令行中執行:

# supervisorctl update
# supervisorctl status

supervisord 方式部署完畢。

0x03 啟動 broker 實例

腳本方式啟動

# bin/kafka-server-start.sh config/server.properties

這里有一點要注意,我們在開啟 broker 實例時,一般都不止一個broker,為了高可用,可定要多啟動幾個broker。而每個 broker 可能配置需求不一樣,所有會多次用到server.properties配置文件,我們可以根據 broker的id來命令server.properties文件,在broker中,broker的ID必須是唯一的。在server.properties中配置。先講講server.properties配置文件:

# broker 的ID,必須為整數,且在集群中唯一
broker.id=0
# broker的機架位置。 這將在機架感知副本分配中用於容錯。
broker.rack=efk
# broker監聽的端口,如果在同一台服務器上啟動多個broker,應啟用不同的端口
listeners=PLAINTEXT://192.168.7.3:9092
# 服務器用於從接收網絡請求並發送網絡響應的線程數
num.network.threads=3
# 服務器用於處理請求的線程數,可能包括磁盤I/O
num.io.threads=8
# 服務端用來處理socket連接的SO_SNDBUF緩沖大小。
socket.send.buffer.bytes=102400
# 服務端用來處理socket連接的SO_RCVBUFF緩沖大小。
socket.receive.buffer.bytes=102400
# socket請求的最大大小,這是為了防止server跑光內存,不能大於Java堆的大小。
socket.request.max.bytes=104857600
# 保存日志數據的目錄,如果未設置將使用log.dir的配置。可根據broker的id號配置
log.dirs=/tmp/kafka-logs-0
# 每個topic的默認日志分區數
num.partitions=1
# 每個數據目錄,用於啟動時日志恢復和關閉時刷新的線程數
num.recovery.threads.per.data.dir=1
# offset topic的副本數(設置的越大,可用性越高)。
offsets.topic.replication.factor=1
# 事務topic的副本數(設置的越大,可用性越高)。
transaction.state.log.replication.factor=1
# 覆蓋事務topic的min.insync.replicas配置
transaction.state.log.min.isr=1
# 日志刪除的時間閾值(小時為單位)
log.retention.hours=168
# 單個日志段文件最大大小
log.segment.bytes=1073741824
# 日志清理器檢查是否有日志符合刪除的頻率(以毫秒為單位)
log.retention.check.interval.ms=300000
# 連接到 zookeeper 的IP和端口號
zookeeper.connect=192.168.7.3:2181
# 與ZK server建立連接的超時時間,
zookeeper.connection.timeout.ms=6000
# 在執行第一次重新平衡之前,group協調器將等待更多consumer加入group的時間。
group.initial.rebalance.delay.ms=0

server.properties 的配置文件中,最核心的三個配置為:

  • broker.id
  • log.dirs
  • zookeeper.connect

請確保 broker.id 的唯一性;log.dirs的配置標准:kafka-logs-id號,例如:broker.id0 ,則 log.dirs 可設置為 kafka-logs-0,依次類推。這里我們需要創建3個broker,依次配置 3brokersupervisord 進程文件,參考 broker.id=0 的配置即可。

注意:broker的配置文件中還有一個特別重要的參數 max.message.bytes默認為 1000012,即 1M,生產者發送給broker消息的最大字節數。默認值是1000012,也就是1MB。生產者發送的消息超過該設置,會被broker拒絕接收,並且會收到broker的錯誤報告。

即在 Filebeat 中會收到如下報錯信息:

2019-03-05T09:39:33.611+0800    ERROR   kafka/client.go:131     Dropping event: no topic could be selected
2019-03-05T09:39:33.611+0800    ERROR   kafka/client.go:131     Dropping event: no topic could be selected
2019-03-05T09:39:33.611+0800    ERROR   kafka/client.go:131     Dropping event: no topic could be selected

所以解決這個問題有兩種辦法:

  • 第一:日志標准化,每個日志文件的大小定義不超過1M
  • 第二:增大``max.message.bytes`的數值

0x04 啟動 comsumer 實例

Kafka中的消費者會主動到broker中拉取指定topic中指定patition中的消息。

如果用腳本啟動消費者,如下:

# bin/kafka-console-consumer.sh --bootstrap-server 192.168..7.3:9092,192.168..7.3:9093,192.168..7.3:9094 --topic nginx --from-beginning
  • bootstrap-server:為我們啟動的broker實例,多個實例用 ,隔開
  • topic:消息主題
  • from-beginning:從消息第一條消息開始消費

當然我們會更熱衷於用 supervisord 啟動消費者。此處只舉一個例子,后續的消費者參考此配置即可。
在前面創建topic時,我們的配置文件中 server.properties 配置的參數 num.partitions=1,所以,每當我們自動創建一個topic的同時,也會為這個topic創建一個 partition。因此,對於每一個partition均要分配一個消費者。就拿一個消費system日志的消費者舉例。
/etc/supervisord.d 中創建配置文件 kafka-consumer-system-1.ini,文件中寫入如下配置:

[program:EFK-kafka-consumer-system-1]

directory=/usr/local/kafka_2.11-2.1.0/bin/

# bootstrap-server 指定broker實例
# topic 指定消費主題
# group 指定消費者歸屬於哪個消費者組,不用提前創建組名,直接指定組名,即創建消費者組
# partition 指定消費者消費 topic 中的哪個partition
# consumer.config 指定消費者的配置文件,每一個消費者一個單獨的配置文件
command=/usr/local/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.3:9092,192.168.7.3:9093,192.168.7.3:9094 --topic system --group system --partition 0 --consumer.config /usr/local/kafka_2.11-2.1.0/config/consumer-system-1.properties

autostart=true

startsecs=1

stderr_logfile=/var/log/supervisor/kafka-consumer-system-1_stderr.log
stdout_logfile=/var/log/supervisor/kafka-consumer-system-1_stdout.log

user=root
redirect_stderr=true

stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10

接下來聊聊消費者的配置文件,默認放置於/usr/local/kafka_2.11-2.1.0/config目錄下的consumer.properties,為了實現標准化,對每一個消費者的配置文件命名規范,做一個限制:consumer-topic名字-第幾個消費者

bootstrap.servers=192.168.7.3:9092,192.168.7.3:9093,192.168.7.3:9094
group.id=system
# 自動將偏移重置為最新偏移
auto.offset.reset=latest
# 會話超時時間
session.timeout.ms=15000
# 消費者在獲取更多記錄之前可以閑置的時間量
max.poll.interval.ms=60000
# 一次調用poll()時返回的最大記錄數。
max.poll.records=400
# 發出請求時傳遞給服務器的id字符串
client.id=system1
# 心跳間隔時間
heartbeat.interval.ms=5000

啟動消費者

# supervisorctl update

supervisord 的前端頁面,點擊 EFK-kafka-consumer-system-1中的 Tail -f,查看進程啟動的日志,當然,我們還沒有啟動生產者(Filebeat),此時當然沒有消息可以被消費啦!


4.EFK 架構部署之 Logstash 配置

0x01 Logstash 介紹

Logstash 由三個插件組成:

  • INPUTS:輸入端
  • FILTERS:過濾端
  • OUTPUTS:輸出端

如下圖,信息源通過 INPUT插件傳入 Logstash,然后我們可以根據我們自己的需求,通過 Filter 插件將過濾出我們想要的日志,例如 message中包含 error的日志,則可以在這里配置。過濾完后,再輸入到 OUTPUT插件中,選擇輸出的目的地。

一個 Logstash的 pipeline至少需要 INPUTOUTPUT

0x02 配置 log4j2.properties

rmp包安裝的Logstash主要的文件目錄有兩個:

  • /usr/share/logstash:logstash的安裝目錄
  • /etc/logstash/:logstash的主配置文件目錄

/usr/share/logstash 下,需要創建一個 config 目錄:

# mkdir -p /usr/share/logstash/config
# cp /etc/logstash/log4j2.properties /usr/share/logstash/config
# chown -R logstash:logstash /usr/share/logstash

如果此步驟沒有執行,在后面啟動 logstash 時,會有提示/usr/share/logstash/config目錄下,缺少log4j2.properties文件。

0x02 supervisord 方式部署

我們只需要配置INPUTOUTPUTFILTER根據實際需求,匹配過濾。

/etc/logstash/conf.d下創建 indexer-system.conf 文件,配置如下內容:

input {
    
    kafka {
        bootstrap_servers => "192.168.7.3:9092,192.168.7.3:9093,192.168.7.3:9094"
        group_id => "system"
        topics => "system"
        auto_offset_reset => "latest"
        decorate_events => true
        codec => "json"
    }

}

output {   
    elasticsearch {
        hosts => ["192.168.7.3:9200"]
        # 根據kafka 中 topic 的名字創建索引,並以天為單位
        index =>  "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}" 
    }
}

如果仔細的話,發現我們只接受了 kafkatopicsystem 的日志,並在ES中給system類型的日志創建了索引;而且我們給文件取的名字 indexer-system.conf也包含一個 system 的名字。
因此,我們需要給每一個 topic 創建一個 indexer文件,並配置不同的 topicgroup_id

# pwd
/etc/logstash/conf.d
# ls
indexer-mysql.conf  indexer-nginx.conf  indexer-php.conf  indexer-system.conf

如上,我給 topicmysqlnginxphpsystem分別創建了一個 indexer 文件

然后,我們用 supervisord 啟動 indexer-system.conf

/etc/supervisord.d 目錄下,創建 logstash-indexer-system.ini的配置文件,輸入一下內容

[program:EFK-logstash-indexer-system]

directory=/etc/logstash

# 啟動logstash時,指定indexer-system.conf配置文件,指定數據存儲目錄indexer-system
command=/usr/bin/logstash -f /etc/logstash/conf.d/indexer-system.conf --path.data=/tmp/indexer-system

startsecs=1

stderr_logfile=/var/log/supervisor/logstash-indexer-system_stderr.log
stdout_logfile=/var/log/supervisor/logstash-indexer-system_stdout.log

user=logstash
redirect_stderr=true

stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10

啟動 logstash

# supervisorctl update

配置完畢!

注意:對於每一個logstashindexer,我們都需要指定一個 indexer的配置文件;並配置一個數據存放的目錄,不需要提前創建該目錄,但是,不同的 indexer一定要存放在不同的目錄。


5.EFK 架構部署之supervisord方式啟動Kibana

由於kibana的啟動方式直接運行/usr/local/kibana-6.6.0-linux-x86_64/bin下的 kibana即可,不過多解釋,直接supervisord部署。

/etc/supervisord.d 目錄下,創建 kibana.ini 配置文件:

[program:EFK-kibana]

directory=/usr/local/kibana-6.6.0-linux-x86_64

command=/usr/local/kibana-6.6.0-linux-x86_64/bin/kibana

startsecs=1

stderr_logfile=/var/log/supervisor/kibana_stderr.log
stdout_logfile=/var/log/supervisor/kibana_stdout.log

user=root
redirect_stderr=true

stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10

啟動 kibana

#  supervisorctl update

瀏覽器訪問:http://http😕/192.168.7.3:5601


6.EFK 日志收集系統之Filebeat配置

Filebeat 是我們的日志采集端,即每台機器上都需要安裝 Filebeat,並配置不同的日志路徑。

在 Filebeat 的主配置文件 /etc/filebeat/filebeat.yml 下配置如下:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/boot.log
    - /var/log/messages
    - /var/log/secure
  fields:
    log_topic: system
    server_ip: "192.168.7.3"
- type: log
  enabled: true
  paths:
    - /var/log/mysqld.log
  fields:
    log_topic: mysql
    server_ip: "192.168.7.3"

output.kafka:
  enabled: true
  hosts: ["192.168.7.3:9092","192.168.7.3:9093","192.168.7.3:9094"]
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  codec.json:
    pretty: false
    escape_html: true
  metadata:
    retry.max: 3
    refresh_frequency: 10m
  worker: 1
  max_retries: 3
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  keep_alive: 0
  required_acks: 1
  compression: gzip
  compression_level: 4
  max_message_bytes: 1000000
  client_id: beats


close_older: 30m
force_close_files: true

#沒有新日志采集后多長時間關閉文件句柄,默認5分鍾,設置成1分鍾,加快文件句柄關閉;
close_inactive: 1m

#傳輸了3h后荏沒有傳輸完成的話就強行關閉文件句柄,這個配置項是解決以上案例問題的key point;
close_timeout: 3h

##這個配置項也應該配置上,默認值是0表示不清理,不清理的意思是采集過的文件描述在registry文件里永不清理,在運行一段時間后,registry會變大,可能會帶來問題。
clean_inactive: 72h

#設置了clean_inactive后就需要設置ignore_older,且要保證ignore_older < clean_inactive
ignore_older: 70h

# 限制 CPU和內存資源
max_procs: 1 
queue.mem.events: 256
queue.mem.flush.min_events: 128

Filebeat默認是將采集的日志輸入到ES中的,所以要注釋掉/etc/filebeat/filebeat.yml中的 output.elasticsearch,在 Filebeat 中只允許有一個 output

啟動 Filebeat

# systemctl start filebeat
# systemctl enable filebeat

啟動 Filebeat 后,我們可以通過如下命令查看 Filebeat 的日志:

# tail -f /var/log/filebeat/filebeat

配置完畢!


參考文章

Supervisor
進程管理supervisor的簡單說明
Kafka中文網站


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM