SpringBoot+kafka+ELK分布式日志收集


一、背景

隨着業務復雜度的提升以及微服務的興起,傳統單一項目會被按照業務規則進行垂直拆分,另外為了防止單點故障我們也會將重要的服務模塊進行集群部署,通過負載均衡進行服務的調用。那么隨着節點的增多,各個服務的日志也會散落在各個服務器上。這對於我們進行日志分析帶來了巨大的挑戰,總不能一台一台的登錄去下載日志吧。那么我們需要一種收集日志的工具將散落在各個服務器節點上的日志收集起來,進行統一的查詢及管理統計。那么ELK就可以做到這一點。

ELK是ElasticSearch+Logstash+Kibana的簡稱,在這里我分別對如上幾個組件做個簡單的介紹:

1.1、ElasticSearch(簡稱ES)

Elasticsearch是一個高度可擴展的開源全文搜索和分析引擎。它允許您快速、實時地存儲、搜索和分析大量數據。它通常用作底層引擎/技術,為具有復雜搜索特性和需求的應用程序提供動力。我們可以借助如ElasticSearch完成諸如搜索,日志收集,反向搜索,智能分析等功能。ES設計的目標:

  • 快速實時搜索

Elasticsearch是一個實時搜索平台。這意味着,從索引文檔到可搜索文檔,存在輕微的延遲(通常為一秒)。

  • 集群

集群是一個或多個節點(服務器)的集合,這些節點(服務器)一起保存整個數據,並提供跨所有節點的聯合索引和搜索功能。集群由一個惟一的名稱來標識,默認情況下該名稱為“elasticsearch”。這個名稱很重要,因為節點只能是集群的一部分,如果節點被設置為通過其名稱加入集群的話。確保不要在不同的環境中重用相同的集群名稱,否則可能會導致節點加入錯誤的集群。例如,您可以使用logging-dev、logging-test和logging-prod開發、測試和生產集群。

  • 節點

節點是單個服務器,它是集群的一部分,它用來存儲數據,並參與集群的索引和搜索功能。與集群一樣,節點的名稱默認為在啟動時分配給節點的隨機惟一標識符(UUID)。如果不需要默認值,可以定義任何節點名稱。這個名稱對於管理非常重要,因為您想要確定網絡中的哪些服務器對應於Elasticsearch集群中的哪些節點。

  • 索引

索引是具有類似特征的文檔的集合。例如,您可以有一個客戶數據索引、另一個產品目錄索引和另一個訂單數據索引。索引由一個名稱標識(必須是小寫的),該名稱用於在對其中的文檔執行索引、搜索、更新和刪除操作時引用索引。在單個集群中,可以定義任意數量的索引。

  • 文檔

文檔是可以建立索引的基本信息單元。例如,可以為單個客戶提供一個文檔,為單個產品提供一個文檔,為單個訂單提供另一個文檔。這個文檔用JSON (JavaScript對象符號)表示。在索引中,可以存儲任意數量的文檔。請注意,盡管文檔在物理上駐留在索引中,但實際上文檔必須被索引/分配到索引中的類型中。

1.2、Logstash

Logstash是一個開源數據收集引擎,具有實時流水線功能。Logstash可以動態地將來自不同數據源的數據統一起來,並將數據規范化后(通過Filter過濾)傳輸到您選擇的目標。
basic_logstash_pipeline

在這里inputs代表數據的輸入通道,大家可以簡單理解為來源。常見的可以從kafka,FileBeat, DB等獲取日志數據,這些數據經過fliter過濾后(比如說:日志過濾,json格式解析等)通過outputs傳輸到指定的位置進行存儲(Elasticsearch,Mogodb,Redis等)

簡單的實例:

    cd logstash-6.4.1
    bin/logstash -e 'input { stdin { } } output { stdout {} }'

1.3、Kibana

kibana是用於Elasticsearch檢索數據的開源分析和可視化平台。我們可以使用Kibana搜索、查看或者與存儲在Elasticsearch索引中的數據交互。同時也可以輕松地執行高級數據分析並在各種圖表、表和映射中可視化數據。基於瀏覽器的Kibana界面使您能夠快速創建和共享動態儀表板,實時顯示對Elasticsearch查詢的更改。

1.4、處理方案

_

用戶通過java應用程序的Slf4j寫入日志,SpringBoot默認使用的是logback。我們通過實現自定義的Appender將日志寫入kafka,同時logstash通過input插件操作kafka訂閱其對應的主題。當有日志輸出后被kafka的客戶端logstash所收集,經過相關過濾操作后將日志寫入Elasticsearch,此時用戶可以通過kibana獲取elasticsearch中的日志信息

二、SpringBoot中的配置

在SpringBoot當中,我們可以通過logback-srping.xml來擴展logback的配置。不過我們在此之前應當先添加logback對kafka的依賴,代碼如下:

    compile group: 'com.github.danielwegener', name: 'logback-kafka-appender', version: '0.2.0-RC1'

添加好依賴之后我們需要在類路徑下創建logback-spring.xml的配置文件並做如下配置(添加kafka的Appender):

    <configuration>
        <!-- springProfile用於指定當前激活的環境,如果spring.profile.active的值是哪個,就會激活對應節點下的配置 -->
        <springProfile name="default">
            <!-- configuration to be enabled when the "staging" profile is active -->
            <springProperty scope="context" name="module" source="spring.application.name"
            defaultValue="undefinded"/>
            <!-- 該節點會讀取Environment中配置的值,在這里我們讀取application.yml中的值 -->
            <springProperty scope="context" name="bootstrapServers" source="spring.kafka.bootstrap-servers"
                            defaultValue="localhost:9092"/>
            <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
                <!-- encoders are assigned the type
                     ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
                <encoder>
                    <pattern>%boldYellow(${module}) | %d | %highlight(%-5level)| %cyan(%logger{15}) - %msg %n</pattern>
                </encoder>
            </appender>
            <!-- kafka的appender配置 -->
            <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">
                <encoder>
                    <pattern>${module} | %d | %-5level| %logger{15} - %msg</pattern>
                </encoder>
                <topic>logger-channel</topic>
                <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
                <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
    
                <!-- Optional parameter to use a fixed partition -->
                <!-- <partition>0</partition> -->
    
                <!-- Optional parameter to include log timestamps into the kafka message -->
                <!-- <appendTimestamp>true</appendTimestamp> -->
    
                <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
                <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
                <!-- bootstrap.servers is the only mandatory producerConfig -->
                <producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig>
    
                <!-- 如果kafka不可用則輸出到控制台 -->
                <appender-ref ref="STDOUT"/>
    
            </appender>
            <!-- 指定項目中的logger -->
            <logger name="org.springframework.test" level="INFO" >
                <appender-ref ref="kafka" />
            </logger>
            <root level="info">
                <appender-ref ref="STDOUT" />
            </root>
        </springProfile>
    </configuration>

在這里面我們主要注意以下幾點:

  • 日志輸出的格式是為模塊名 | 時間 | 日志級別 | 類的全名 | 日志內容
  • SpringProfile節點用於指定當前激活的環境,如果spring.profile.active的值是哪個,就會激活對應節點下的配置
  • springProperty可以讀取Environment中的值

三、ELK搭建過程

3.1、檢查環境

ElasticSearch需要jdk8,官方建議我們使用JDK的版本為1.8.0_131,原文如下:

Elasticsearch requires at least Java 8. Specifically as of this writing, it is recommended that you use the Oracle JDK version 1.8.0_131

檢查完畢后,我們可以分別在官網下載對應的組件

3.2、啟動zookeeper

首先進入啟動zookeeper的根目錄下,將conf目錄下的zoo_sample.cfg文件拷貝一份重新命名為zoo.cfg

    mv zoo_sample.cfg zoo.cfg

配置文件如下:

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=../zookeeper-data
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the 
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    

緊接着我們進入bin目錄啟動zookeeper:

    ./zkServer.sh start

3.3、啟動kafka

在kafka根目錄下運行如下命令啟動kafka:

    ./bin/kafka-server-start.sh config/server.properties 

啟動完畢后我們需要創建一個logger-channel主題:

    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logger-channel

3.4、配置並啟動logstash

進入logstash跟目錄下的config目錄,我們將logstash-sample.conf的配置文件拷貝到根目錄下重新命名為core.conf,然后我們打開配置文件進行編輯:

    # Sample Logstash configuration for creating a simple
    # Beats -> Logstash -> Elasticsearch pipeline.
    
    input {
      kafka {
        id => "my_plugin_id"
        bootstrap_servers => "localhost:9092"
        topics => ["logger-channel"]
        auto_offset_reset => "latest" 
      }
    }
    filter {
    
        grok {
          patterns_dir => ["./patterns"]
        	match => { "message" => "%{WORD:module} \| %{LOGBACKTIME:timestamp} \| %{LOGLEVEL:level} \| %{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}" }
      	}
        
        
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
       	hosts =>["localhost:9200"]
      }
    }
    ```

我們分別配置logstash的input,filter和output(懂ruby的童鞋們肯定對語法結構不陌生吧):

- 在input當中我們指定日志來源為kafka,具體含義可以參考官網:[kafka-input-plugin](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html)
- 在filter中我們配置grok插件,該插件可以利用正則分析日志內容,其中patterns_dir屬性用於指定自定義的分析規則,我們可以在該文件下建立文件配置驗證的正則規則。舉例子說明:55.3.244.1 GET /index.html 15824 0.043的 日志內容經過如下配置解析:
```ruby
    grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
      }

解析過后會變成:

    client: 55.3.244.1
    method: GET
    request: /index.html
    bytes: 15824
    duration: 0.043

這些屬性都會在elasticsearch中存為對應的屬性字段。更詳細的介紹請參考官網:grok ,當然該插件已經幫我們定義好了好多種核心規則,我們可以在這里查看所有的規則。

    # yyyy-MM-dd HH:mm:ss,SSS ZZZ eg: 2014-01-09 17:32:25,527
    LOGBACKTIME 20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})
    ```



編輯好配置后我們運行如下命令啟動logstash:
```bash
    bin/logstash -f first-pipeline.conf --config.reload.automatic

該命令會實時更新配置文件而不需啟動

3.5、啟動ElasticSearch

啟動ElasticSearch很簡單,我們可以運行如下命令:

    ./bin/elasticsearch

我們可以發送get請求來判斷啟動成功:

     GET http://localhost:9200

我們可以得到類似於如下的結果:

    {
      "name" : "Cp8oag6",
      "cluster_name" : "elasticsearch",
      "cluster_uuid" : "AT69_T_DTp-1qgIJlatQqA",
      "version" : {
        "number" : "6.4.0",
        "build_flavor" : "default",
        "build_type" : "zip",
        "build_hash" : "f27399d",
        "build_date" : "2016-03-30T09:51:41.449Z",
        "build_snapshot" : false,
        "lucene_version" : "7.4.0",
        "minimum_wire_compatibility_version" : "1.2.3",
        "minimum_index_compatibility_version" : "1.2.3"
      },
      "tagline" : "You Know, for Search"
    }

3.5.1 配置IK分詞器(可選)

我們可以在github上下載elasticsearch的IK分詞器,地址如下:ik分詞器,然后把它解壓至your-es-root/plugins/ik的目錄下,我們可以在{conf}/analysis-ik/config/IKAnalyzer.cfg.xmlor {plugins}/elasticsearch-analysis-ik-*/config/IKAnalyzer.cfg.xml 里配置自定義分詞器:

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
    <properties>
    	<comment>IK Analyzer 擴展配置</comment>
    	<!--用戶可以在這里配置自己的擴展字典 -->
    	<entry key="ext_dict">custom/mydict.dic;custom/single_word_low_freq.dic</entry>
    	 <!--用戶可以在這里配置自己的擴展停止詞字典-->
    	<entry key="ext_stopwords">custom/ext_stopword.dic</entry>
     	<!--用戶可以在這里配置遠程擴展字典 -->
    	<entry key="remote_ext_dict">location</entry>
     	<!--用戶可以在這里配置遠程擴展停止詞字典-->
    	<entry key="remote_ext_stopwords">http://xxx.com/xxx.dic</entry>
    </properties>

首先我們添加索引:

    curl -XPUT http://localhost:9200/my_index

我們可以把通過put請求來添加索引映射:

    PUT my_index 
    {
      "mappings": {
        "doc": { 
          "properties": { 
            "title":    { "type": "text"  }, 
            "name":     { "type": "text"  }, 
            "age":      { "type": "integer" },  
            "created":  {
              "type":   "date", 
              "format": "strict_date_optional_time||epoch_millis"
            }
           "content": {
                    "type": "text",
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_max_word"
                }
          }
        }
      }
    }

其中doc是映射名 my_index是索引名稱

3.5.2 logstash與ElasticSearch

logstash默認情況下會在ES中建立logstash-*的索引,*代表了yyyy-MM-dd的時間格式,根據上述logstash配置filter的示例,其會在ES中建立module ,logmessage,class,level等索引。(具體我們可以根據grok插件進行配置)

3.6 啟動Kibana

在kibana的bin目錄下運行./kibana即可啟動。啟動之后我們可以通過瀏覽器訪問http://localhost:5601 來訪問kibanaUI。我們可以看到如下界面:
768E3949_E435_4822_A7FF_4B07BBD6D7DB


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM