做了幾周的測試,踩了無數的坑,總結一下,全是干貨,給大家分享~
一、elk 實用知識點總結
1、編碼轉換問題(主要就是中文亂碼)
codec => plain { charset => "GB2312" }
filebeat.prospectors: - input_type: log paths: - c:\Users\Administrator\Desktop\performanceTrace.txt encoding: GB2312
2、刪除多余日志中的多余行
if ([message] =~ "^20.*-\ task\ request,.*,start\ time.*") { #用正則需刪除的多余行 drop {} }
2018-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59 #需刪除的行 -- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End -- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End
3、grok 處理多種日志不同的行
2018-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59 -- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End -- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End
(2)在logstash filter中grok 分別處理3行
match => { "message" => "^20.*-\ task\ request,.*,start\ time\:%{TIMESTAMP_ISO8601:RequestTime}" match => { "message" => "^--\ Request\ String\ :\ \{\"UserName\":\"%{NUMBER:UserName:int}\",\"Pwd\":\"(?<Pwd>.*)\",\"DeviceType\":%{NUMBER:DeviceType:int},\"DeviceId\":\"(?<DeviceId>.*)\",\"EquipmentNo\":(?<EquipmentNo>.*),\"SSID\":(?<SSID>.*),\"RegisterPhones\":(?<RegisterPhones>.*),\"AppKey\":\"(?<AppKey>.*)\",\"Version\":\"(?<Version>.*)\"\}\ --\ \End.*" } match => { "message" => "^--\ Response\ String\ :\ \{\"ErrorCode\":%{NUMBER:ErrorCode:int},\"Success\":(?<Success>[a-z]*),\"ErrorMsg\":(?<ErrorMsg>.*),\"Result\":(?<Result>.*),\"WaitInterval\":%{NUMBER:WaitInterval:int}\}\ --\ \End.*" } ... 等多行
4、日志多行合並處理—multiline插件(重點)
2018-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59 -- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End -- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End
② logstash grok 對合並后多行的處理(合並多行后續都一樣,如下)
filter { grok { match => { "message" => "^%{TIMESTAMP_ISO8601:InsertTime}\ .*-\ task\ request,.*,start\ time:%{TIMESTAMP_ISO8601:RequestTime}\n--\ Request\ String\ :\ \{\"UserName\":\"%{NUMBER:UserName:int}\",\"Pwd\":\"(?<Pwd>.*)\",\"DeviceType\":%{NUMBER:DeviceType:int},\"DeviceId\":\"(?<DeviceId>.*)\",\"EquipmentNo\":(?<EquipmentNo>.*),\"SSID\":(?<SSID>.*),\"RegisterPhones\":(?<RegisterPhones>.*),\"AppKey\":\"(?<AppKey>.*)\",\"Version\":\"(?<Version>.*)\"\}\ --\ \End\n--\ Response\ String\ :\ \{\"ErrorCode\":%{NUMBER:ErrorCode:int},\"Success\":(?<Success>[a-z]*),\"ErrorMsg\":(?<ErrorMsg>.*),\"Result\":(?<Result>.*),\"WaitInterval\":%{NUMBER:WaitInterval:int}\}\ --\ \End" } } }
(2)在filebeat中使用multiline 插件(推薦)
negate:true/false,匹配到pattern 部分開始合並,還是不配到的合並
after:匹配到pattern 部分后合並,注意:這種情況最后一行日志不會被匹配處理
before:匹配到pattern 部分前合並(推薦)
filebeat.prospectors: - input_type: log paths: - /root/performanceTrace* fields: type: zidonghualog multiline.pattern: '.*\"WaitInterval\":.*--\ End' multiline.negate: true multiline.match: before
filebeat.prospectors: - input_type: log paths: - /root/performanceTrace* input_type: log multiline: pattern: '^20.*' negate: true match: after
(3)在logstash input中使用multiline 插件(沒有filebeat 時推薦)
negate:true/false,匹配到pattern 部分開始合並,還是不配到的合並
previous:相當於filebeat 的after
next:相當於filebeat 的before
input { file { path => ["/root/logs/log2"] start_position => "beginning" codec => multiline { pattern => "^20.*" negate => true what => "previous" } } }
(4)在logstash filter中使用multiline 插件(不推薦)
① filter設置multiline后,pipline worker會自動將為1
② 5.5 版本官方把multiline 去除了,要使用的話需下載,下載命令如下:
/usr/share/logstash/bin/logstash-plugin install logstash-filter-multiline
filter { multiline { pattern => "^20.*" negate => true what => "previous" } }
5、logstash filter 中的date使用
2018-03-20 10:44:01 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59
date { match => ["InsertTime","YYYY-MM-dd HH:mm:ss "] remove_field => "InsertTime" }
match => ["timestamp" ,"dd/MMM/YYYY H:m:s Z"]
匹配這個字段,字段的格式為:日日/月月月/年年年年 時/分/秒 時區
也可以寫為:match => ["timestamp","ISO8601"](推薦)
就是將匹配日志中時間的key 替換為@timestamp 的時間,因為@timestamp 的時間是日志送到logstash 的時間,並不是日志中真正的時間。
6、對多類日志分類處理(重點)
filebeat: prospectors: - paths: #- /mnt/data/WebApiDebugLog.txt* - /mnt/data_total/WebApiDebugLog.txt* fields: type: WebApiDebugLog_total - paths: - /mnt/data_request/WebApiDebugLog.txt* #- /mnt/data/WebApiDebugLog.txt* fields: type: WebApiDebugLog_request - paths: - /mnt/data_report/WebApiDebugLog.txt* #- /mnt/data/WebApiDebugLog.txt* fields: type: WebApiDebugLog_report
② 在logstash filter中使用if,可進行對不同類進行不同處理
filter { if [fields][type] == "WebApiDebugLog_request" { #對request 類日志 if ([message] =~ "^20.*-\ task\ report,.*,start\ time.*") { #刪除report 行 drop {} } grok { match => {"... ..."} } }
if [fields][type] == "WebApiDebugLog_total" { elasticsearch { hosts => ["6.6.6.6:9200"] index => "logstashl-WebApiDebugLog_total-%{+YYYY.MM.dd}" document_type => "WebApiDebugLog_total_logs" }
二、對elk 整體性能的優化
1、性能分析
每天每台機器可處理:24h*60min*60sec*3000*250Byte=64,800,000,000Bytes,約64G
③ 瓶頸在logstash 從redis中取數據存入ES,開啟一個logstash,每秒約處理6000條數據;開啟兩個logstash,每秒約處理10000條數據(cpu已基本跑滿);
④ logstash的啟動過程占用大量系統資源,因為腳本中要檢查java、ruby以及其他環境變量,啟動后資源占用會恢復到正常狀態。
2、關於收集日志的選擇:logstash/filter
(1)沒有原則要求使用filebeat或logstash,兩者作為shipper的功能是一樣的,區別在於:
① logstash由於集成了眾多插件,如grok,ruby,所以相比beat是重量級的;
② logstash啟動后占用資源更多,如果硬件資源足夠則無需考慮二者差異;
③ logstash基於JVM,支持跨平台;而beat使用golang編寫,AIX不支持;
④ AIX 64bit平台上需要安裝jdk(jre) 1.7 32bit,64bit的不支持;
⑤ filebeat可以直接輸入到ES,但是系統中存在logstash直接輸入到ES的情況,這將造成不同的索引類型造成檢索復雜,最好統一輸入到els 的源。
logstash/filter 總之各有千秋,但是,我推薦選擇:在每個需要收集的日志服務器上配置filebeat,因為輕量級,用於收集日志;再統一輸出給logstash,做對日志的處理;最后統一由logstash 輸出給els。
3、logstash的優化相關配置
默認配置 ---> pipeline.workers: 2
可優化為 ---> pipeline.workers: CPU內核數(或幾倍cpu內核數)
默認配置 ---> pipeline.output.workers: 1
可優化為 ---> pipeline.output.workers: 不超過pipeline 線程數
默認配置 ---> pipeline.batch.size: 125
可優化為 ---> pipeline.batch.size: 1000
默認配置 ---> pipeline.batch.delay: 5
可優化為 ---> pipeline.batch.size: 10
通過設置-w參數指定pipeline worker數量,也可直接修改配置文件logstash.yml。這會提高filter和output的線程數,如果需要的話,將其設置為cpu核心數的幾倍是安全的,線程在I/O上是空閑的。
默認每個輸出在一個pipeline worker線程上活動,可以在輸出output 中設置workers設置,不要將該值設置大於pipeline worker數。
還可以設置輸出的batch_size數,例如ES輸出與batch size一致。
filter設置multiline后,pipline worker會自動將為1,如果使用filebeat,建議在beat中就使用multiline,如果使用logstash作為shipper,建議在input 中設置multiline,不要在filter中設置multiline。
Logstash是一個基於Java開發的程序,需要運行在JVM中,可以通過配置jvm.options來針對JVM進行設定。比如內存的最大最小、垃圾清理機制等等。JVM的內存分配不能太大不能太小,太大會拖慢操作系統。太小導致無法啟動。默認如下:
4、引入Redis 的相關問題
(1)filebeat可以直接輸入到logstash(indexer),但logstash沒有存儲功能,如果需要重啟需要先停所有連入的beat,再停logstash,造成運維麻煩;另外如果logstash發生異常則會丟失數據;引入Redis作為數據緩沖池,當logstash異常停止后可以從Redis的客戶端看到數據緩存在Redis中;
(2)Redis可以使用list(最長支持4,294,967,295條)或發布訂閱存儲模式;
② requirepass ilinux.io #加密碼,為了安全運行
③ 只做隊列,沒必要持久存儲,把所有持久化功能關掉:快照(RDB文件)和追加式文件(AOF文件),性能更好
save "" 禁用快照
appendonly no 關閉RDB
maxmemory 0 #maxmemory為0的時候表示我們對Redis的內存使用沒有限制
5、elasticsearch 節點優化配置
① vm.swappiness = 1 #ES 推薦將此參數設置為 1,大幅降低 swap 分區的大小,強制最大程度的使用內存,注意,這里不要設置為 0, 這會很可能會造成 OOM ② net.core.somaxconn = 65535 #定義了每個端口最大的監聽隊列的長度 ③ vm.max_map_count= 262144 #限制一個進程可以擁有的VMA(虛擬內存區域)的數量。虛擬內存區域是一個連續的虛擬地址空間區域。當VMA 的數量超過這個值,OOM ④ fs.file-max = 518144 #設置 Linux 內核分配的文件句柄的最大數量
[root@elasticsearch]# sysctl -p 生效一下
elasticsearch soft nofile 65535 elasticsearch hard nofile 65535 elasticsearch soft memlock unlimited elasticsearch hard memlock unlimited
vim /etc/pam.d/common-session-noninteractive
session required pam_limits.so
① 將最小堆大小(Xms)和最大堆大小(Xmx)設置為彼此相等。
② Elasticsearch可用的堆越多,可用於緩存的內存就越多。但請注意,太多的堆可能會使您長時間垃圾收集暫停。
③ 設置Xmx為不超過物理RAM的50%,以確保有足夠的物理內存留給內核文件系統緩存。
④ 不要設置Xmx為JVM用於壓縮對象指針的臨界值以上;確切的截止值有所不同,但接近32 GB。不要超過32G,如果空間大,多跑幾個實例,不要讓一個實例太大內存
bootstrap.memory_lock: true #鎖住內存,不使用swap #緩存、線程等優化如下 bootstrap.mlockall: true transport.tcp.compress: true indices.fielddata.cache.size: 40% indices.cache.filter.size: 30% indices.cache.filter.terms.size: 1024mb threadpool: search: type: cached size: 100 queue_size: 2000
vim /etc/profile.d/elasticsearch.sh export ES_HEAP_SIZE=2g #Heap Size不超過物理內存的一半,且小於32G
① ES是分布式存儲,當設置同樣的cluster.name后會自動發現並加入集群;
② 集群會自動選舉一個master,當master宕機后重新選舉;
④ 為有效管理節點,可關閉廣播 discovery.zen.ping.multicast.enabled: false,並設置單播節點組discovery.zen.ping.unicast.hosts: ["ip1", "ip2", "ip3"]
6、性能的檢查
Logstash和其連接的服務運行速度一致,它可以和輸入、輸出的速度一樣快。
注意CPU是否過載。在Linux/Unix系統中可以使用top -H查看進程參數以及總計。
如果CPU使用過高,直接跳到檢查JVM堆的章節並檢查Logstash worker設置。
注意Logstash是運行在Java虛擬機中的,所以它只會用到你分配給它的最大內存。
檢查其他應用使用大量內存的情況,這將造成Logstash使用硬盤swap,這種情況會在應用占用內存超出物理內存范圍時。
使用Logstash plugin(例如使用文件輸出)磁盤會發生飽和。
當發生大量錯誤,Logstash生成大量錯誤日志時磁盤也會發生飽和。
在Linux中,可使用iostat,dstat或者其他命令監控磁盤I/O
當使用大量網絡操作的input、output時,會導致網絡飽和。
heap設置太小會導致CPU使用率過高,這是因為JVM的垃圾回收機制導致的。
一個快速檢查該設置的方法是將heap設置為兩倍大小然后檢測性能改進。不要將heap設置超過物理內存大小,保留至少1G內存給操作系統和其他進程。
你可以使用類似jmap命令行或VisualVM更加精確的計算JVM heap