需求:收集 ubuntu 系統日志,發送給 logstash,再由 logstash 把數據傳遞給 elasticsearch,最后通過 kibana 展示日志數據。
Filebeat System Module
Filebeat Modules 可以簡化常見日志格式的收集、解析和可視化。一個典型的模塊(例如,對於 Nginx 日志)由一個或多個日志文件(fileset)組成(對於 Nginx 來說,默認是 access.log 和 error.log)。這里我們可以使用 Filebeat 的 System Module 完成 ubuntu 的系統日志。
下面介紹配置 System Module 的步驟(假如你已經安裝好了 Filebeat)。
啟用 System Module
Filebeat 支持的模塊默認都是未啟用的,我們可以通過下面的方式啟用模塊。找到 filebeat 程序,執行 moudles enable 命令:
$ sudo ./filebeat modules enable system
上面的命令啟用了 system 模塊,用下面的命令可以查看當前已經啟用的模塊有哪些:
$ sudo ./filebeat modules list
把數據發送給 logstash
配置 Filebeat 將日志行發送到 Logstash。要做到這一點,在配置文件 filebeat.yml 中禁用 Elasticsearch 輸出,並啟用 Logstash 輸出:
#output.elasticsearch: #hosts: ["xxx.xxx.xxx.xxx:9200"] output.logstash: hosts: ["xxx.xxx.xxx.xxx:5044"]
重啟 filebeat 服務
$ sudo systemctl restart filebeat.service
配置 Logstash 處理數據
要讓 logstash 接受 Filebeat System Module 發送來的數據還是有些難度的,至少我們需要一個看上去有點復雜的配置:
input { beats { port => 5064 host => "0.0.0.0" } } filter { if [fileset][module] == "system" { if [fileset][name] == "auth" { grok { match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] } pattern_definitions => { "GREEDYMULTILINE"=> "(.|\n)*" } remove_field => "message" } date { match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } geoip { source => "[system][auth][ssh][ip]" target => "[system][auth][ssh][geoip]" } } else if [fileset][name] == "syslog" { grok { match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] } pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" } remove_field => "message" } date { match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } } } output { elasticsearch { hosts => xxx.xxx.xxx.xxx manage_template => false index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" } }
處理時區問題
看到這段配置我多么希望它能夠直接工作啊!不幸的是它並不能很好的工作,至少在我的 ubuntu 18.04 上不行。問題的核心是無論 auth.log 還是 syslog,記錄的都是本地時區的區時:
而上面的配置中把這些時間都當成 UTC 時間來處理了。搞清楚了原因,糾正起來就很容易了,在 date 插件中添加本地的時區信息就可以了。比如筆者所在時區為東八區,那么就分別在兩個 date 的配置中添加下面的信息:
timezone => "Asia/Chongqing"
讓獨立的 pipeline 處理該數據流
下面創建一個新的目錄 /etc/logstash/myconf.d,並在 /etc/logstash/myconf.d 目錄下創建 Logstash 配置文件 krtest.conf。然后在 /etc/logstash/pipelines.yml 文件中添加新的 pipeline 配置:
- pipeline.id: main
path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: krtest
path.config: "/etc/logstash/myconf.d/krtest.conf"
其中 pipeline.id 為 main 的管道是默認的配置,我們新添加了 id 為 krtest 的管道並指定了對應的配置文件路徑。把上面的配置寫入到 /etc/logstash/myconf.d/krtest.conf 文件中。然后重啟 logstash 服務:
$ sudo systemctl restart logstash.service
在 Kibana 中查看日志
最后在 kibana 中添加 filebeat 開頭的 index pattern,就可以通過圖形界面查看 ubuntu 的系統日志了:
參考:
Filebeat Modules
System module
Working with Filebeat Modules