logback KafkaAppender: writing logs to a Kafka queue for centralized log output


To reduce disk I/O on the application servers, and to centralize logs on one machine where ELK can conveniently collect them, I considered building a jar that lets applications send their logs to a central place.

 

(Related post: a custom RedisAppender plugin for Redis that implements a buffered log queue for centralized output.)

 

 

I searched around online and only found one project on GitHub.

Address: https://github.com/johnmpage/logback-kafka


I originally planned to just pull this jar in as a dependency and be done with it, but it couldn't be resolved through the POM, so I downloaded the source, copied the code over, and modified it myself.

First, install Kafka. Being a thoroughly lazy programmer, I chose to install it with Docker.

Start the ZooKeeper container:

docker run -d --name zookeeper --net=host  -t wurstmeister/zookeeper

Start the Kafka container:

docker run --name kafka -d -e HOST_IP=192.168.1.7 --net=host -v /usr/local/docker/kafka/conf/server.properties:/opt/kafka_2.12-1.0.0/config/server.properties  -v /etc/localtime:/etc/localtime:ro -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -t wurstmeister/kafka

You need to adjust the ZooKeeper settings in Kafka's server.properties.

The configuration file looks like this:

listeners=PLAINTEXT://192.168.1.7:9092
delete.topic.enable=true
advertised.listeners=PLAINTEXT://192.168.1.7:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-92cfb0bbd88c
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.7:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
version=1.0.0

With Kafka up and running, create a new Spring Boot project. First, the one that consumes the queue.

The pom file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lzw</groupId>
    <artifactId>kafkalog</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafkalog</name>
    <description>Demo project for Spring Boot</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>

Project structure

 

KafkaConfig

package com.lzw.kafkalog.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import com.lzw.kafkalog.a.Areceiver;
import com.lzw.kafkalog.b.Breceiver;

/**
 * Created by laizhenwei on 2017/11/28
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String consumerBootstrapServers;

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public Areceiver areceiver() {
        return new Areceiver();
    }

    @Bean
    public Breceiver breceiver() {
        return new Breceiver();
    }
}
KafkaAdminConfig
package com.lzw.kafkalog.config;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by laizhenwei on 2017/11/28
 */
@Configuration
public class KafkaAdminConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,producerBootstrapServers);
        return new KafkaAdmin(configs);
    }

    /**
     * Create topic A with 1 partition
     * @return
     */
    @Bean
    public NewTopic a() {
        return new NewTopic("A", 1, (short) 1);
    }

    /**
     * Create topic B with 1 partition
     * @return
     */
    @Bean
    public NewTopic b() {
        return new NewTopic("B", 1, (short) 1);
    }
}

Consumer for topic B

package com.lzw.kafkalog.b;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Created by laizhenwei on 2017/11/28
 */
public class Breceiver {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    @KafkaListener(topics={"B"})
    public void listen(ConsumerRecord data) {
        logger.info(data.value().toString());
    }
}
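
The Areceiver class for topic A isn't shown in the original post, even though KafkaConfig registers it as a bean; presumably it mirrors Breceiver. A minimal sketch (the package name is an assumption, based on the com.lzw.kafkalog.a logger configured in logback-test.xml below):

package com.lzw.kafkalog.a;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Areceiver {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    // Listen on topic A and hand each message to the com.lzw.kafkalog.a.Areceiver logger,
    // which the consumer's logback-test.xml routes to F:/log/a/a.log
    @KafkaListener(topics = {"A"})
    public void listen(ConsumerRecord data) {
        logger.info(data.value().toString());
    }
}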

application.yml

spring:
  kafka:
    consumer:
      bootstrap-servers: 192.168.1.7:9092
    producer: 
      bootstrap-servers: 192.168.1.7:9092

logback-test.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
    <contextName>logback</contextName>
    <property name="LOG_HOME" value="F:/log" />
    <appender name="aAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/a/a.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/a/a-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <!--<fileNamePattern>${LOG_HOME}/a/a-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>-->
            <!-- Number of days to keep log files -->
            <MaxHistory>30</MaxHistory>
            <!-- Roll to a new file when this size is reached -->
            <MaxFileSize>100MB</MaxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="bAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/b/b.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/b/b-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <!--<fileNamePattern>${LOG_HOME}/b/b-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>-->
            <!-- Number of days to keep log files -->
            <MaxHistory>30</MaxHistory>
            <!-- Roll to a new file when this size is reached -->
            <MaxFileSize>100MB</MaxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>

        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- Asynchronous output -->
    <appender name="aAsyncFile" class="ch.qos.logback.classic.AsyncAppender">
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="aAppender" />
    </appender>

    <logger name="com.lzw.kafkalog.a" level="INFO" additivity="false">
        <appender-ref ref="aAsyncFile" />
    </logger>


    <!-- Asynchronous output -->
    <appender name="bAsyncFile" class="ch.qos.logback.classic.AsyncAppender">
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="bAppender" />
    </appender>
    <logger name="com.lzw.kafkalog.b" level="INFO" additivity="false">
        <appender-ref ref="bAsyncFile" />
    </logger>

</configuration>

Now the application that produces the logs. The key part is the custom KafkaAppender (the part marked with a red box in the original screenshots).

I originally wanted to build some fault tolerance into it, but that turned out not to work; I'll explain why shortly.

package com.lzw.project_b.kafka;

import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.status.ErrorStatus;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.StringReader;
import java.util.Properties;

public class KafkaAppender<E> extends AppenderBase<E> {

    protected Layout<E> layout;
    private static final Logger LOGGER = LoggerFactory.getLogger("local");
    private boolean logToLocal = false;
    private String kafkaProducerProperties;
    private String topic;
    private KafkaProducer producer;

    @Override
    public void start() {
        int errors = 0;
        if (this.layout == null) {
            this.addStatus(new ErrorStatus("No layout set for the appender named \"" + this.name + "\".", this));
            ++errors;
        }

        if (errors == 0) {
            super.start();
        }

        LOGGER.info("Starting KafkaAppender...");
        final Properties properties = new Properties();
        try {
            properties.load(new StringReader(kafkaProducerProperties));
            producer = new KafkaProducer<>(properties);
        } catch (Exception exception) {
            System.out.println("KafkaAppender: Exception initializing Producer. " + exception + " : " + exception.getMessage());
        }
        System.out.println("KafkaAppender: Producer initialized: " + producer);
        if (topic == null) {
            System.out.println("KafkaAppender requires a topic. Add this to the appender configuration.");
        } else {
            System.out.println("KafkaAppender will publish messages to the '" + topic + "' topic.");
        }
        LOGGER.info("kafkaProducerProperties = {}", kafkaProducerProperties);
        LOGGER.info("Kafka Producer Properties = {}", properties);
        if (logToLocal) {
            LOGGER.info("KafkaAppender: kafkaProducerProperties = '" + kafkaProducerProperties + "'.");
            LOGGER.info("KafkaAppender: properties = '" + properties + "'.");
        }
    }

    @Override
    public void stop() {
        super.stop();
        LOGGER.info("Stopping KafkaAppender...");
        producer.close();
    }

    @Override
    protected void append(E event) {
        /**
         * The original source converted the event to JSON with a Formatter class here;
         * I just apply the Logback layout instead (see the notes on JSON at the end).
         */
        String msg = layout.doLayout(event);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
        producer.send(producerRecord);
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public boolean getLogToLocal() {
        return logToLocal;
    }

    public void setLogToLocal(String logToLocal) {
        if (Boolean.valueOf(logToLocal)) {
            this.logToLocal = true;
        }
    }

    public void setLayout(Layout<E> layout) {
        this.layout = layout;
    }

    public String getKafkaProducerProperties() {
        return kafkaProducerProperties;
    }

    public void setKafkaProducerProperties(String kafkaProducerProperties) {
        this.kafkaProducerProperties = kafkaProducerProperties;
    }
}

 

LogService just writes one long chunk of junk log text:
package com.lzw.project_b.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Created by laizhenwei on 2017/12/1
 */
@Component
public class LogService {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String msg = "asdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa" +
            "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdf" +
            "sadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa" +
            "sdfsadfasdfsadfasdfsaasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa" +
            "dfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadf" +
            "asdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfas" +
            "dfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa" +
            "dfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfas" +
            "dfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadf" +
            "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa" +
            "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa";

    public void dolog() {
        logger.info(msg, new RuntimeException(msg));
    }

}
KafkaLogController is just a trivial endpoint that fires off log calls and records how long enqueueing them takes:
package com.lzw.project_b.controller;

import com.lzw.project_b.service.LogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by laizhenwei on 2017/11/29
 */
@RestController
@RequestMapping(path = "/kafka")
public class KafkaLogController {

    @Autowired
    private LogService logService;

    @GetMapping(path = "/aa")
    public void aa() {
        long begin = System.nanoTime();
        for (int i = 0; i < 100000; i++) {
            logService.dolog();
        }
        long end = System.nanoTime();

        System.out.println((end - begin) / 1000000);
    }

}

Start both applications, send one request, and check the elapsed time printed by the controller.

The producer's logback-test.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
    <appender name="KAFKA" class="com.lzw.project_b.kafka.KafkaAppender">
        <topic>B</topic>
        <kafkaProducerProperties>
            bootstrap.servers=192.168.1.7:9092
            retries=0
            value.serializer=org.apache.kafka.common.serialization.StringSerializer
            key.serializer=org.apache.kafka.common.serialization.StringSerializer
            <!--reconnect.backoff.ms=1-->
            producer.type=async
            request.required.acks=0
            <!--acks=0-->
            <!--producer.type=async -->
            <!--request.required.acks=1 -->
            <!--queue.buffering.max.ms=20000 -->
            <!--queue.buffering.max.messages=1000-->
            <!--queue.enqueue.timeout.ms = -1 -->
            <!--batch.num.messages=8-->
            <!--metadata.fetch.timeout.ms=3000-->
            <!--producer.type=sync-->
            <!--request.required.acks=1-->
            <!--reconnect.backoff.ms=3000-->
            <!--retry.backoff.ms=3000-->
        </kafkaProducerProperties>
        <logToLocal>true</logToLocal>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
        </layout>
    </appender>

    <!-- Time-based rolling output for the local log -->
    <appender name="localAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>F:/localLog/b/b.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>F:/localLog/b/b-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>
            <!-- Number of days to keep log files -->
            <MaxHistory>30</MaxHistory>
            <!-- Roll to a new file when this size is reached -->
            <MaxFileSize>200MB</MaxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>

        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <appender name="asyncLocal" class="ch.qos.logback.classic.AsyncAppender">
        <!-- Do not drop logs. By default, once the queue is 80% full, TRACE, DEBUG and INFO events are discarded -->
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="localAppender"/>
    </appender>

    <!-- In case the Kafka queue is unreachable, log to a local file -->
    <logger name="local" additivity="false">
        <appender-ref ref="asyncLocal"/>
    </logger>

    <!--<appender name="asyncKafka" class="ch.qos.logback.classic.AsyncAppender">-->
        <!--&lt;!&ndash; 不丟失日志.默認的,如果隊列的80%已滿,則會丟棄TRACT、DEBUG、INFO級別的日志 &ndash;&gt;-->
        <!--<discardingThreshold>0</discardingThreshold>-->
        <!--<queueSize>2048</queueSize>-->
        <!--<appender-ref ref="KAFKA"/>-->
    <!--</appender>-->

    <root level="INFO">
        <appender-ref ref="KAFKA"/>
    </root>

</configuration>

As for why I didn't use the JSON Formatter from the original source: converting to JSON takes extra time and hurts performance. The original used json-simple; I swapped it for Gson, which was much faster, but there was still a noticeable cost. If the logs really must end up as JSON, I'd rather do the conversion inside ELK than spend application time on it.
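
For reference, the Gson variant I tried looked roughly like the sketch below. It is a reconstruction, not the original Formatter class; the field names are illustrative only:

package com.lzw.project_b.kafka;

import ch.qos.logback.classic.spi.ILoggingEvent;
import com.google.gson.Gson;

import java.util.HashMap;
import java.util.Map;

public class GsonFormatter {

    private static final Gson GSON = new Gson();

    // Turn one logging event into a JSON string; every field below is illustrative
    public String format(ILoggingEvent event) {
        Map<String, Object> json = new HashMap<>();
        json.put("timestamp", event.getTimeStamp());
        json.put("level", event.getLevel().toString());
        json.put("thread", event.getThreadName());
        json.put("logger", event.getLoggerName());
        json.put("message", event.getFormattedMessage());
        return GSON.toJson(json);
    }
}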

In the producer I used the most extreme "one way" (fire-and-forget) style: the appender never checks the Future returned by send(), so throughput is highest, but there is no way to know whether a message actually made it into the queue.

In the producer application, Logback has to log synchronously (no AsyncAppender wrapped around the KafkaAppender); otherwise the measured time would not objectively reflect the cost of enqueueing.

Summary

Fault tolerance: I tried to write some fallback code in the producer so that whenever the connection to Kafka was down, or the queue could not be written, events would be logged to a local file instead. But when I shut Kafka down to test it, the producer simply blocked and kept reconnecting forever; the application was essentially dead.

I looked for a way to stop the reconnection attempts but never found one.
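
For what it's worth, the direction I experimented with looked roughly like the sketch below: cap how long send() may block with max.block.ms and push failed sends to the local logger from the send callback. Both the setting and the callback are standard Kafka Java client features, but I never got this to behave acceptably with the broker completely down, so treat it as a sketch rather than a working fix:

package com.lzw.project_b.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class FallbackSketch {

    // the same "local" logger that logback-test.xml routes to the local rolling file
    private static final Logger LOCAL = LoggerFactory.getLogger("local");

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.7:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        // give up after 3 seconds instead of blocking indefinitely when the broker is unreachable
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3000");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        String msg = "a log line";
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("B", msg), (metadata, exception) -> {
                if (exception != null) {
                    // Kafka did not accept the record: fall back to the local file appender
                    LOCAL.info(msg);
                }
            });
        }
    }
}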

Flexibility: compared with a Redis queue, Kafka is awkward here. In my scenario I also have to keep the Kafka cluster itself available, the performance gain is small, and the maintenance cost goes up.

Performance: I tested on both an SSD and a mechanical disk. Because Kafka is built around sequential writes and is heavily optimized for spinning disks, it performed roughly 30% better on the mechanical disk than on the SSD in my tests; so is low hardware cost its main selling point?

Enqueue performance isn't that high either; in fact it doesn't beat writing to the local disk directly (don't forget that after enqueueing, the consumer still has to write the log to disk, and the queue itself is also persisted to disk, so everything is effectively written twice).

Developer experience: the Java client is said to be one of the better-maintained ones.

 

Finally: it just doesn't fit my use case, and I haven't used it in depth. In the end I chose Redis for the queue.

I also couldn't find a way to turn off Kafka's persistence, so the disk gets written twice; in some situations the logs aren't so critical that none of them may be lost (a Redis queue is very flexible here: when the queue can't be written, you can fall back to the local disk). With Redis, messages go in and are consumed quickly, memory pressure stays low, and CPU usage is modest. In my view, when the data isn't particularly important, the cost ends up even lower than Kafka's, and the performance is a qualitative leap.

 

