一、Logstash簡介
Logstash是一個開源數據收集引擎,具有實時管道功能。Logstash可以動態地將來自不同數據源的數據統一起來,並將數據標准化到你所選擇的目的地。
二、Logstash處理流程
Logstash處理流程大致可分為3個階段,Input---->Filter---->Output,用中文描述一下分別是,數據采集----->數據分析/解析---->數據輸出;具體的處理流程可以查看下圖,下面的一些函數和一些概念等后面我們在具體講講:
這里我們解釋一下codec的概念,Codec 是 logstash 從 1.3.0 版開始新引入的概念(Codec 來自 Coder/decoder 兩個單詞的首字母縮寫)。在此之前,logstash 只支持純文本形式輸入,然后以過濾器處理它。但現在,我們可以在輸入 期處理不同類型的數據,這全是因為有了 codec 設置。所以,這里需要糾正之前的一個概念。Logstash 不只是一個input | filter | output
的數據流,而是一個 input | decode | filter | encode | output
的數據流!codec 就是用來 decode、encode 事件的。
Logstash Event也需要介紹,類似Java的Object的對象,在這個過程我們可以對其進行一些賦值等操作;
三、Logstash架構介紹
上面介紹一下Logstash的數據流向,接下來我們介紹下Logstash的架構,當然我介紹的是6.X的架構,看下圖
由上圖得知,我們可以有多個輸入的文本,另外由Queue分發到不同的Pipline中,這里的Pipline中文的意思就是管道,在程序中我們可以理解為線程,可以有多個Pipline,並且每個Pipline之間是互不打擾的,另外每個Batcher,Filter和Output組成,Batcher是批量從Queue獲取數據的,這個值可以通過配置進行設置;
Pipline配置如下:
pipeline.workers : 8 Pipeline線程數,及filter_output的處理線程數,默認是CPU的核數,命令行參數為-w
pipeline.batch.size : 125 Batcher一次批量獲取的待處理文檔數,默認125(建議向es輸出的時候10-20Mb之間,可以計算一下文檔數),可以根據輸出進行調整,越大會占用越多的heap空間,可以通過jvm.options調整。命令行參數-b
pipeline.batch.delay :5 :Batcher的等待時長,單位為ms。命令行參數-u
接下來我們在了解下Queue設計思路,使用Queue以后我們首先要關注的是怎么確保數據是正常被消費掉了,這里到達QutPut以后會有發送一個ACK給Queue來告訴Queue這些Logstash Event已經處理完了,這樣就能確保一個數據正常被消費掉,這也是在消息隊列中我們常用的一種手段,接下來我們再聊聊Queue分類:
1.In Memory(在內存)
在內存中,固定大小,無法處理進程crash、機器宕機等情況,會導致數據丟失。
2.Persistent Queue In Disk (持久化到磁盤)
可處理進程crash情況,保證數據不丟失。保證數據至少消費一次;充當緩沖區,可代替kafka等消息隊列作用。
接下來我們來了解下持久隊列是怎么保證的?這里我們從數據到隊列以后的處理開始說起,首先隊列將數據備份到disk(磁盤),隊列返回響應給Input,最后數據經過OutPut以后返回ACK給隊列,當隊列接收到消息以后開始刪除disk中備份的數據,這樣就能保證數據持久性;
性能方面對比如下圖,基本性能是無差別的:
隊列主要配置如下:
queue.type: persisted 默認是memory
queue.max_bytes: 4gb 默認是1gb
四、Logstash配置介紹
我們會使用的配置文件在config目錄錄下面,logstash.yml和
官方,接下來我們主要介紹下參數的配置logstash.yml配置:jvm.options,另外6.0以上的版本會有
pipelines.yml,這個文件是為了在同一個進程中運行多個管道,具體可以參考下
node.name:節點名,默認是主機名;
path.data:持續化存儲數據的文件夾,默認是在Logstash home目錄下的data
path.config:設定Pipeline配置文件的目錄
path.log:設置Pipeline日志目錄
第三小節介紹的Queue和Pipeline都可以在該文件下設置,這里不做過多的介紹,另外更細節的一些參數配置大家可以參考官方;
jvm.options這個里面的配置大家可以自己根據自己機器的情況進行下JVM參數調優;
另外還有Pipeline配置文件,定義數據處理流程的文件,以.conf結尾
五、Pipeline配置
布爾類型 Boolean:isFailed => true
數值類型 Number:age => 33
字符串類型 String: name => “Hello”
數組類型:users => [{age=>11, name=>wtz}, {age => 22, name => myt}] 或者 path => [“/var/log/error,log”,”/var/log/warn.log”]
哈希類型:match => {“field” => “value1” “field”=>”value2”}
備注:#
在配置中可以引用Logstash Event的屬性(字段),主要由下面兩種方式:
1.直接引用字段值 – 直接使用[],如果是多層直接嵌套
2.在字符串中以 sprintf 方式引用 – 使用%{}來實現
配置文件支持條件判斷語法:
表達式操作如下:
1.比較:== != < > <= >=
2.正則是否匹配:=~ =
3.包含(字符串或者數組):in, not in
4.布爾操作符:and or nand xor !
5.分組操作符(條件非常復雜的時候,其實就相當於括號):()
六、插件詳解
第二部分的時候我們介紹了Logstash的數據處理流程,涉及到一些插件流程,接下來我們介紹下插件的配置,也是分成3部分介紹:
input插件介紹:
1.stdin
最簡單的輸入,從標准輸入讀取數據
2.file
從文件讀取數據,參數介紹如下:
3.kafka
這個是我們使用的配置,供大家參考下,另外大家可以參考官方文檔

input { kafka { bootstrap_servers => "服務地址" group_id => "消費組id" topics => ["訂閱的主題列表","訂閱的主題列表"] codec => "json" auto_offset_reset => "Kafka偏移量,超出設置為最早偏移量" consumer_threads => "線程數" } }
另外大家還可以看下官方文檔選擇自己合適的使用;
Filter插件介紹
1.grok
解析和構造任意文本,Grok是目前Logstash中解析非結構化日志數據到結構化和可查詢數據的最佳方式,使用內置的120種模式;
另外可以閱讀下這篇文章你真的理解grok嗎
2.mutate
對事件字段執行一般的轉換,你可以重命名、刪除、替換和修改事件中的字段。
3.drop
完全刪除事件,例如調試事件。
4.clone
復制事件,可能添加或刪除字段。
5.geoip
添加關於IP地址地理位置的信息
更多的大家可以閱讀官方文檔
OutPut插件介紹
最常用的就是輸出到es,配置如下

elasticsearch { hosts => "地址" index => "索引" document_type => "文檔類型" template_overwrite => 是否重寫 }
Codec插件介紹
1.plain 讀取原始內容
2.rubydebug將Logstash Event按照ruby格式輸出,方便調試
3.line處理帶有換行符的內容
4.json處理json格式的內容
5.multiline處理多行數據的內容
七、最后說點什么
參考與學習的來源為慕課網和官方文檔,歡迎大家加入我QQ群:438836709
歡迎大家關注我公眾號:
另外Oath2.0的文章和demo正在努力寫!!沒事大家動動小手點個贊!!謝謝!!