Today we'll look at deploying an ELK log analysis system with Docker. First, what is ELK? ELK is an acronym for Elasticsearch, Logstash, and Kibana.
- Elasticsearch is a JSON-based distributed search and analytics engine, designed for horizontal scalability, maximum reliability, and easy management.
- Logstash is a dynamic data collection pipeline with an extensible plugin ecosystem and strong synergy with Elasticsearch.
- Kibana provides data visualization through a UI.
Architecture overview
A log system has to solve a few problems up front: handling the different log formats of different vendors' devices, and calling WeChat to send alert messages. The approach taken here is to have each vendor's devices send their logs on a different port. Logs go to Logstash first, which parses them into a standard format. Logstash then does two things: it stores the logs in Elasticsearch, where Kibana presents them, and it pushes the alert notifications (the WeChat piece).
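So the flow is: switch → vendor-specific port → Logstash (grok parsing) → Elasticsearch → Kibana. This section doesn't show the alerting code itself, but as a rough sketch only: a common route is the WeChat Work (企業微信) group-robot webhook, where YOUR_WEBHOOK_KEY below is a placeholder, not a value from this article.

```
# Minimal sketch, assuming a WeChat Work group robot already exists;
# YOUR_WEBHOOK_KEY is a placeholder you get when creating the robot.
curl -s 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_WEBHOOK_KEY' \
  -H 'Content-Type: application/json' \
  -d '{"msgtype": "text", "text": {"content": "syslog alert: GigabitEthernet0/1 down"}}'
```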
Environment setup:
We'll deploy ELK with Docker containers. First pull the Elasticsearch, Logstash, and Kibana images from Docker Hub. Note that Elastic expects all three components to run the same version, and the official elasticsearch and kibana repositories no longer publish a usable latest tag, so pin an explicit tag (7.2.0 is used throughout here). Run the following:
```
[root@node1 ~]# docker pull elasticsearch:7.2.0
[root@node1 ~]# docker pull kibana:7.2.0
[root@node1 ~]# docker pull logstash:7.2.0
```
For easier management later, we'll describe the services in a yml file and start them with docker-compose.
```
version: '3.3'
services:
  elasticsearch:
    # Pin the same version across the stack.
    image: elasticsearch:7.2.0
    container_name: elasticsearch
    hostname: elasticsearch
    ports:
      - '9200:9200'
    environment:
      ES_JAVA_OPTS: "-Xms256m -Xmx256m"
      discovery.type: "single-node"
    volumes:
      # The official image keeps its data under /usr/share/elasticsearch/data.
      - "/data/elk/elasticsearch/data:/usr/share/elasticsearch/data"
  logstash:
    image: logstash:7.2.0
    container_name: logstash
    hostname: logstash
    ports:
      # 514/udp for Huawei, 5002 tcp+udp for Cisco, 5003/udp for H3C --
      # these match the input ports in logstash.conf below.
      - "514:514/udp"
      - "5002:5002"
      - "5002:5002/udp"
      - "5003:5003/udp"
    user: 'root'
    command: "logstash -f /etc/logstash.conf --config.reload.automatic"
    volumes:
      - "/data/elk/logstash/logstash.conf:/etc/logstash.conf"
    environment:
      LS_JAVA_OPTS: "-Xmx256m -Xms256m"
    links:
      - "elasticsearch:elasticsearch"
  kibana:
    image: kibana:7.2.0
    container_name: kibana
    hostname: kibana
    ports:
      - '5601:5601'
    environment:
      # Kibana 7.x reads ELASTICSEARCH_HOSTS (ELASTICSEARCH_URL was the 6.x name).
      ELASTICSEARCH_HOSTS: "http://elasticsearch:9200"
    links:
      - "elasticsearch:elasticsearch"
```
Note: /data/elk/logstash/logstash.conf has to be prepared in advance; its contents are covered below.
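The host paths referenced in the compose file must exist before bringing the stack up, and the Elasticsearch data directory must be writable by uid 1000 (the user the official image runs as). A minimal startup sequence:

```
# Create the host directories referenced in the compose file.
mkdir -p /data/elk/elasticsearch/data /data/elk/logstash
# The official elasticsearch image runs its process as uid 1000.
chown -R 1000:1000 /data/elk/elasticsearch/data
# Start the stack in the background, then confirm Elasticsearch responds.
docker-compose up -d
curl http://localhost:9200
```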
Switch configuration
We need to point the switches' logging at the ELK server.
```
cisco:
  logging host 10.100.18.18 transport udp port 5002

H3C:
  info-center enable
  info-center source default channel 2 trap state off   // required -- otherwise "alert" logs that don't match the configured level appear
  info-center loghost 10.100.18.18 port 5003

huawei:
  info-center enable
  info-center loghost 10.100.18.18
  info-center timestamp log short-date
  info-center timestamp trap short-date
```
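If you want to verify the collector before touching a real switch, util-linux's logger can fake a syslog datagram from any Linux host. The test message below won't match any vendor grok pattern, so it will arrive tagged _grokparsefailure, which is still enough to confirm the transport path:

```
# Send one UDP test message to the Huawei input port (514).
logger -d -n 10.100.18.18 -P 514 "test: hello from logger"
```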
Logstash configuration
I've already written the grok patterns for each vendor's logs; copy them over and they'll work.
```
input {
  tcp { port => 5002 type => "Cisco" }
  udp { port => 514  type => "HUAWEI" }
  udp { port => 5002 type => "Cisco" }
  udp { port => 5003 type => "H3C" }
}
filter {
  if [type] == "Cisco" {
    grok {
      # Two variants: the first allows one extra marker character (e.g. '.' or '*')
      # before the timestamp, which IOS emits when the clock is unsynchronized.
      match => { "message" => [
        "<%{BASE10NUM:syslog_pri}>%{NUMBER:log_sequence}: .%{SYSLOGTIMESTAMP:timestamp}: %%{DATA:facility}-%{POSINT:severity}-%{CISCO_REASON:mnemonic}: %{GREEDYDATA:message}",
        "<%{BASE10NUM:syslog_pri}>%{NUMBER:log_sequence}: %{SYSLOGTIMESTAMP:timestamp}: %%{DATA:facility}-%{POSINT:severity}-%{CISCO_REASON:mnemonic}: %{GREEDYDATA:message}"
      ] }
      add_field => { "severity_code" => "%{severity}" }
      overwrite => [ "message" ]
    }
  }
  else if [type] == "H3C" {
    grok {
      match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{YEAR:year} %{DATA:hostname} %%%{DATA:vvmodule}/%{POSINT:severity}/%{DATA:digest}: %{GREEDYDATA:message}" }
      remove_field => [ "year" ]
      add_field => { "severity_code" => "%{severity}" }
      overwrite => [ "message" ]
    }
  }
  else if [type] == "HUAWEI" {
    grok {
      match => { "message" => [
        "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:hostname} %%%{DATA:ddModuleName}/%{POSINT:severity}/%{DATA:Brief}:%{GREEDYDATA:message}",
        "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:hostname} %{DATA:ddModuleName}/%{POSINT:severity}/%{DATA:Brief}:%{GREEDYDATA:message}"
      ] }
      remove_field => [ "timestamp" ]
      add_field => { "severity_code" => "%{severity}" }
      overwrite => [ "message" ]
    }
  }
  # Map the numeric syslog severity to its name; the raw code is kept in severity_code.
  mutate {
    gsub => [
      "severity", "0", "Emergency",
      "severity", "1", "Alert",
      "severity", "2", "Critical",
      "severity", "3", "Error",
      "severity", "4", "Warning",
      "severity", "5", "Notice",
      "severity", "6", "Informational",
      "severity", "7", "Debug"
    ]
  }
}
output {
  elasticsearch {
    # Daily indices; replace your_ipaddress with the Docker host's address.
    index => "syslog-%{+YYYY.MM.dd}"
    hosts => ["your_ipaddress:9200"]
  }
}
```
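Two quick checks close the loop: the first validates the config syntax without starting a pipeline, the second lists indices, where a syslog-YYYY.MM.dd index should appear once the first event arrives.

```
# Validate logstash.conf syntax only, then exit.
docker run --rm -v /data/elk/logstash/logstash.conf:/etc/logstash.conf \
  logstash:7.2.0 logstash -f /etc/logstash.conf --config.test_and_exit
# List indices on the Docker host.
curl 'http://localhost:9200/_cat/indices?v'
```

From there, open Kibana at http://localhost:5601 and create an index pattern for syslog-* to browse and chart the parsed fields.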
