
早在傳統的單體應用時代,查看日志大都通過SSH客戶端登服務器去看,使用較多的命令就是 less 或者 tail。如果服務部署了好幾台,就要分別登錄到這幾台機器上看,等到了分布式和微服務架構流行時代,一個從APP或H5發起的請求除了需要登陸服務器去排查日志,往往還會經過MQ和RPC調用遠程到了別的主機繼續處理,開發人員定位問題可能還需要根據TraceID或者業務唯一主鍵去跟蹤服務的鏈路日志,基於傳統SSH方式登陸主機查看日志的方式就像圖中排查線路的工人一樣困難,線上服務器幾十上百之多,出了問題難以快速響應,因此需要高效、實時的日志存儲和檢索平台,ELK就提供這樣一套解決方案。

一、ELK架構的分類
ELK是一套應用組件,由Elasticsearch、Logstash和Kibana三部分組件組成,簡稱ELK;它是一套開源免費、功能強大的日志分析管理系統。ELK可以將我們的系統日志、網站日志、應用系統日志等各種日志進行收集、過濾、清洗,然后進行集中存放並可用於實時檢索、分析。
- Elasticsearch :分布式搜索引擎。具有高可伸縮、高可靠、易管理等特點。可以用於全文檢索、結構化檢索和分析,並能將這三者結合起來。Elasticsearch 基於 Lucene 開發,現在使用最廣的開源搜索引擎之一,Wikipedia 、StackOverflow、Github 等都基於它來構建自己的搜索引擎。
- Logstash :數據收集處理引擎。支持動態的從各種數據源搜集數據,並對數據進行過濾、分析、豐富、統一格式等操作,然后存儲以供后續使用。
- Kibana :可視化化平台。它能夠搜索、展示存儲在 Elasticsearch 中索引數據。使用它可以很方便的用圖表、表格、地圖展示和分析數據。
1.最簡單的ELK架構
此架構主要是將Logstash部署在各個節點上搜集相關日志、數據,並經過分析、過濾后發送給遠端服務器上的Elasticsearch進行存儲。Elasticsearch再將數據以分片的形式壓縮存儲,並提供多種API供用戶查詢、操作。用戶可以通過Kibana Web直觀的對日志進行查詢,並根據需求生成數據報表。

此架構的優點是搭建簡單,易於上手。缺點是Logstash消耗系統資源比較大,運行時占用CPU和內存資源較高。另外,由於沒有消息隊列緩存,可能存在數據丟失的風險,適合於數據量小的環境使用。
2.引入Kafka的典型ELK架構
為保證日志傳輸數據的可靠性和穩定性,引入Kafka作為消息緩沖隊列,位於各個節點上的Logstash Agent(一級Logstash,主要用來傳輸數據)先將數據傳遞給消息隊列,接着,Logstash server(二級Logstash,主要用來拉取消息隊列數據,過濾並分析數據)將格式化的數據傳遞給Elasticsearch進行存儲。最后,由Kibana將日志和數據呈現給用戶。由於引入了Kafka緩沖機制,即使遠端Logstash server因故障停止運行,數據也不會丟失,可靠性得到了大大的提升。

該架構優點在於引入了消息隊列機制,提升日志數據的可靠性,但依然存在Logstash占用系統資源過多的問題,在海量數據應用場景下,可能會出現性能瓶頸。
3.FileBeats+Kafka+ELK集群架構
該架構從上面架構基礎上改進而來的,主要是將前端收集數據的Logstash Agent換成了filebeat,消息隊列使用了kafka集群,然后將Logstash和Elasticsearch都通過集群模式進行構建,完整架構如圖所示:

日志采集器Logstash其功能雖然強大,但是它依賴java、在數據量大的時候,Logstash進程會消耗過多的系統資源,這將嚴重影響業務系統的性能,而filebeat就是一個完美的替代者,它基於Go語言沒有任何依賴,配置文件簡單,格式明了,同時filebeat比logstash更加輕量級,所以占用系統資源極少,非常適合安裝在生產機器上。這就是推薦使用filebeat,也是 ELK Stack 在 Agent 的第一選擇。
此架構適合大型集群、海量數據的業務場景,它通過將前端Logstash Agent替換成filebeat,有效降低了收集日志對業務系統資源的消耗。同時,消息隊列使用kafka集群架構,有效保障了收集數據的安全性和穩定性,而后端Logstash和Elasticsearch均采用集群模式搭建,從整體上提高了ELK系統的高效性、擴展性和吞吐量。我所在的項目組采用的就是這套架構,由於生產所需的配置較高,且涉及較多持久化操作,采用的都是性能高配的雲主機搭建方式而非時下流行的容器搭建。
source: ELK應用架構介紹
二、FileBeat服務搭建
日志采集器選擇了Filebeat而不是Logstash,是由於 Logstash 是跑在 JVM 上面,資源消耗比較大,后來作者用 GO 寫了一個功能較少但是資源消耗也小的輕量級的 Agent 叫 Logstash-forwarder,后來改名為FileBeat。關於ELK中Filebeat的原理介紹和詳細配置我另外寫了一篇文章,戳它:《FileBeat原理與實踐指南》。
1.filebeat.yml配置
最核心的部分在於FileBeat配置文件的配置,需要指定paths(日志文件路徑),fileds(日志主題),hosts(kafka主機ip和端口),topic(kafka主題),version(kafka的版本),drop_fields(舍棄不必要的字段),name(本機IP)
filebeat.inputs:
- type: log
enabled: true
paths:
- /wls/applogs/rtlog/app.log
fields:
log_topic: appName
multiline:
# pattern for error log, if start with space or cause by
pattern: '^[[:space:]]+(at|.{3})|^Caused by:'
negate: false
match: after
output.kafka:
enabled: true
hosts: ["kafka-1:9092","kafka-2:9092"]
topic: applog
version: "0.10.2.0"
compression: gzip
processors:
- drop_fields:
fields: ["beat", "input", "source", "offset"]
logging.level: error
name: app-server-ip
2.常用運維指令
- 終端啟動(退出終端或ctrl+c會退出運行)
./filebeat -e -c filebeat.yml
- 以后台守護進程啟動啟動filebeats
nohup ./filebeat -e -c filebeat.yml &
- 確認配置不再修改,可用如下命令
//可以防止日志爆盤,將所有標准輸出及標准錯誤輸出到/dev/null空設備,即沒有任何輸出信息。
nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &
- 停止運行FileBeat進程
ps -ef | grep filebeat
Kill -9 線程號
3.FileBeat調試
當FileBeat在服務主機采集應用日志並向Kafka輸出日志時可以通過兩個步驟驗證Filebeat的采集輸送是否正常:
- 采集驗證:終端執行命令,查看控制台輸出,如果服務有異常會直接打印出來並自動停止服務。
./filebeat -e -c filebeat.yml
- 接收驗證:Kafka集群控制台直接消費消息,驗證接收到的日志信息。
./kafka-console-consumer.sh --zookeeper zk-1:2181,zk-2:2181 --topic app.log
- ElasticSearch或者Kibana驗證。如果已經搭建了ELK平台,可根據上傳的日志關鍵屬性,於KB或者ES平台查看是否有日志流輸入或者在search框中根據host.name/log_topic關鍵屬性來查看是否有落庫。
source: FileBeat原理與實踐指南
三、Kafka集群搭建
一個典型的Kafka集群包含若干Producer,若干broker、若干Consumer Group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。關於Kafka的原理和使用可以參考《Kafka學習筆記》,下面就提供了一個典型的Kafka+ZooKeeper集群:

1.Kafka配置
生產環境中 Kafka 集群中節點數量建議為(2N + 1 )個,Zookeeper集群同樣建議為(2N+1)個,這邊就都以 3 個節點舉例,修改kafka集群的配置文件,以broker1為例進行配置:
$ vim ./config/server.properties
broker.id=1
port=9092
host.name=192.168.0.1
num.replica.fetchers=1
log.dirs=/opt/kafka_logs
num.partitions=3
zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
num.io.threads=8
num.network.threads=8
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
delete.topic.enable=true
這里比較重要的一個參數配置就是:num.partitions
Kafka中的topic是以partition的形式存放的,每一個topic都可以設置它的partition數量,Partition的數量決定了組成topic的log的數量。推薦partition的數量一定要大於同時運行的consumer的數量。另外,建議partition的數量大於集群broker的數量,這樣消息數據就可以均勻的分布在各個broker中。
-delete.topic.enable:在0.8.2版本之后,Kafka提供了刪除topic的功能,但是默認並不會直接將topic數據物理刪除。如果要從物理上刪除(即刪除topic后,數據文件也會一同刪除),就需要設置此配置項為true。
2.Kafka運維命令
這里涉及到topic主題的創建、與filebeats調試消息的狀態,需要掌握幾個有用的運維指令:
- 查看topic狀態
- ./kafka-topics.sh --describe --zookeeper zk-1:2181,zk-2:2181,zk-3:2181 --topic app.log
- 查看所有topic列表:
- sh kafka-topics.sh --zookeeper --zookeeper zk-1:2181,zk-2:2181,zk-3:2181 --list
- 創建topic
- sh kafka-topics.sh --zookeeper --zookeeper zk-1:2181,zk-2:2181,zk-3:2181 --create --topic app.log --partitions 5 --replication-factor 2
- 注意:server.properties 設置 delete.topic.enable=true
- 刪除主題數據
- ./bin/kafka-topics.sh --delete --zookeeper zk-1:2181,zk-2:2181,zk-3:2181 --topic app.log
- 生產topic的消息
- ./kafka-console-producer.sh --broker-list kafka-1:9092 kafka-2:9092 --topic app.log
- 消費topic的消息
- ./kafka-console-consumer.sh --zookeeper zk-1:2181,zk-2:2181,zk-3:2181 --topic app.log
3.Kafka服務監控
通過以下命令啟動了Kafka集群服務以后,嘗試創建主題、打印主題列表查看服務狀態。
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
同時也可以登錄Kafka集群中任意兩台broker主機,分別用統一的主題進行消息的生產與消費,如果服務正常則消息兩端可以互通:


四、LogStash
Logstash是一個開源的、服務端的數據處理pipeline(管道),它可以接收多個源的數據、然后對它們進行轉換、最終將它們發送到指定類型的目的地。Logstash是通過插件機制實現各種功能的,可以在https://github.com/logstash-plugins 下載各種功能的插件,也可以自行編寫插件。

Logstash的數據處理過程主要包括:Inputs, Filters, Outputs 三部分, 另外在Inputs和Outputs中可以使用Codecs對數據格式進行處理。這四個部分均以插件形式存在,在logstash.conf配置文件中設置需要使用的input,filter,output, codec插件,以實現特定的數據采集,數據處理,數據輸出等功能
- Inputs:用於從數據源獲取數據,常見的插件如file, syslog, redis, beats 等[詳細參考]
- Filters:用於處理數據如格式轉換,數據派生等,常見的插件如grok, mutate, drop, clone, geoip等[詳細參考]
- Outputs:用於數據輸出,常見的插件如elastcisearch,file, graphite, statsd等[詳細參考]
- Codecs:Codecs不是一個單獨的流程,而是在輸入和輸出等插件中用於數據轉換的模塊,用於對數據進行編碼處理,常見的插件如json,multiline
本實例中input從kafka中獲取日志數據,filter主要采用grok、date插件,outputs則直接輸出到elastic集群中。logstash的配置文件是可以自定義的,在啟動應用時需要制定相應的配置文件。
$ vim logstash.conf
input {
kafka {
type => "kafka"
bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
topics => "app.log"
consumer_threads => 2
codec => "json"
}
}
filter {
grok {
match => [
#涉及多個正則匹配的寫法
"message","%{HTTPDATE:timestamp}",
"message","%{COMBINEDAPACHELOG}"
]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
output {
elasticsearch {
host => ["es-1:9300","es-2:9300","es-3:9300"]
index => "applogs-%{+YYYY.MM.dd}"
}
}
對上述參數進行說明:
input,需要指明是kafka來源,broker的ip和端口,主題,codec模式為json(因為經過filebeat采集而來的數據都json化了)
filter,grok是一個十分強大的logstash filter插件,通過正則解析任意文本,將非結構化日志數據弄成結構化和方便查詢的結構。
output,指定了輸出到ES集群,host這里寫ES集群的客戶端節點即可,index則是對應ES里的檢索,一般以【topic+日期】即可。
但是往往復雜的日志系統這些還是不夠,需要加一些特殊處理如:異常堆棧需要合並行、控制台調試等。
- 搜集日志時涉及異常堆棧的合並行處理時,可以加上;如果Filebeat已作合並處理此處則不需要了:
input {
stdin {
codec => multiline {
pattern => "^["
negate => true
what => "previous"
}
}
}
- 控制台調試過濾器。
- 很多時候我們需要調試自己的正則表達式是否可用,官方的在線調試並不好用,那么可以通過自己生成的json數據來校驗正則的效果,count指定重復生成的次數,message則是待調試的內容:
input {
generator {
count => 1
message => '{"key1":"value1","key2":[1,2],"key3":{"subkey1":"subvalue1"}}'
codec => json
}
}
rubydebug指明了輸出內容到控制台:
output {
stdout {
codec => rubydebug
}
}
filter插件由用戶自定義填寫,啟動測試並檢查接口,每次調試都要啟動一次服務可能會需要等待幾秒鍾才輸出內容到控制台。
./logstash -f /wls/logstash/config/logstash-test.conf
關於LogStash的語法詳解和實踐指南可以參考:《Logstash 最佳實踐》
五、Elastic集群搭建
在ElasticSearch的架構中,有三類角色,分別是Client Node、Data Node和Master Node,搜索查詢的請求一般是經過Client Node來向Data Node獲取數據,而索引查詢首先請求Master Node節點,然后Master Node將請求分配到多個Data Node節點完成一次索引查詢。

Master Node:主要用於元數據(metadata)的處理,比如索引的新增、刪除、分片分配等,以及管理集群各個節點的狀態。由於數據的存儲和查詢都不會走主節點,所以主節點的壓力相對較小,因此主節點的內存分配也可以相對少些,但是主節點卻是最重要的,因為一旦主節點宕機,整個elasticsearch集群將不可用。所以一定要保證主節點的穩定性。
Data Node:數據節點,這些節點上保存了數據分片。它負責數據相關操作,比如分片的CRUD、搜索和整合等操作。數據節點上面執行的操作都比較消耗CPU、內存和I/O資源,數據節點服務器要選擇較好的硬件配置。
Client Node:客戶端節點。client node存在的好處是可以分擔data node的一部分壓力,因為elasticsearch的查詢是兩層匯聚的結果,第一層是在data node上做查詢結果匯聚,然后把結果發給client node,client node接收到data node發來的結果后再做第二次的匯聚,然后把最終的查詢結果返回給用戶。這樣,client node就替data node分擔了部分壓力。
1.集群配置
第一步即定義修改es集群的配置文件:
$ vim config/elasticsearch.yml
cluster.name: es
node.name: es-node1
node.master: true
node.data: true
network.host: 192.168.0.1
discovery.zen.ping.unicast.hosts: ["192.168.0.2","192.168.0.3"]
discovery.zen.minimum_master_nodes: 2
集群重要配置項
- node.name 可以配置每個節點的名稱
- node.master 可以配置該節點是否有資格成為主節點。如果配置為 true,則主機有資格成為主節點,配置為 false 則主機就不會成為主節點,可以去當數據節點或負載均衡節點
- node.data 可以配置該節點是否為數據節點,如果配置為 true,則主機就會作為數據節點,注意主節點也可以作為數據節點
- discovery.zen.ping.unicast.hosts 可以配置集群的主機地址,配置之后集群的主機之間可以自動發現,需要剔除自己。
- discovery.zen.minimum_master_nodes: 為了防止集群發生“腦裂”,通常需要配置集群最少主節點數目,通常為 (主節點數目 / 2) + 1
2.服務啟停
通過 -d 來后台啟動
$ ./bin/elasticsearch -d
打開網頁 http://192.168.0.1:9200/, 如果出現下面信息說明配置成功
{
name: "es-node1",
cluster_name: "es",
cluster_uuid: "XvoyA_NYTSSV8pJg0Xb23A",
version: {
number: "6.2.4",
build_hash: "ccec39f",
build_date: "2018-04-12T20:37:28.497551Z",
build_snapshot: false,
lucene_version: "7.2.1",
minimum_wire_compatibility_version: "5.6.0",
minimum_index_compatibility_version: "5.0.0"
},
tagline: "You Know, for Search"
}
集群服務健康狀況檢查,可以再任意節點通過執行如下命令,如果能發現列表展示的主節點、客戶端和數據節點都是一一對應的,那么說明集群服務都已經正常啟動了。
curl "http://ip:port/_cat/nodes"
關於ES的詳細原理與實踐,推薦《Elasticsearch 權威指南》
六、Kibana
Kibana是一個開源的分析和可視化平台,設計用於和Elasticsearch一起工作,可以通過Kibana來搜索,查看,並和存儲在Elasticsearch索引中的數據進行交互。kibana使用JavaScript語言編寫,安裝部署十分簡單,可以從elastic官網下載所需的版本,這里需要注意的是Kibana與Elasticsearch的版本必須一致,另外,在安裝Kibana時,要確保Elasticsearch、Logstash和kafka已經安裝完畢。
1.Kibana的配置
將下載的gz包解壓
$ tar -zxvf kibana-6.2.4-darwin-x86_64.tar.gz
$ mv kibana-6.2.4-darwin-x86_64.tar.gz kibana
找到配置文件kibana.yml並修改
$ vim config/kibana.yml
server.port: 5601
server.host: "192.168.0.1"
elasticsearch.url: "http://192.168.0.1:9200"
涉及到三個關鍵參數配置:
server.port: kibana綁定的監聽端口,默認是5601
server.host: kibana綁定的IP地址
elasticsearch.url: 如果是ES集群,則推薦綁定集群中任意一台ClientNode即可。
本人在項目過程中是通過Nginx配置域名來訪問Kibana的,雖然配置了映射,且在Nginx主機上curl能訪問到服務,但是域名訪問始終報404異常,后來通過添加兩項配置即可訪問:
server.basePath: "/kibana"
server.rewriteBasePath: true
2.Kibana運維命令
啟動服務:
$ nohup ./bin/kibana &
停止服務
ps -ef | grep node
kill -9 線程號
服務啟動以后可以通過訪問:http://192.168.0.1:5601/

3.查詢數據
打開discover菜單,這也是kibanan最常用的功能,選擇好時間維度來過濾數據范圍:

Kibana語法查詢,可以直接在搜索框內輸入過濾條件進行查詢:
- response:200,將匹配response字段的值是200的文檔
- message:"Quick brown fox",將在message字段中搜索"quick brown fox"這個短語。如果沒有引號,將會匹配到包含這些詞的所有文檔,而不管它們的順序如何。
- response:200 and extension:php or extension:css 將匹配response是200並且extension是php,或者匹配extension是css而response任意,括號可以改變這種優先級
- >, >=, <, <= 都是有效的操作符
- response:* 將匹配所有存在response字段的文檔
點開數據行即可查看具體數據,支持table視圖和Json文本兩種方式,日志數據都存儲在message屬性中,而前面定義的name可以查看具體的主句,log_topic則指明是來源哪個應用:

source: Kibana(一張圖片勝過千萬行日志)
總結
綜上,通過上面部署命令來實現 ELK 的整套組件,包含了日志收集、過濾、索引和可視化的全部流程,基於這套系統實現分析日志功能。同時,通過水平擴展 Kafka、Elasticsearch 集群,可以實現日均億級的日志實時存儲與處理,但是從細節方面來看,這套系統還存着許多可以繼續優化和改進的點:
- 日志格式需優化,每個系統收集的日志格式需要約定一個標准,比如各個業務系統在定義log4j或logback日志partern時可以按照【時間】【級別】【全局Traceid】【線程號】【方法名】【日志信息】統一輸出。
- Logstash的正則優化,一旦約定了日志模式,編寫Logstash的自定義grok正則就能過濾出關鍵屬性存放於ES,那么基於時間、traceId以及方法名的查詢則不在堆積於message,大大優化查詢效率。
- TraceId埋點優化,分布式與微服務架構中,一個Restful請求的發起可能會經過多達十幾個系統的處理流程,任何一個環節都有error可能,需要有一個全局ID進行全鏈路追蹤,這里需要結合Java探針把tiraceId埋入日志模板里,現有PinPoint、SkyWalking與ZipKin都能為全局ID提供成熟的解決方案。
- ES存儲優化,按照線上機器的業務量來看,每天TB級的日志數據都寫入ES會造成較大的存儲壓力,時間越久的日志利用價值則越低,可以按照7天有效期來自動清理ES索引優化存儲空間,參考【ES清理腳本】。
- 運維優化,一個復雜日志平台在運維方面有着巨大的成本,這里涉及到了Kafka、ZooKeeper、ELK等多個集群環境的維護,除了提供統一的集群操作指令以外,也需要形成對整套日志平台環境的監控視圖。
- 性能優化,多組件、混合語言、分布式環境與集群林立的復雜系統,性能問題老生常談,實踐出真知,遇到了再補充!