Logstash:處理多個input


我們知道Logstash的架構如下:

  它的整個pipleline分為三個部分:
    input插件:提取數據。 這可以來自日志文件,TCP或UDP偵聽器,若干協議特定插件(如syslog或IRC)之一,甚至是排隊系統(如Redis,AQMP或Kafka)。 此階段使用圍繞事件來源的元數據標記傳入事件。
    filter 插件:插件轉換並豐富數據
    output插件: 將已處理的事件加載到其他內容中,例如ElasticSearch或其他文檔數據庫,或排隊系統,如Redis,AQMP或Kafka。 它還可以配置為與API通信。 也可以將像PagerDuty這樣的東西連接到Logstash輸出。

這里的input可以支持多個input,同時多個worker可以處理filter及output:

  在今天的介紹中,我們來介紹一下如何使用多個input。

應用文件

為了說明問題的方便,我把所需要用到的文件都傳到github地址https://github.com/liu-xiao-guo/logstash_multi-input。我們可以通過如下的方式來下載這些文件:

git clone https://github.com/liu-xiao-guo/logstash_multi-input

Logstash配置文件

Logstash的配置文件如下:

multi-input.conf

input {
  file {
    path => "/Users/liuxg/data/multi-input/apache.log"
      start_position => "beginning"
    sincedb_path => "/dev/null"
    # ignore_older => 100000
    type => "apache"
  }
}
 
input {
  file {
    path => "/Users/liuxg/data/multi-input/apache-daily-access.log"
      start_position => "beginning"
    sincedb_path => "/dev/null"
    type => "daily"
  }
}
 
filter {
      grok {
        match => {
              "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
        }
      }
 
    if[type] == "apache" {
        mutate {
              add_tag => ["apache"]
          }
    }
 
    if [type] == "daily" {
        mutate {
            add_tag => ["daily"]
        }
    } 
}
 
 
output {
    stdout {
        codec => rubydebug
    }
 
    if "apache" in [tags] {
          elasticsearch {
            index => "apache_log"
            template => "/Users/liuxg/data/apache_template.json"
            template_name => "apache_elastic_example"
            template_overwrite => true
      }    
    }
 
    if "daily" in [tags] {
          elasticsearch {
            index => "apache_daily"
            template => "/Users/liuxg/data/apache_template.json"
            template_name => "apache_elastic_example"
            template_overwrite => true
      }    
    }    
}

為了說明問題的方便,我們使用了兩個input。它們分別對應不同的log文件。對於這兩個input,我們也使用了不同的type來表示:apache和daily。盡管它們的格式是一樣的,它們共同使用同樣的一個grok filter,但是我們還是想分別對它們進行處理。為此,我們添加了一個tag。我們也可以添加一個field來進行區別。在output的部分,我們根據在filter部分設置的tag來對它們輸出到不同的index里。

運行Logstash

$ pwd
/Users/liuxg/elastic/logstash-7.3.0
bogon:logstash-7.3.0 liuxg$ sudo ./bin/logstash -f ~/data/multi-input/multi-input.conf

當你們運行這個例子的時候,你們需要根據自己存放multi-input.conf文件的位置改變而改變上面的命令。

運行的結果如下:

  根據顯示的結果可以看出來daily的事件最早被處理及輸出。接着apache的數據才開始處理。在實際的應用中,我們可能有不同的數據源,比如來自其它beats的監聽某個端口的數據。

我們可以在Kibana中看到我們最終的index數據:

轉載自:https://blog.csdn.net/UbuntuTouch/java/article/details/100980709


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM