Elastic Stack初篇-Logstash


 一、Logstash簡介

     Logstash是一個開源數據收集引擎,具有實時管道功能。Logstash可以動態地將來自不同數據源的數據統一起來,並將數據標准化到你所選擇的目的地。

     

二、Logstash處理流程

   Logstash處理流程大致可分為3個階段,Input---->Filter---->Output,用中文描述一下分別是,數據采集----->數據分析/解析---->數據輸出;具體的處理流程可以查看下圖,下面的一些函數和一些概念等后面我們在具體講講:

   

  這里我們解釋一下codec的概念,Codec 是 logstash 從 1.3.0 版開始新引入的概念(Codec 來自 Coder/decoder 兩個單詞的首字母縮寫)。在此之前,logstash 只支持純文本形式輸入,然后以過濾器處理它。但現在,我們可以在輸入 期處理不同類型的數據,這全是因為有了 codec 設置。所以,這里需要糾正之前的一個概念。Logstash 不只是一個input | filter | output 的數據流,而是一個 input | decode | filter | encode | output 的數據流!codec 就是用來 decode、encode 事件的。

   Logstash Event也需要介紹,類似Java的Object的對象,在這個過程我們可以對其進行一些賦值等操作;

三、Logstash架構介紹

 上面介紹一下Logstash的數據流向,接下來我們介紹下Logstash的架構,當然我介紹的是6.X的架構,看下圖

 

 

  由上圖得知,我們可以有多個輸入的文本,另外由Queue分發到不同的Pipline中,這里的Pipline中文的意思就是管道,在程序中我們可以理解為線程,可以有多個Pipline,並且每個Pipline之間是互不打擾的,另外每個Batcher,Filter和Output組成,Batcher是批量從Queue獲取數據的,這個值可以通過配置進行設置;

  Pipline配置如下:

  pipeline.workers :  8  Pipeline線程數,及filter_output的處理線程數,默認是CPU的核數,命令行參數為-w

  pipeline.batch.size : 125  Batcher一次批量獲取的待處理文檔數,默認125(建議向es輸出的時候10-20Mb之間,可以計算一下文檔數),可以根據輸出進行調整,越大會占用越多的heap空間,可以通過jvm.options調整。命令行參數-b

  pipeline.batch.delay :5  :Batcher的等待時長,單位為ms。命令行參數-u

  接下來我們在了解下Queue設計思路,使用Queue以后我們首先要關注的是怎么確保數據是正常被消費掉了,這里到達QutPut以后會有發送一個ACK給Queue來告訴Queue這些Logstash Event已經處理完了,這樣就能確保一個數據正常被消費掉,這也是在消息隊列中我們常用的一種手段,接下來我們再聊聊Queue分類:

  1.In Memory(在內存)

  在內存中,固定大小,無法處理進程crash、機器宕機等情況,會導致數據丟失。

  2.Persistent Queue In Disk (持久化到磁盤)

  可處理進程crash情況,保證數據不丟失。保證數據至少消費一次;充當緩沖區,可代替kafka等消息隊列作用。

  接下來我們來了解下持久隊列是怎么保證的?這里我們從數據到隊列以后的處理開始說起,首先隊列將數據備份到disk(磁盤),隊列返回響應給Input,最后數據經過OutPut以后返回ACK給隊列,當隊列接收到消息以后開始刪除disk中備份的數據,這樣就能保證數據持久性;

  性能方面對比如下圖,基本性能是無差別的:

  

  隊列主要配置如下:

  queue.type: persisted 默認是memory

  queue.max_bytes: 4gb 默認是1gb

四、Logstash配置介紹

   我們會使用的配置文件在config目錄錄下面,logstash.yml和jvm.options,另外6.0以上的版本會有pipelines.yml,這個文件是為了在同一個進程中運行多個管道,具體可以參考下官方,接下來我們主要介紹下參數的配置logstash.yml配置:

   node.name:節點名,默認是主機名;

   path.data:持續化存儲數據的文件夾,默認是在Logstash home目錄下的data

   path.config:設定Pipeline配置文件的目錄

   path.log:設置Pipeline日志目錄

   第三小節介紹的Queue和Pipeline都可以在該文件下設置,這里不做過多的介紹,另外更細節的一些參數配置大家可以參考官方

   jvm.options這個里面的配置大家可以自己根據自己機器的情況進行下JVM參數調優;

   另外還有Pipeline配置文件,定義數據處理流程的文件,以.conf結尾

五、Pipeline配置

  布爾類型 Boolean:isFailed => true

  數值類型 Number:age => 33

  字符串類型 String: name => “Hello”

  數組類型:users => [{age=>11, name=>wtz}, {age => 22, name => myt}] 或者 path => [“/var/log/error,log”,”/var/log/warn.log”]

  哈希類型:match => {“field” => “value1”  “field”=>”value2”}

  備注:#

  在配置中可以引用Logstash Event的屬性(字段),主要由下面兩種方式:

  1.直接引用字段值 – 直接使用[],如果是多層直接嵌套

  

  2.在字符串中以 sprintf 方式引用 – 使用%{}來實現

  

 配置文件支持條件判斷語法:

  

 表達式操作如下:

 1.比較:== != < > <= >=

 2.正則是否匹配:=~ =

 3.包含(字符串或者數組):in, not in

 4.布爾操作符:and or nand xor !

 5.分組操作符(條件非常復雜的時候,其實就相當於括號):()

六、插件詳解

 第二部分的時候我們介紹了Logstash的數據處理流程,涉及到一些插件流程,接下來我們介紹下插件的配置,也是分成3部分介紹:

 input插件介紹:

 1.stdin

 最簡單的輸入,從標准輸入讀取數據

 2.file

 從文件讀取數據,參數介紹如下:

 3.kafka

 這個是我們使用的配置,供大家參考下,另外大家可以參考官方文檔

input {
    kafka {
        bootstrap_servers => "服務地址"
        group_id => "消費組id"
        topics => ["訂閱的主題列表","訂閱的主題列表"]
        codec => "json"
        auto_offset_reset => "Kafka偏移量,超出設置為最早偏移量"
        consumer_threads => "線程數"
    }
}
View Code

 另外大家還可以看下官方文檔選擇自己合適的使用;

 Filter插件介紹

 1.grok

 解析和構造任意文本,Grok是目前Logstash中解析非結構化日志數據到結構化和可查詢數據的最佳方式,使用內置的120種模式;

 另外可以閱讀下這篇文章你真的理解grok嗎

 2.mutate

 對事件字段執行一般的轉換,你可以重命名、刪除、替換和修改事件中的字段。

 3.drop

 完全刪除事件,例如調試事件。

 4.clone

 復制事件,可能添加或刪除字段。

 5.geoip

 添加關於IP地址地理位置的信息

 更多的大家可以閱讀官方文檔

 OutPut插件介紹

 最常用的就是輸出到es,配置如下

elasticsearch {
      hosts => "地址"
      index => "索引"
      document_type => "文檔類型"
      template_overwrite => 是否重寫
}
View Code

 Codec插件介紹

 1.plain 讀取原始內容

 2.rubydebug將Logstash Event按照ruby格式輸出,方便調試

 3.line處理帶有換行符的內容

 4.json處理json格式的內容

 5.multiline處理多行數據的內容

七、最后說點什么

 參考與學習的來源為慕課網和官方文檔,歡迎大家加入我QQ群:438836709

 歡迎大家關注我公眾號:

 

 另外Oath2.0的文章和demo正在努力寫!!沒事大家動動小手點個贊!!謝謝!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM