原創/朱季謙
最近在做一個將分布式系統的日志數據通過logstash傳到kafka的功能,做完之后決定業余搭一個ELK日志分析系統,將logstash采集到的日志傳給Elasticsearch。經過一番搗鼓,也把這個過程給走通了,於是寫了這篇總結,可按照以下步驟搭建logstash采集spring日志數據並傳輸給Elasticsearch。
首先,logstash是一個開源的數據收集引擎,能夠同時從多個來源采集到數據,並將數據轉發到想存儲的“庫”中。例如,logstash可以采集數據轉發存儲到
Elasticsearch,也可以轉發到kafka等消息中間件里。logstash擁有強大的插件生態,包括基本的http、tcp、udp、file以及kafa、redis等等。這些插件,在logstash5.x版本以上的,是已經自帶了,不需要額外安裝。
在基於ssm的開發過程中,運行Spring項目時,可以在控制台上看到log的日志打印信息,我們可以把這些日志信息的ERROR日志進行監聽和轉發存儲。
如何實現logstash來監聽Spring項目的日志並將ERROR數據進行轉發存儲呢?
部署架構圖:

可以按照以下流程來實現:
1.下載logstash。
根據以下地址來網盤獲取logstash-5.5.2版本的:
鏈接:https://pan.baidu.com/s/1h7xo65P7_O76Azt0-I-2-A
提取碼:95vg
2.安裝logstash
直接把壓縮包壓縮到本地某個盤里就可以了,不需要做額外操作了,5.x以上版本的logstash是不需要安裝其他插件,已經自帶大部分插件。
3.驗證是否安裝成功
運行cmd,進入logstash的bin目錄下,運行指令:logstash -e 'input{stdin{}} output{stdout{}}'
運行成功的截圖如下,即為安裝並啟動成功:

啟動以后,在光標處輸入:hellowrold

正常情況下,會顯示以下信息,證明logstash可以正常使用了。
在這個過程里,涉及到幾個概念,logstash是一個管道,里面有兩個input和output的必選元素,即輸入與輸出,之間還可以有一個可選的過濾器filter過濾器。input插件從源頭獲取到數據,過濾器會根據條件來進行修改,最后通過ouput插件將數據傳輸,可輸出給Elasticsearch、kafka、file
等。
處理過程模型圖如下:

Logstash 提供了一個 shell 腳本叫 logstash,
支持以下運行參數:
執行命令: -e 執行-e后面的參數
:logstash -e 'input{stdin{}} output{stdout{}}'
執行文件: --config 或 -f 執行-f后面的conf文件:
logstash -f logstash.conf
輸入插件:input{ … }
過濾插件:filter{ … }
輸出插件:output{ … }
測試配置文件是否正確,然后退出:-t
在這篇文章里,主要用到以上這些命令,其余讀者若感興趣可以自行去研究探索。
4.配置一個文件**.conf
可以在bin目錄或者config目錄或者其他目錄下,新建一個**.conf文件,我選擇的是bin目錄下,新建文件logstash.conf,截圖如下:

在logstash.conf文件里配置信息:
1 input { stdin { } }#該行可有可無,寫來打印測試而已 2 input { 3 #開啟tcp插件的監聽 4 tcp { 5 #這個需要配置成本機IP,不然logstash無法啟動 6 host => "127.0.0.1" 7 #端口號 8 port => 9600 9 #將日志以json格式輸入 10 codec => json_lines 11 } 12 } 13 14 output { 15 #輸出打印 16 stdout { codec => rubydebug } 17 }
配置好,就可以先啟動進行監聽了,啟動命令:先cd進到存放logstash.conf的目錄下,我的目錄在bin里,所以進入的是bin目錄,執行:logstash -f logstash.conf。
5.在spring進行logstash配置的maven依賴引入
我在項目里用到的開源日志組件是
logback,
它是log4j的改良,主要分為以下三個模塊:
logback-classic:log4j的一個改良版本,完整實現了slf4j API,可以方便更換成其它日志系統如log4j或JDK14 Logging。
logback-access:訪問模塊與Servlet容器集成提供通過Http來訪問日志的功能。
logback-core:是其它兩個模塊的基礎模塊。
logback需要在maven里引用到的依賴:
<dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>net.logstash.log4j</groupId> <artifactId>jsonevent-layout</artifactId> <version>1.6</version> </dependency> <dependency> <groupId>net.logstash.logback</groupId> <artifactId>logstash-logback-encoder</artifactId> <version>5.0</version> </dependency>
如果引用到的ch.qos.logback依賴版本太低的話,可能會出現以下錯誤:java.lang.NoSuchMethodError: ch.qos.logback.core.util.Loader.getResourceOccurrenceCount(Ljava/lang/String;Ljava/lang/ClassLoader;)Ljava/util/Set;可以根據項目需求來選擇合適的版本,經過測試,以上的1.2.3版本是可以符合要求的。配置完成依賴后,就可以開始進行下一步配置。
6.在spring的logback.xml里進行logstash配置(省略logback其余無關該流程的部分)
1 <!--開啟tcp格式的logstash傳輸,通過TCP協議連接Logstash--> 2 <appender name="STASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender"> 3 <destination>127.0.0.1:9600</destination> 4 5 <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"> 6 <jsonFactoryDecorator class="net.logstash.logback.decorate.CharacterEscapesJsonFactoryDecorator"> 7 <escape> 8 <targetCharacterCode>10</targetCharacterCode> 9 <escapeSequence>\u2028</escapeSequence> 10 </escape> 11 </jsonFactoryDecorator> 12 <providers> 13 <pattern> 14 <pattern> 15 { 16 "timestamp":"%date{ISO8601}", 17 "user":"test", 18 "message":"[%d{yyyy-MM-dd HH:mm:ss.SSS}][%p][%t][%l{80}|%L]%m"}%n 19 } 20 </pattern> 21 </pattern> 22 </providers> 23 </encoder> 24 <keepAliveDuration>5 minutes</keepAliveDuration> 25 </appender> 26 27 <root level="INFO"> 28 <appender-ref ref="STASH"/> 29 </root>
配置說明:
encoder:配置的規范;
LoggingEventCompositeJsonEncoder:json格式的編碼器,即將日志數據轉換成json格式;
jsonFactoryDecorator:解決中文轉碼的問題;
providers:json格式提供者,對json進行一個定制化設置,比如,timestamp,message,thread_name等,其他的自定義的字段的值可以通過MDC設置進來,格式就是%date{xx},
注意:按照上面的設置,logstash才可以正常接收到日志數據,否則是無法接收到的。
destination定義的ip與端口與logstash里的logstash.conf需一直,logstash.conf里的tcp會一直監聽這個ip的端口:

配置完成后,啟動spring項目,這時原來監聽tcp的logstash就可以實時監聽接收到了數據,logstash的控制台顯示打印如下:

若要監聽到的是ERROR級別的日志,在logback.xml里的logstash配置里的appender里添加一行以下代碼即可:
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>ERROR</level> </filter>
在日志級別修改為以下即可:
1 <root level="INFO"> 2 <appender-ref ref="STASH"/> 3 <appender-ref ref="ERROR"/> 4 </root>
7.到這一步,就完成了通過logstash收集spring的logback日志的功能,在這個基礎上,可以再進一步擴展,擴展將logstash采集到的數據輸出到Elasticsearch。
1 input { stdin { } }#該行可有可無 2 input { 3 #開啟tcp模式的監聽 4 tcp { 5 #這個需要配置成本機IP,不然logstash無法啟動 6 host => "127.0.0.1" 7 #端口號 8 port => 9600 9 #將日志以json格式輸入 10 codec => json_lines 11 } 12 } 13 14 output { 15 #輸出打印 16 stdout { codec => rubydebug } 17 elasticsearch { hosts => ["127.0.0.1:9200"] } 18 }
打開已經本地安裝的Elasticsearch:http://127.0.0.1:9100/,可以看到,ES可以接受到logstash接收到的數據了: