Collecting and Monitoring nginx Request Logs with ELK, Alerting with elastalert


1. Introduction

For an internet company, nginx request logs are a gold mine; it would be a shame not to make full use of them.
In the early days they usually just go to log files, queried with awk/grep/uniq/sort..., which covers quite a few statistics needs, but the biggest drawbacks are that it is not intuitive and hard to monitor (even now with ELK in place, I still do some statistics in shell; the two complement each other).
The minimum requirements an ELK deployment should satisfy:

  • Query conditions (exact match): first-level domain, second-level domain, real client IP, HTTP status code, HTTP method, request_time, response_time, proxy IP, body_bytes_sent
  • Query conditions (fuzzy match): url (e.g. searching for SQL-injection keywords), referer (traffic source), agent (e.g. finding search-engine crawlers)
  • Overall request-volume trend over a recent period (1 week, 1 month);
  • When the overall trend is abnormal, quickly locate the domain whose trend is abnormal;
  • The composition of all requests over a past period (1 day, 1 week, 1 month), as a pie chart per domain;
  • Real-time monitoring of crawler IPs with excessive request rates (e.g. alert when a single IP makes more than 100 requests in 1 minute);
  • Real-time monitoring of 500-status requests (e.g. alert when a single domain returns 30 responses with status 500 within 1 minute);
  • …

2. Topology

nginx is configured to ship its access log via the syslog protocol;
logstash acts as the syslog server, collects the logs, and outputs in 2 directions: into elasticsearch, and to a local file;
elasticsearch needs a well-designed index model so that each field supports its lookup requirements (exact or fuzzy, or even exact + fuzzy on the same field, though I did not need that) without wasting space;
kibana handles visualization, mainly the Discover and Visualize pages;
elastalert runs the various rules that implement the real-time monitoring requirements.

3. nginx configuration

nginx.conf
The log is emitted in JSON so that logstash can parse it easily;
because a single syslog message is limited to 2 KB, some variables are truncated (the variables with the _short suffix);
level1domain and level2domain are the first-level and second-level domain respectively;

log_format main_json '{"project":"${level1domain}","domain":"${level1domain}_${level2domain}","real_ip":"$real_ip","http_x_forwarded_for":"$http_x_forwarded_for","time_local":"$time_iso8601","request":"$request_short","request_body":"$request_body_short","status":$status,"body_bytes_sent":"$body_bytes_sent","http_referer":"$http_referer_short","upstream_response_time":"$upstream_response_time","request_time":"$request_time","http_user_agent":"$http_user_agent"}';

location.conf

# keep only the first 750 bytes
if ( $request ~ "^(.{0,750})" ) {
    set $request_short $1;
}
# keep only the first 750 bytes
if ( $request_body ~ "^(.{0,750})" ) {
    set $request_body_short $1;
}
# keep only the first 100 bytes
set $http_referer_short "-";
if ( $http_referer ~ "^(.{1,100})" ) {
    set $http_referer_short $1;
}
# take the first IP in $http_x_forwarded_for as the real client IP
set $real_ip $remote_addr;
if ( $http_x_forwarded_for ~ "^(\d+\.\d+\.\d+\.\d+)" ) {
    set $real_ip $1;
}
# server_name has the form: Nth-level domain.….third-level domain.second-level domain.first-level domain.com or .cn, or just first-level domain.com or .cn;
# parse the first-level-domain part into $level1domain
# and everything before it into $level2domain
set $level1domain unparse;
set $level2domain unparse;
if ( $server_name ~ "^(.+)\.([0-9a-zA-Z]+)\.(com|cn)$" ) {
    set $level1domain $2;
    set $level2domain $1;
}
if ( $server_name ~ "^([0-9a-zA-Z]+)\.(com|cn)$" ) {
    set $level1domain $1;
    set $level2domain none;
}
# syslog output configuration
access_log syslog:local7:info:logstash_ip:515:nginx main_json; 
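The two server_name rules above can be sanity-checked outside nginx. A minimal Python sketch that mirrors the regexes (the helper name parse_domain is made up for illustration):

```python
import re

# Mirrors the nginx rules above: extract (level1domain, level2domain)
# from server_name, falling back to "unparse" when nothing matches.
def parse_domain(server_name):
    m = re.match(r"^(.+)\.([0-9a-zA-Z]+)\.(com|cn)$", server_name)
    if m:
        return m.group(2), m.group(1)
    m = re.match(r"^([0-9a-zA-Z]+)\.(com|cn)$", server_name)
    if m:
        return m.group(1), "none"
    return "unparse", "unparse"

print(parse_domain("www.shop.example.com"))  # ('example', 'www.shop')
print(parse_domain("example.cn"))            # ('example', 'none')
```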

4. logstash configuration

Installation:

Install JDK 8
Unpack logstash-6.2.1.tar.gz
List plugins:
./logstash-plugin list | grep syslog
Install non-default plugins:
./logstash-plugin install logstash-filter-alter
Test:
# ./logstash -e 'input { stdin { } } output { stdout {} }'
Start:
Start logstash: nohup ./bin/logstash -f mylogstash.conf & disown

Configuration:

mylogstash.conf

input {
    syslog {
        type => "system-syslog"
        port => 515
    }
}

filter {
    # before JSON parsing, use mutate/gsub to rewrite \x sequences, preventing this error: ParserError: Unrecognized character escape 'x' (code 120)
    mutate {
        gsub => ["message", "\\x", "\\\x"]
    }
    json {
        source => "message"
        # drop useless fields to save space
        remove_field => "message"
        remove_field => "severity"
        remove_field => "pid"
        remove_field => "logsource"
        remove_field => "timestamp"
        remove_field => "facility_label"
        remove_field => "type"
        remove_field => "facility"
        remove_field => "@version"
        remove_field => "priority"
        remove_field => "severity_label"
    }
    date {
        # replace the logstash-generated timestamp with the nginx request time
        match => ["time_local", "ISO8601"]
        target => "@timestamp"
    }
    grok {
        # extract day from the timestamp
        match => { "time_local" => "(?<day>.{10})" }
    }
    grok {
        # split request into 2 fields: method and url
        match => { "request" => "%{WORD:method} (?<url>.* )" }
    }
    grok {
        # keep only the part of http_referer before the question mark; the query string has no value here and wastes space
        match => { "http_referer" => "(?<referer>-|%{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?)" }
    }
    mutate {
        # drop the original fields once the new ones are extracted
        remove_field => "request"
        remove_field => "http_referer"
        rename => { "http_user_agent" => "agent" }
        rename => { "upstream_response_time" => "response_time" }
        rename => { "host" => "log_source" }
        rename => { "http_x_forwarded_for" => "x_forwarded_for" }
        # split the following 2 fields on commas and store them as arrays
        split => { "x_forwarded_for" => ", " }
        split => { "response_time" => ", " }
    }
    alter {
        # values that do not match the elasticsearch index model would fail on insert, so convert them as follows
        condrewrite => [
            "x_forwarded_for", "-", "0.0.0.0",
            "x_forwarded_for", "unknown", "0.0.0.0",
            "response_time", "-", "0",
            "real_ip", "", "0.0.0.0"
        ]
    }
}

output {
    # index into elasticsearch, using the template file as the index model
    elasticsearch { 
        hosts => ["elasticsearch_ip:9200"] 
        index => "nginx-%{day}"
        manage_template => true
        template_overwrite => true
        template_name => "mynginx"
        template => "/root/logstash/mynginxtemplate.json"
        codec => json
    }
    # keep a local copy as a complement to ELK
    file {
        flush_interval => 600
        path => '/nginxlog/%{day}/%{domain}.log'
        codec => line { format => '<%{time_local}> <%{real_ip}> <%{method}> <%{url}> <%{status}> <%{request_time}> <%{response_time}> <%{body_bytes_sent}> <%{request_body}> <%{referer}> <%{x_forwarded_for}> <%{log_source}> <%{agent}>'}
    }
}
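The mutate/gsub step can be reproduced in Python to see why it is needed: nginx escapes raw bytes as \xHH, which is not a legal JSON escape, so the filter doubles the backslash before parsing (the sample message below is made up):

```python
import json

# A message as nginx would emit it: \x22 is not a valid JSON escape.
raw = '{"request_body":"a=\\x22b"}'

# The gsub step: replace each literal \x with \\x.
fixed = raw.replace('\\x', '\\\\x')

doc = json.loads(fixed)          # parses cleanly now
print(doc["request_body"])       # a=\x22b
```

Parsing `raw` directly would fail with the "Unrecognized character escape" class of error mentioned above.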

mynginxtemplate.json

{  
    "template": "nginx-*",  
    "settings": {  
        "index.number_of_shards": 8,  
        "number_of_replicas": 0,
        "analysis": {
            "analyzer": {
                # custom stop words: do not index tokens such as http for these fields
                "stop_url": {
                    "type": "stop",
                    "stopwords": ["http","https","www","com","cn","net"]
                }
            }
         }
     },  
    "mappings" : {  
      "doc" : {  
        "properties" : {
          # index: true — analyze the field and make it searchable
          # analyzer: which analyzer to use for indexing
          "referer": {
            "type": "text",
            "norms": false,
            "index": true,
            "analyzer": "stop_url"
          },
         "agent": {
            "type": "text",
            "norms": false,
            "index": true
          },
         # IP field type
         "real_ip": {
            "type": "ip"
          },
         "x_forwarded_for": {
            "type": "ip"
          },
         # keyword: indexed as a single whole term, not tokenized
          "status": {
            "type": "keyword"
          },
          "method": {
            "type": "keyword"
          },
          "url": {
            "type": "text",
            "norms": false,
            "index": true,
            "analyzer": "stop_url"
          },
          "response_time": {
            "type": "half_float"
          },
          "request_time": {
            "type": "half_float"
          },
          "domain": {
            "type": "keyword"
          },
          "project": {
            "type": "keyword"
          },
          "request_body": {
            "type": "text",
            "norms": false,
            "index": true
          },
          "body_bytes_sent": {
            "type": "long"
          },
          "log_source": {
            "type": "ip"
          },
          "@timestamp" : {  
            "type" : "date",  
            "format" : "dateOptionalTime",  
            "doc_values" : true  
          },
          "time_local": {
            "enabled": false
          },
          "day": {
            "enabled": false
          }
        }  
      }  
    }  
}
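The stop analyzer is essentially lowercase tokenization plus the stop-word filter above, so a referer such as http://www.example.com/page indexes only the informative tokens. A rough Python approximation (splitting on non-alphanumerics is a simplification of the real tokenizer):

```python
import re

STOPWORDS = {"http", "https", "www", "com", "cn", "net"}

# Rough approximation of the custom stop_url analyzer:
# lowercase, split on non-alphanumeric characters, drop stop words.
def stop_url_tokens(text):
    tokens = re.split(r"[^0-9a-z]+", text.lower())
    return [t for t in tokens if t and t not in STOPWORDS]

print(stop_url_tokens("http://www.example.com/page"))  # ['example', 'page']
```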

5.elasticsearch配置

elasticsearch.yml

cluster.name: nginxelastic
# node name, different on each node
node.name: node1
bootstrap.system_call_filter: false
bootstrap.memory_lock: true
# this node's IP
network.host: 10.10.10.1
http.port: 9200
transport.tcp.port: 9300
# unicast discovery: list the other cluster nodes as host1:port1,host2:port2; this example has only 2 nodes, so only the other node's IP and port are configured
discovery.zen.ping.unicast.hosts: ["other_node_ip:9300"]
# minimum number of master-eligible nodes a node must see; (N/2)+1 is recommended
discovery.zen.minimum_master_nodes: 2
http.cors.enabled: true
http.cors.allow-origin: /.*/
path.data: /elastic/data
path.logs: /elastic/logs
 
jvm.options 
# initial and maximum JVM heap; half of the server's RAM is recommended
-Xms8g
-Xmx8g

del_index.sh, run from crontab to delete historical indices automatically:

#!/bin/bash
DELINDEX="nginx-"`date -d "-30 day" +%Y-%m-%d`
curl -H "Content-Type: application/json" -XDELETE 'http://10.10.10.1:9200/'"${DELINDEX}"
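The index name the script computes matches the nginx-%{day} naming used in the logstash output. The same computation in Python, for clarity:

```python
from datetime import date, timedelta

# Name of the daily index that is `days` old,
# matching the "nginx-YYYY-MM-DD" naming of the logstash output.
def index_to_delete(today, days=30):
    return "nginx-" + (today - timedelta(days=days)).isoformat()

print(index_to_delete(date(2018, 3, 31)))  # nginx-2018-03-01
```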

6. kibana configuration

kibana.yml

server.port: 80
server.host: 10.10.10.3
elasticsearch.url: "http://10.10.10.1:9200"
elasticsearch.username: "kibana"
elasticsearch.password: "mypwd"

UI settings:
management -> advanced settings:
dateFormat (date format): YYYYMMDD HH:mm:ss
defaultColumns (default columns): method, url, status, request_time, real_ip

Querying requests under a given domain that took more than 1 second:

Bar chart of request volume per domain over the past 24 hours:

7. elastalert configuration

Elastic's Watcher can monitor logs collected by ELK in real time, but it is a commercial product; for a free alternative, elastalert is a good option.
https://github.com/Yelp/elastalert

The most commonly used elastalert rule types are frequency, spike, etc. (http://elastalert.readthedocs.io/en/latest/ruletypes.html#rule-types)

  • frequency: monitors how often a given event occurs, e.g. a single IP making more than 600 requests per minute, a domain producing more than 30 requests slower than 3 seconds per minute, or a domain producing more than 10 requests with status 500 per minute.
  • spike: monitors how sharply event counts change, e.g. requests doubled in the last hour compared with the previous hour, or dropped by 50% over the last day compared with the day before.
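A toy sketch of what the frequency rule does per query_key — count events in a sliding time window and fire once the threshold is reached (an illustration only, not elastalert's actual implementation):

```python
from collections import defaultdict, deque

# Toy frequency rule: alert when a single key (e.g. a domain)
# accumulates num_events hits within a window of `window` seconds.
def frequency_alerts(events, num_events=10, window=60):
    seen = defaultdict(deque)   # key -> timestamps inside the window
    alerts = []
    for ts, key in sorted(events):
        q = seen[key]
        q.append(ts)
        while q and q[0] <= ts - window:
            q.popleft()
        if len(q) >= num_events:
            alerts.append((ts, key))
            q.clear()           # crude stand-in for realert suppression

    return alerts

hits = [(i, "shop.example.com") for i in range(10)]
print(frequency_alerts(hits))   # [(9, 'shop.example.com')]
```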

Here a frequency rule is used as an example, monitoring 500-status errors in real time.

config.yaml

# elasticsearch address
es_host: 10.10.10.1
es_port: 9200

freq-500.yaml

# the file name is arbitrary; just make it easy to recognize

es_host: 10.10.10.1
es_port: 9200
 
name: elk-nginx-freq-500
 
type: frequency
 
index: nginx-*
# alert when more than 10 events occur within the timeframe
num_events: 10
# 1-minute timeframe
timeframe:
  minutes: 1
# query filter
# status in (500,501,502,503,504)
# domain does not contain test, i.e. events under test domains are ignored
filter:
- bool:
    must:
    - terms: 
        status: ["500","501","502","503","504"]
    must_not:
    - wildcard:
        domain: "*test*"
# count num_events separately for each domain, tracking at most 10 domains; an alert fires only when a single domain reaches 10 events
use_terms_query: true
doc_type: doc
terms_size: 10
query_key: domain
# list the top 5 keys by count for domain and for status; the alert e-mail includes the top 5 domains and top 5 statuses
top_count_keys: 
- domain
- status
top_count_number: 5
raw_count_keys: false
# do not re-alert for the same key within 10 minutes
realert:
  minutes: 10
# alert via both command (SMS) and email
alert:
- command
- email
# a home-grown script that calls an SMS gateway; the message is simple, just naming the domain that triggered the 500-status alert
new_style_string_format: true
command: ["/root/elastalert-0.1.29/myalert/sms.sh", "15800000000", "elk nginx warning - freq 500 exceed, domain: {match[domain]}"]
# below is elastalert's built-in email alert configuration
# the mail username and password go in smtp_auth_file.yaml
smtp_host: smtp.exmail.qq.com
smtp_port: 465
smtp_ssl : true
from_addr: "elastalert@mydomain.com"
smtp_auth_file: "/root/elastalert-0.1.29/myalert/smtp_auth_file.yaml"
email:
- "myemail@mydomain.com"
 
alert_subject: "elk nginx warning - freq 500 exceed, domain: {0}"
alert_subject_args:
- domain


Start the monitor:

python -m elastalert.elastalert --verbose --rule freq-500.yaml >> freq-500.log 2>&1 & disown

Alert e-mail:

