前言
在平時工作中,我們需要對相關日志進行分析,隨着平台的允許,日志會越來越大,不便於分析,此時我們需要將日志寫入es,在這個過程中logstash起到中間轉發的作用,類似於ETL工具。
1、搭建EL環境(此處沒有使用Kibana)
(1)、安裝es(5.6.16)
下載地址:https://elasticsearch.cn/download/
安裝步驟:https://www.cnblogs.com/cq-yangzhou/p/9310431.html
(2)、安裝logstash
下載地址:https://elasticsearch.cn/download/
安裝步驟:解壓即可。
(3)、安裝IK分詞器
下載地址:https://github.com/medcl/elasticsearch-analysis-ik/releases
安裝步驟:解壓,將里面的內容拷貝到es的plugins/ik(ik目錄自己創建)目錄下面,重啟es即可
2、搭建springboot+logstash環境
(1)、引入logstash的mven依賴
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.1</version>
</dependency>
(2)、編寫logback-spring.xml放在resources目錄下面
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<!--日志寫入logstash-->
<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>127.0.0.1:4567</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
<!--監控指定的日志類-->
<logger name="com.example.demo.logutil.LogStashUtil" level="INFO">
<appender-ref ref="logstash"/>
</logger>
</configuration>
(3)、編寫日志類LogStashUtil
@Slf4j public class LogStashUtil { public static void sendMessage(String username, String type, String content, Date createTime,String parameters){ JSONObject jsonObject = new JSONObject(); jsonObject.putOpt("username",username); jsonObject.putOpt("type",type); jsonObject.putOpt("content",content); jsonObject.putOpt("parameters",parameters); jsonObject.putOpt("createTime", createTime); log.info(jsonObject.toString()); } }
(4)、編寫logstash對應的日志收集配置文件
input { tcp { mode => "server" host => "0.0.0.0" port => 4567 codec => json{ charset=>"UTF-8" } } } filter { json { source => "message" #移除的字段,不會存入es remove_field => ["message","port","thread_name","logger_name","@version","level_value","tags"] } date { match => [ "createTime", "UNIX_MS" ] target => "@timestamp" } ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] } date { match => [ "createTime", "UNIX_MS" ] target => "createTime" } ruby { code => "event.set('createTime', event.get('createTime').time.localtime + 8*60*60)" #時間加8個小時 } } output { elasticsearch { hosts => "localhost:9200" index => "springboot-logstash-%{+YYYY.MM}" document_type => access
#關閉模板管理,使用es通過API創建的模板
manage_template => false
#es中模板的名稱
template_name => "message"
} }
補充:模板編寫
{ "template": "springboot-logstash_*", "settings": { "index.number_of_shards": 5, "number_of_replicas": 0 }, #指定ik分詞 "analysis":{ "analyzer":{ "ik":{ "tokenizer":"ik_max_word" } } }, "mappings": { "_default_": { "_all": { "enabled": true, "omit_norms": true }, ##指定字段type需要進行ik分詞 ik_max_word 最大粒度分詞,ik_smart粗粒度分詞 "dynamic_templates": [{ "message_field": { "match": "type", "match_mapping_type": "string", "mapping": { "type": "string", "index": "analyzed", "analyzer":"ik_max_word" } } }, { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "string", "index": "not_analyzed", "doc_values": true } } }], "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "string", "index": "not_analyzed" } /*, "geoip": { "dynamic": true, "properties": { "ip": { "type": "ip" }, "location": { "type": "geo_point" }, "latitude": { "type": "float" }, "longitude": { "type": "float" } } }*/ } } } }
模板創建好之后,調用es的創建模板接口,傳入參數進行創建
#創建模板(覆蓋模板) PUT _template/template_name #查看模板 GET _template/template_name #刪除模板 DELETE _template/template_name
栗子:

(5)、啟動es和logstash
1、啟動es 進入es的bin目錄 elasticsearch -d p pid
(-p:在文件中記錄進程id)
2、啟動logstash logstash -f message.conf(指定對應的日志配置文件路徑) -d(后台運行)
(6)、單元測試,調用日志收集類LogStashUtil
@Test public void contextLoads() { LogStashUtil.sendMessage( "admin","添加年度管理計划","測試",new Date(),"測試"); }
(7)、查看es中的結果

