Kafka、Logstash、Nginx日志收集入門


Nginx作為網站的第一入口,其日志記錄了除用戶相關的信息之外,還記錄了整個網站系統的性能,對其進行性能排查是優化網站性能的一大關鍵。
Logstash是一個接收,處理,轉發日志的工具。支持系統日志,webserver日志,錯誤日志,應用日志,總之包括所有可以拋出來的日志類型。一般情景下,Logstash用來和ElasticSearch和Kibana搭配使用,簡稱ELK,本站http://www.wenzhihuai.com除了用作ELK,還配合了Kafka進行使用。它使用JRuby編寫,開源,主流,免費,使用簡單。
kafka是一個分布式的基於push-subscribe的消息系統,它具備快速、可擴展、可持久化的特點。它現在是Apache旗下的一個開源系統,作為hadoop生態系統的一部分,被各種商業公司廣泛應用。它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/spark流式處理引擎。

下面是本站日志系統的搭建

一、Nginx日志

為了配合ELK的使用,把日志變成json的格式,方便ElasticSearch對其檢索。

    log_format main '{"@timestamp":"$time_iso8601",'
      '"host": "$server_addr",'
      '"clientip": "$remote_addr",'
      '"size": $body_bytes_sent,'
      '"responsetime": $request_time,'
      '"upstreamtime": "$upstream_response_time",'
      '"upstreamhost": "$upstream_addr",'
      '"http_host": "$host",'
      '"url": "$uri",'
      '"xff": "$http_x_forwarded_for",'
      '"referer": "$http_referer",'
      '"agent": "$http_user_agent",'
      '"status": "$status"}';

    access_log  logs/access.log  main;

然后執行nginx -t檢驗配置,nginx -s reload重啟nginx即可。
注意:
1.這里的單引號用來標識不換行使用的,如果沒有的話,Logstash會每一行都發送一次。
2.格式一定一定要規范。

二、Logstash

下載安裝的具體請看Logstash官網,這里只講講如何配置

![](http://image.wenzhihuai.com/images/20180114041227.png)

輸入

input {
    file {
        type => "nginx_access"
        path => "/usr/share/nginx/logs/access.log"
        codec => "json"
    }
}

過濾
filter,由於本站沒有涉及到很復雜的手機,所以不填
輸出

output {
    stdout{
        codec => rubydebug
    }
    kafka {
        # 如果是多個["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
        bootstrap_servers => "119.29.188.224:9092"    # 生產者
        topic_id => "nginx-access-log"    #設置寫入kafka的topic
        # compression_type => "snappy"    #消息壓縮模式,默認是none,可選gzip、snappy。
        codec => json       #一定要加上這段,不然傳輸錯誤,${message}
    }
    elasticsearch {
        # 如果是多個["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
        hosts => "119.29.188.224:9200"    #Elasticsearch 地址,多個地址以逗號分隔。
        index => "logstash-%{type}-%{+YYYY.MM.dd}"    #索引命名方式,不支持大寫字母(Logstash除外)
        document_type => "%{type}"    #文檔類型
    }
}

具體字段:
stdout:控制台輸出,方便tail -f查看,可不要
kafka:輸出到kafka,bootstrap_servers指的是kafka的地址和端口,topic_id是每條發布到kafka集群的消息屬於的類別,其中codec一定要設置為json,要不然生產者出錯,導致消費者是看到${message}。
elasticsearch:輸出到elasticsearch,hosts指的是elasticsearch的地址和端口,index指的命名方式
然后啟動Logstash:
nohup bin/logstash -f config/nginxlog2es.conf --path.data=tmp &
tail -f 查看nohup

![](http://image.wenzhihuai.com/images/20180114031909.png)

三、kafka

kafka的原理請看kafka入門,我就不寫了。下面是安裝步驟:
目前的雲服務器都用了NAT轉換公網,如果不開啟外網,kafka會默認使用內網私有地址訪問,所以要開啟外網訪問
只需要在config/server.properties里加入:

advertised.host.name=119.29.188.224

改變默認端口:

advertised.host.port=9200

啟動步驟:
(1)ZooKeeper啟動
bin/zookeeper-server-start.sh config/zookeeper.properties
(2)啟動Kafka
nohup bin/kafka-server-start.sh config/server.properties &
(3)創建一個topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic數量
bin/kafka-topics.sh --list --zookeeper localhost:2181
(4)生產者發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(5)消費者接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
(6)刪除
刪除kafka存儲的日志,在kafka的config/server.properties的log.dirs=/tmp/kafka-logs查看

此處只進行到第二步即可。

四、Spring Boot與Kafka

(1)在父pom.xml中添加:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-releasetrain</artifactId>
            <version>Fowler-SR2</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>1.5.9.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

(2)在消費者模塊中添加:

    <parent>
        <artifactId>micro-service</artifactId>
        <groupId>micro-service</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

配置文件:

# 本地運行端口
server.port=8082
# kafka地址和端口
spring.kafka.bootstrap-servers=119.29.188.224:9092
# 指定默認消費者group id
spring.kafka.consumer.group-id=myGroup
# 指定默認topic id
spring.kafka.template.default-topic=nginx-access-log
# 指定listener 容器中的線程數,用於提高並發量
spring.kafka.listener.concurrency=3
# 偏移量,最好使用latest,earliest會從kafka運行起開始一直發送
spring.kafka.consumer.auto-offset-reset=latest
# 心跳檢測
spring.kafka.consumer.heartbeat-interval=100

(5)接收消息

@Component
public class MsgConsumer {
    @KafkaListener(topics = {"nginx-access-log"})
    public void processMessage(String content) {
        System.out.println(content);
    }
}

(6)測試
運行之后點擊網站http://www.wenzhihuai.com可看到:

![](http://image.wenzhihuai.com/images/20180114032728.png)

五、錯誤記錄

(1)與Spring的包沖突:

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2018-01-05 11:10:47.947 ERROR 251848 --- [           main] o.s.boot.SpringApplication               : Application startup failed

org.springframework.context.ApplicationContextException: Unable to start embedded container; nested exception is org.springframework.boot.context.embedded.EmbeddedServletContainerException: Unable to start embedded Tomcat
	at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.onRefresh(EmbeddedWebApplicationContext.java:137) ~[spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:537) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
	at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]

解決辦法:去掉父pom.xml文件里所有關於spring的包,只保留spring boot的即可

(2)消費者只接受到${message}消息(原圖已失效)

![](http://image.wenzhihuai.com/images/20180110084946.png)

解決辦法:
一定要在output的kafka中添加

   codec => json

完整代碼可以到https://github.com/Zephery/micro-service查看
個人網站http://www.wenzhihuai.com


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM