基於Flume+Kafka+ Elasticsearch+Storm的海量日志實時分析平台(轉)


0背景介紹

隨着機器個數的增加、各種服務、各種組件的擴容、開發人員的遞增,日志的運維問題是日漸尖銳。通常,日志都是存儲在服務運行的本地機器上,使用腳本來管理,一般非壓縮日志保留最近三天,壓縮保留最近1個月,其它直接刪除或遷移到日志服務器上。

運維會將這些日志mount到遠程的日志服務器上,然后開發人員使用運維分配的賬號登陸堡壘機器跳轉到日志服務器上查看不同項目不同機器的日志。

下圖是日志服務器某一個項目的所有ip日志目錄截圖,相信大家傳統的查看日志類似這樣。



如果你要查閱不同的項目,項目機器數十上百的話,將會是非常繁瑣和低效的事,並且運維管理、賬戶安全都是需要解決的問題。另外如果你要對這些日志進行實時分析統計,你會發現無從下手,用shell等腳本語言只能處理非常簡單需求,難於滿足業務的發展,所以當務之急是要將所有日志集中化管理,將所有服務器上的日志收集匯總。

為此需要設計一個集中式海量日志實時處理系統,它需要滿足產品需求(實時看日志、統計歷史日志、實時行為分析、用戶軌跡跟蹤等)、滿足運維需求(項目上下線日志管理、日志開關在線管理等)、性能需求(具有高吞吐能力、高擴展性、高容錯性)。

 

1EFK平台簡介

這樣集中管理后,需要考慮解決實時查看日志類似tail -f命令,日志統計或檢索,類似grep、awk、wc等Linux命令,並對更高要求的多機器日志全文檢索、排序和統計等問題。

目前業內比較流行開源實時日志分析ELK三件套為我們提供了思路和幫助,結合獵聘自己的業務模式及現有服務架構,經過多種組件的優劣對比,我們設計了自己的EFK海量日志實時分析平台。

先大致介紹下EFK組件,EFK是Elasticsearch、Flume、Kafka的簡稱,這三者是核心部件,但並非全部組件,后文還會介紹架構設計中的其他組件。

Elasticsearch是個開源實時分布式搜索引擎,它的特點有:

分布式,零配置,自動發現,索引自動分片,索引副本機制,restful風格接口,多數據源,自動搜索負載等。

Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,它支持在日志系統中定制各類數據發送方,用於收集數據;同時提供對數據進行簡單處理,並寫到各種數據接受方(可定制)。

Kafka是一種高吞吐量的分布式發布訂閱消息系統,它適合處理海量日志發布訂閱,提供消息磁盤持久化、支持物理分片存儲、多組消費等特性。

目前EFK平台接收日志的平均QPS為6w/s,日增20億條日志,日志大小970G,磁盤存量10T日志,3台物理機器支撐。該平台通過Flume為線上集群的上千個實例實時收集日志,日志種類目前主要分為nginx和tomcat日志(后續還會迭代其他組件日志),並將日志實時傳入Kafka集群。

日志消費方式分為三種:

第一會通過Web平台使用瀏覽器實時在線瀏覽日志,基於redis發布訂閱方式實現,瀏覽延遲基本小於5s;

第二會通過Web平台全文檢索、統計歷史日志,基於將kafka日志灌入ES集群實現,日志搜索延遲平均控制1分鍾左右;

第三會在后台進行實時統計分析日志,做一些內部規則處理,基於Storm流式計算實現。

2架構設計

 

普通的ELK架構是將Logstash部署於各個AppServer節點上搜集相關日志,經過過濾處理后發送給遠端服務器上的Elasticsearch集群進行存儲,然后用戶使用kibana從ES集群中進行日志查詢生成圖表。

而相對說,EFK架構會有所區別:我們使用Flume實時日志采集系統收集日志,並且引入了Kafka消息隊列,將flume采集到的日志存入kafka,然后使用flume集群消費kafka中的日志,經過過濾處理分別將日志傳遞給redis集群和Elasticsearch集群,最后使用自研的Log Web程序將實時日志和歷史日志呈現給用戶。

下面是系統整體的架構設計圖:

為什么沒有使用Logstash而使用開源的日志收集系統Flume是出於幾點因素考慮:

  • Logstash是ruby語言開發的,跟團隊使用語言java不符合,不容易排查問題及二次開發 ;

  • Flume是java開發,高可用,高可靠的分布式日志采集系統,業內也大量使用,並且它系統框架好,模塊分明,易於訂制開發;

  • 公司有統一的開發框架,所有日志打印都是標准的規范格式,不需要太多的訂制filter處理日志,所以Logstash使用場景就不大了。

我們使用flume的 Exec source方式組織數據,也就是tail -F [file],實時進行日志讀取傳輸,達到准實時日志同步,后文會分享flume這塊代碼邏輯。

引入Kafka是因為幾點考慮:

  • 日志采集后的處理方式多樣化 ;

  • 線上業務集群規模較大,日志產生量巨大,如果直接同步日志對下游服務負荷較重,容易因為故障導致日志阻塞延遲和丟失,所以引入了kafka ;

  • 消息可以持久化,並且可以進行日志回溯。有了消息隊列,上游服務和下游服務完全解藕,網絡傳輸會更穩定、更高效、更均衡,避免級聯效應。

 

下面介紹整個系統的分工:

A. Agent層,在每台服務器上部署一個flume進程,負責對單機上所有項目日志進行收集,使用Command: tail  --pid{pid} -F {file} ,另外如果在文件名有變化時,會從文件的第一行開始讀日志,命令會變為tail -n +1 --pid {pid}-F {file} 。Flume的Source目前做的工作比較簡單,實時讀取日志封裝成Event批量放入Channel,目前提供tomcat錯誤日志、調試日志、事件日志、SQL日志、慢日志、nginx等類型日志接入。

Flume的Sink主要工作是從Channel里拿到Event,然后在原始日志前面加上header,批量發往Kafka。header的信息包括:{numberId , ip ,appName, logType},numberId:表示該IP這個日志類型的一行日志的唯一編號,id是遞增的;ip:表示日志所在的機器ip;appName:表示日志歸屬的應用名稱,如:ins-xxx-platform;logType:表示日志的類型,如:tomcat_event。這個header在后續其他系統進行日志處理時,起着表明日志身份的作用。

目前Agent 部署規模:上千實例,單機接入10+的實例, 單實例6+的日志類型,業務日志量20億+/天,每天高峰6w+/秒,日增20億條日志,日增970G日志。

下面是單機flume運行截圖:

flume運行高峰時負載情況截圖:

B. Kafka層,這一層架構非常簡單,由三個實例組成集群,日志備份保留3天。Topic定義規則是:$appName_$logType,確保一個項目的一個日志類型都在一個topic里,目前Partition分區為1,是為了保證日志順序,如果以后日志量變大了可以根據一個項目的ip數進行Partition設置,提高kafka處理能力。

Consumer目前配置了多個group組,分別消費日志到redis集群、Elasticsearch集群、Storm集群、監控項目。

目前Kafka部署規模:3台物理機,每台機器一個實例,組成一個集群。

下面是單機Kafka運行截圖:

kafka運行平均cpu情況截圖:

C.Flume Cluster層,主要是將kafka的日志(app和nginx)分別消費到redis集群、Elasticsearch集群。這一層大家可能有疑問,為什么用flume來開發?因為我們對這一層的定位是可以分布式部署,可以水平擴容,以java進程模式運行,並且可以實時消費kafka數據到不同目標源。

鑒於flume天然的source、channel、sink架構設計,完美支持目前設計需要,只需要在它之上開發source和sink,節省開發成本。從kafka消費數據,我們開發了KafkaSource,將日志傳輸到channel,sink這塊我們開發了RedisSink和ESSink,分別將日志發布到redis里提供實時訂閱、發布到es里提供歷史日志檢索。

目前部署規模:3台物理機,每台機器部署3個實例。

flume2es運行平均cpu情況截圖:

flume2redis運行平均cpu情況截圖:

flume2es(nginx)運行平均cpu情況截圖:

D. Redis層,主要用到了它的發布訂閱功能和list數據結構。主要使用在實時日志瀏覽的場景,下文會介紹功能實現。

目前部署規模:6個實例,在6台機器上,一致性hash部署

redis運行情況截圖:

E. Elasticsearch層,主要存放歷史日志,使用場景在對歷史日志進行檢索。目前日志是分片存儲,分片規則是按機器路由,默認針對中等及偏小規模的項目日志分3片存儲,大規模日志分6片存儲,這取決於機器規模。

目前部署規模:3台物理機,每台機器4個實例,每台配置1.5TPCIE + 8T普通SAS硬盤,1天內的日志放在PCIE上,加快索引及檢索速度,6天前的日志放在普通硬盤上,目前es集群只保留一周的日志數據。

ES中某一個項目日志分布情況截圖:

F. Log Web及Nginx層,主要曾擔着實時日志展示、歷史日志檢索、歷史日志個性化統計、用戶行為追蹤等功能。這塊完全自主開發,滿足研發、測試、運維的需求,目前一直收集用戶反饋,持續迭代中。

目前部署規模:分布式部署6個實例,nginx反向代理,iphash的路由規則。

下面是單個實例運行監控情況截圖:


3實時日志瀏覽

       

下圖是Log Web系統里,實時日志頁面截圖:


實時日志頁面功能:支持項目、日志類型、ip(單個或組合)選擇,搜索支持grep語法、關鍵詞高亮、接口耗時過濾、esc快捷方式暫停、清屏、日志滾動到最新(當瀏覽器顯示慢時)、日志顯示速度控制、日志延時顯示(日志產生到頁面顯示耗時)等功能。



幫助頁面還提供操作手冊、在線反饋、日常上線管理、頁面顏色控制等功能。

實現日志實現邏輯:

這塊實現經歷了三個階段:單機內存kafka消費模式、分布式內存kafka消費模式、分布式redis發布訂閱模式。頁面都是使用Ajax定時輪詢的方式獲取日志,然后顯示在頁面中。

a. 單機內存kafka消費模式比較復雜,用戶在log web頁面查看一個實時日志,后台立即從kafka訂閱topic消費日志,放入jvm里維護的一塊日志消費隊列,做一層緩沖。頁面ajax每次請求時會帶上上次已查看日志的偏移量,在內存中算出這次需要返回的日志列表,如果是多用戶查看一個日志,會因為每個用戶頁面滾動的速度不同,算出不同的日志列表來。

這種方式會有些瓶頸:

  • 算法復雜,問題難定位 

  • 內存開銷大,gc比較頻繁 

  • 線程數過多,每個日志類型是一個線程占用 

  • kafka鏈接數較大,初始訂閱延遲較長。

b. 分布式內存kafka消費模式,這個方案也是臨時過渡階段,因為隨着項目日志接入越多,日志topic是幾何增長,hash分配到每個logweb節點上的topic還是會非常多,性能開銷非常大。

c. 分布式redis發布訂閱模式,是目前正在使用的方案。此方案比較簡單,性能開銷非常小,服務運行比較平穩。過程大致是這樣:用戶在頁面查看一個類型的日志,log web會啟動一個線程處理這個用戶請求,它會通知后台flume集群中的flume2redis進程開始消費kafka數據,將日志發布到redis里,這樣log web里處理用戶請求的線程就能訂閱到redis的日志,並且把日志緩沖到redis list隊列里,頁面ajax請求輪詢從list隊列里順序消費日志,使用redis LTRIM命令,並且在用戶暫停滾動日志5分鍾或切換日志時,主動銷毀線程、取消訂閱、刪除list隊列。另外這塊nginx配置成iphash的方式,一個用戶始終落到一個節點。

 

4歷史日志檢索

 

下圖是Log Web系統里,歷史日志頁面截圖:


歷史日志頁面功能:支持項目、日志類型、ip(單個或組合)選擇,搜索支持grep語法、關鍵詞高亮、接口耗時過濾、日志時間范圍選擇、快捷方式支持、搜索正序倒序、頁面滾動行數、檢索總記錄數等功能。實時日志頁面和歷史日志頁面相互切換,所選擇的參數項都會帶着,避免重復選擇。

歷史日志實現邏輯:

將用戶選擇的參數及填寫的檢索內容翻譯成ES的Query DSL查詢表達式,放到es里檢索,將數據再返回到頁面上。

下圖是檢索“java”關鍵詞截圖:

翻譯成DSL表達式截圖:

 

5歷史統計、用戶追蹤、實時日志流式分析

 

這塊不詳細講了,因為涉及公司內部產品,只說下思路。

歷史統計:將日志統計分析及報表功能,抽象成統計模板,用戶只需要創建任務,錄入統計參數,歷史統計模塊就會自動幫用戶在規定的時間或周期性統計出報表或原始日志,提供展示、下載、郵件發送功能。

用戶追蹤:目前公司統一的開發框架會對每一次用戶請求及內部RPC調用都會記錄詳細事件日志,收集這些日志,將這些數據抽到es里,然后在用追蹤頁面展現用戶訪問的行為軌跡。

實時日志流式分析:收集nginx日志,實時分析日志中的業務接口訪問量、頻次情況、域名流量分布、用戶行為分析及內部規則處理等。

6參數配置

 

flume的source、channel、sink參數配置截圖,這是一個實時日志量比較大的項目配置

 

kafka的server.properties配置

 

es的elasticsearch.yml配置

 

es的模板配置:

 

 

 

7監控運維

 

服務搭建起來了,后期的高效、穩定的運行也是至關重要的,特別是對這種相對復雜的大型系統。整個架構體系涉及到的中間件比較多,各個環節都需要監控起來,我們除了使用第三方監控軟件,也自己開發了一些維度的監控手段,做到在用戶之前發現問題。

A. 系統負載、cpu、磁盤、內存等使用zabbix監控報警

B. Kafka使用的監控工具:KafkaOffsetMonitor

C. Elasticsearch使用的監控工具:kopf、bigdesk

D. redis使用zabbix監控cpu及內存使用情況

E. log web使用zabbix、cat監控服務各維度性能指標。

另外我們也開發了針對整個日志系統的監控大盤及日志流量實時統計圖

 

這個是zabbix監控機器cpu load情況截圖:

使用KafkaOffsetMonitor查看kafka的topic情況截圖:

es實時寫入document速率情況截圖:

es集群及節點運行情況截圖:

下圖是項目日志監控大盤,實時列出日志經過Flume、kafka、es流水量及堵塞情況,可以一目了然知道那個項目那個ip消息在那個環節有延遲。

下圖是flume、kafka、es三個環節收到日志實時數量統計圖,可以對幾天的數據進行對比。正常情況下三條線是擬合狀態,如果某一個線出現波動,證明它消費數據慢了,需要check。

下面是實時日志瀏覽和歷史日志檢索旁路監控報表,會模擬用戶發起請求,監控頁面功能是否可用,定時監控預警。

 

 

8細節及技巧處理

 

a. flume在處理日志時,需要區分日志名字,如事件日志以小時切割,當前文件名始終是 eventInfo.log,這樣可以使用tail -F 命令。但錯誤日志以天切割,當前文件都是帶時間的,如catalina.04-20.out ,這樣當切換日志時,需要額外處理,變化tail-F 的文件名。為此我們采用文件夾監聽(WatchService)的方式,動態切換文件,並且動態Source釋放創建。

b. flume日志采集傳輸實時性優化:source在將Event批量傳輸到Channel時,需要加超時控制,避免批量大小一直不滿足,導致日志一直不發送。sink里將日志傳輸到kafka一樣,需要注意。偽代碼如下:

        if (eventList.size() >=bufferCount || timeout()) {

          flushEventBatch(eventList);

        }

      private boolean timeout() {

           return (當前時間 - 上次寫channel時間) >= timeout(3秒);

        }

c.flume目前配置內存模式,為了提高采集速度,channel是內存方式,但當flume重啟或宕機時會丟失日志。目前考慮使用斷點續傳的方式,在磁盤上存儲已讀取偏移行數,此偏移是在sink往kafka成功后更新。

d. 針對日志量大的項目需要調大MemoryChannel的capacity

e. kafka的Topic定義以前是ip區分的,當時是因為內存模型導致線程太多,現在不區分了,這塊需要考慮維護成本,topic的數量太多,因為后期需要監控運維。

f. 如果要保證日志順序,假如topic不區分ip,這時topic必須設置一個分區;假如topic是區分ip的,那么可以根據ip來設置分區數,這樣可以提高kafka吞吐能力。

g. kafka消費的flume集群,目前是人工指定topic消費的,如果flume宕機會有風險,需要采用failover機制。

h. elasticsearch磁盤需要分快盤和慢盤,也就是固態硬盤和機械硬盤。當天的日志放到固態硬盤里es寫入快,查詢也快,頁面查詢體驗好,6天前的日志遷移到慢盤存儲,因為相對來說查詢量少,並且無更新刪除操作。這塊需要做一個執行計划每天晚上定時執行。

i.  elasticsearch索引生效也就是commit操作不需要特別頻繁,如果太頻繁會對es壓力較大,目前我們控制在30秒1次commit。因為日志產生到能在es里檢索出來,不需要太實時,開發人員使用的場景都是滯后的,不像前端業務搜索需要達到准實時。

j. 頁面查詢過濾語句一定要貼近linux的grep語法,這樣開發人員在使用日志系統的學習成本會非常低。

    

k. 頁面日志展現優化

問題:日志產生速度最快會達到5k條/s,實時頁面如果展現,用戶cpu高,瀏覽器占用高

優化過程:

ajax:數據異步批量獲取,前端數據容器(數據緩存),dom模板化異步創建、dom釋放、滾動、事件代理。

忽略策略:當數據延遲超過閥值說明數據生產太快,消費慢,則只看最近的數據,歷史數據可以到歷史日志中查看。

請求數據量伸縮:依據客戶端消費情況,如果消費延遲到一定閥值,則每次向后端多請求一些數據。

壓縮:在一定量時自動開啟 gzip,需要權衡對客戶端cpu的損耗。

l. 需要監控項目日志日生產量大小,同比監控預警,避免突然某個項目日志產生過大,超過預期配置和資源情況,也可以及時通知業務端查找問題及處理。

 上面提到的kafka集群、zookeeper集群、elasticsearch集群、flume集群,目前我們只用到了三台物理機,機器配置:24核cpu、260G內存、9T硬盤(此處應該有掌聲),機器load average低峰10+,高峰時150+。

 

9結束語

 

一個好的系統誕生離不開三個階段:工具化、產品化、運營化。我認為技術人員不光光只關注技術本身,還應該多關注下產品體驗、用戶反饋、系統運營,前期找一批天使用戶,讓他們多試用及反饋,這樣你開發的系統才是一個真正易用、好用、想用的產品,系統的NPS才會高。 整個系統設計開發中遇到了很多問題及難題,我們都是通過架構調整和代碼邏輯梳理解決的。

另外在這么一個龐大的系統運行中,一定會遇到資源緊張(cpu load高)的情況,這時建議大家不要隨意擴容機器解決,這樣會很容易漏掉一個坑或隱患,其實這些問題可以從架構設計、參數配置、代碼優化着手考慮,相信性能的提升空間會非常多。

今天的分享就到這里,希望對大家有所借鑒,也希望大家多提點意見,歡迎大家拍磚。

Q&A

問:就是剛才說的集群用了三台,zookeeper、flume,他們分別是三台物理機嗎?

袁曉亮:是的。

問:header的信息包括:{numberId , ip , appName, logType},numberId:表示該IP這個日志類型的一行日志的唯一編號,id是遞增的;請問id遞增這里有什么特殊設計嗎,怎樣的id遞增?

袁曉亮:這個放入es的自增唯一id,排序用。id遞增的方法,單機內存自增、分布式id自增(Snowflake算法)。

問:hive不是負責計算 計算最終都要落到Hadoop map reduce 或spark計算?

袁曉亮:后面會考慮使用es和hadoop的集成,做分析。

問:flume直接用tail,重啟的時候怎么保證日志讀完整呢?是准備結合后面要加發送到kafka行號一起解決嗎?

袁曉亮:增加斷點續傳功能,在agent本地記錄文件,消費日志的行號,這個在sink發給kafka后更新。

問:機器配置:24核cpu、260G內存、9T硬盤(此處應該有掌聲)這里的260G不是指的內存,是指的SSD硬盤吧?

袁曉亮:內存是260G,ssd是1.5T。

問:kafka集群、zookeeper集群、elasticsearch集群、flume集群都放到三台物理機上,各個集群之間有隔離嗎?

袁曉亮:目前是因為機器資源問題,都是分布式放在3台機器上,沒隔離。

問:既然是分布式部署,對日志的順序要求還非常嚴格嗎,如果partition都是為1,怎么樣多進程處理呢?

袁曉亮:用戶需求非常嚴格,根據ip來分多channel處理,每個ip是順序的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM