今天我們來聊一下利用docker部署elk日志分析系統,這里解析一下elk是啥東西。elk分別是Elasticsearch,Logstash和Kibana的首字母縮寫。
- Elasticsearch是一個基於JSON的分布式搜索和分析引擎,專為水平可擴展性,最高可靠性和易管理性而設計。
- Logstash是一個動態數據收集管道,具有可擴展的插件生態系統和強大的Elasticsearch協同作用。
- Kibana通過UI 提供數據可視化。
架構簡述
日志系統首先面臨幾個問題:
不同廠商設備的不同日志格式的處理,如何調用微信來發送報警信息。采用的解決辦法是不同廠商的設備發送日志的時候采用不同端口,日志先發送到logstash,
logstash會先解析日志成標准格式,然后logstash會做2件事情,一個是存放日志到es里面,通過kibana做出展示。
環境搭建:
我們這里采用docker容器技術來部署elk。先從docker hub 下載 Elasticsearch,Logstash和Kibana這三個鏡像。執行如下命令
[root@node1 ~]# docker pull elasticsearch [root@node1 ~]# docker pull kibana [root@node1 ~]# docker pull logstash:7.2.0
為了后續管理方便,這里編寫yml文件,然后使用docker-compose來啟動。
version: '3.3' services: elasticsearch: image: elasticsearch:latest container_name: elasticsearch hostname: elasticsearch ports: - '9200:9200' environment: ES_JAVA_OPTS: "-Xms256m -Xmx256m" discovery.type: "single-node" volumes: - "/data/elk/elasticsearch/data:/usr/elasticsearch/data" logstash: image: logstash:7.2.0 container_name: logstash hostname: logstash ports: - "514:514/udp" user: 'root' command: "logstash -f /etc/logstash.conf --config.reload.automatic" volumes: - "/data/elk/logstash/logstash.conf:/etc/logstash.conf" environment: LS_JAVA_OPTS: "-Xmx256m -Xms256m" links: - "elasticsearch:elasticsearch" kibana: image: kibana:latest container_name: kibana hostname: kibana ports: - '5601:5601' environment: ELASTICSEARCH_URL: "http://elasticsearch:9200" links: - "elasticsearch:elasticsearch"
注意:/data/elk/logstash/logstash.conf這個文件需要我們提前准備,后面會提到。
交換機配置
我們需要把交換機的日志指定到elk服務器上。
cisco: logging host 10.100.18.18 transport udp port 5002 H3C info-center enable info-center source default channel 2 trap state off // 必要,不然日志會出現 不符合級別的 alert 日志 info-center loghost 10.100.18.18 port 5003 huawei info-center enable info-center loghost 10.100.18.18 info-center timestamp log short-date info-center timestamp trap short-date
Logstash 的配置
不同廠商的日志 gork我都寫好了,復制過去就能用
input{ tcp {port => 5002 type => "Cisco"} udp {port => 514 type => "HUAWEI"} udp {port => 5002 type => "Cisco"} udp {port => 5003 type => "H3C"} } filter { if [type] == "Cisco"{ grok{ match => { "message" => "<%{BASE10NUM:syslog_pri}>%{NUMBER:log_sequence}: .%{SYSLOGTIMESTAMP:timestamp}: %%{DATA:facility}-%{POSINT:severity}-%{CISCO_REASON:mnemonic}: %{GREEDYDATA:message}" } match => { "message" => "<%{BASE10NUM:syslog_pri}>%{NUMBER:log_sequence}: %{SYSLOGTIMESTAMP:timestamp}: %%{DATA:facility}-%{POSINT:severity}-%{CISCO_REASON:mnemonic}: %{GREEDYDATA:message}" } add_field => {"severity_code" => "%{severity}"} overwrite => ["message"] } } else if [type] == "H3C"{ grok { match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{YEAR:year} %{DATA:hostname} %%%{DATA:vvmodule}/%{POSINT:severity}/%{DATA:digest}: %{GREEDYDATA:message}" } remove_field => [ "year" ] add_field => {"severity_code" => "%{severity}"} overwrite => ["message"] } } else if [type] == "HUAWEI"{ grok { match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:hostname} %%%{DATA:ddModuleName}/%{POSINT:severity}/%{DATA:Brief}:%{GREEDYDATA:message}"} match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:hostname} %{DATA:ddModuleName}/%{POSINT:severity}/%{DATA:Brief}:%{GREEDYDATA:message}"} remove_field => [ "timestamp" ] add_field => {"severity_code" => "%{severity}"} overwrite => ["message"] } } mutate { gsub => [ "severity", "0", "Emergency", "severity", "1", "Alert", "severity", "2", "Critical", "severity", "3", "Error", "severity", "4", "Warning", "severity", "5", "Notice", "severity", "6", "Informational", "severity", "7", "Debug" ] } } output{ elasticsearch { index => "syslog-%{+YYYY.MM.dd}" hosts => ["your_ipaddress:9200"] } }