使用Log4j將程序日志實時寫入Kafka


 第一部分 搭建Kafka環境

安裝Kafka

下載:http://kafka.apache.org/downloads.html

tar zxf kafka-<VERSION>.tgz
cd kafka-<VERSION>

啟動Zookeeper

啟動Zookeeper前需要配置一下config/zookeeper.properties:

接下來啟動Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動Kafka Server

啟動Kafka Server前需要配置一下config/server.properties。主要配置以下幾項,內容就不說了,注釋里都很詳細:

然后啟動Kafka Server:

bin/kafka-server-start.sh config/server.properties

創建Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看創建的Topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

啟動控制台Producer,向Kafka發送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

啟動控制台Consumer,消費剛剛發送的消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

刪除Topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

注:只有當delete.topic.enable=true時,該操作才有效

配置Kafka集群(單台機器上)

首先拷貝server.properties文件為多份(這里演示4個節點的Kafka集群,因此還需要拷貝3份配置文件):

cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties
cp config/server.properties config/server3.properties

修改server1.properties的以下內容:

broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

同理修改server2.properties和server3.properties的這些內容,並保持所有配置文件的zookeeper.connect屬性都指向運行在本機的zookeeper地址localhost:2181。注意,由於這幾個Kafka節點都將運行在同一台機器上,因此需要保證這幾個值不同,這里以累加的方式處理。例如在server2.properties上:

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

把server3.properties也配置好以后,依次啟動這些節點:

bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &

Topic & Partition

Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。

現在在Kafka集群上創建備份因子為3,分區數為4的Topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka

說明:備份因子replication-factor越大,則說明集群容錯性越強,就是當集群down掉后,數據恢復的可能性越大。所有的分區數里的內容共同組成了一份數據,分區數partions越大,則該topic的消息就越分散,集群中的消息分布就越均勻。

然后使用kafka-topics.sh的--describe參數查看一下Topic為kafka的詳情:

輸出的第一行是所有分區的概要,接下來的每一行是一個分區的描述。可以看到Topic為kafka的消息,PartionCount=4,ReplicationFactor=3正是我們創建時指定的分區數和備份因子。

另外:Leader是指負責這個分區所有讀寫的節點;Replicas是指這個分區所在的所有節點(不論它是否活着);ISR是Replicas的子集,代表存有這個分區信息而且當前活着的節點。

拿partition:0這個分區來說,該分區的Leader是server0,分布在id為0,1,2這三個節點上,而且這三個節點都活着。

再來看下Kafka集群的日志:

其中kafka-logs-0代表server0的日志,kafka-logs-1代表server1的日志,以此類推。

從上面的配置可知,id為0,1,2,3的節點分別對應server0, server1, server2, server3。而上例中的partition:0分布在id為0, 1, 2這三個節點上,因此可以在server0, server1, server2這三個節點上看到有kafka-0這個文件夾。這個kafka-0就代表Topic為kafka的partion0。

第二部分 Kafka+Log4j項目整合

先來看下Maven項目結構圖:

pom.xml引入的jar包:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-log4j-appender</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>18.0</version>
    </dependency>
</dependencies>

重要的內容是log4j.properties:

log4j.rootLogger=debug,Console

# appender kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=kafkaTest
log4j.appender.kafka.syncSend=false
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=192.168.1.163:9092
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

#輸出日志到控制台
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.Threshold=all
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c\:%L]-[%p] %m%n

#kafka
log4j.logger.com.demo.kafka.Log4jToKafka=info,kafka
#關閉spring低級別日志
log4j.logger.org.springside.examples.miniweb=ERROR
log4j.logger.com.octo.captcha.service.image.DefaultManageableImageCaptchaService=ERROR
log4j.logger.com.mchange.v2.resourcepool.BasicResourcePool=ERROR
log4j.logger.com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool=ERROR
log4j.logger.com.mchange.v2.c3p0.impl.NewPooledConnection=ERROR
log4j.logger.com.mchange.v2.c3p0.management.DynamicPooledDataSourceManagerMBean=ERROR
log4j.logger.com.mchange.v2.c3p0.C3P0Registry=ERROR
log4j.logger.com.mchange.v2.log.MLog=ERROR
log4j.logger.com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource=ERROR

log4j輸出日志:

package com.demo.kafka;
import org.apache.log4j.Logger;

/**
 * INFO: info User: xuchao Date: 2017/3/17 Version: 1.0 History:
 * <p>
 * 如果有修改過程,請記錄
 * </P>
 */

public class Log4jToKafka {
    private static Logger logger = Logger.getLogger(Log4jToKafka.class);

    public static void main(String args[]) {
        System.out.println("hello word!");
        int start = 1;
        while (true) {
            start++;
            logger.info(start + "hello Log4jToKafka test !");
            try {
                Thread.sleep(50l);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

消費kafka中的信息:

package com.demo.kafka;

/**
 * INFO: info
 * User: zhaokai
 * Date: 2017/3/17
 * Version: 1.0
 * History: <p>如果有修改過程,請記錄</P>
 */

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {
        System.out.println("begin consumer");
        connectionKafka();
        System.out.println("finish consumer");
    }

    @SuppressWarnings("resource")
    public static void connectionKafka() {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.163:9092");
        props.put("group.id", "testConsumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("kafkaTest"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("===================offset = %d, key = %s, value = %s", record.offset(), record.key(),
                        record.value());
            }
        }
    }
}

MyProducer.java用於向Kafka發送消息,但不通過log4j的appender發送。此案例中可以不要。但是我還是放在這里:

package com.demo.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MyProducer {
    private static final String TOPIC = "kafka";
    private static final String CONTENT = "This is a single message";
    private static final String BROKER_LIST = "localhost:9092";
    private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("serializer.class", SERIALIZER_CLASS);
        props.put("metadata.broker.list", BROKER_LIST);

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // Send one message.
        KeyedMessage<String, String> message = new KeyedMessage<String, String>(TOPIC, CONTENT);
        producer.send(message);

        // Send multiple messages.
        List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>();
        for (int i = 0; i < 5; i++) {
            messages.add(new KeyedMessage<String, String>(TOPIC, "Multiple message at a time. " + i));
        }
        producer.send(messages);
    }
}

到這里,代碼就結束了。

第三部分 運行與驗證

先運行Consumer,使其處於監聽狀態。同時,還可以啟動Kafka自帶的ConsoleConsumer來驗證是否跟Consumer的結果一致。最后運行Log4jToKafka.java。

先來看看Consumer的輸出:

再來看看ConsoleConsumer的輸出:

可以看到,盡管發往Kafka的消息去往了不同的地方,但是內容是一樣的,而且一條也不少。最后再來看看Kafka的日志。

我們知道,Topic為kafka的消息有4個partion,從之前的截圖可知這4個partion均勻分布在4個kafka節點上,於是我對每一個partion隨機選取一個節點查看了日志內容。

上圖中黃色選中部分依次代表在server0上查看partion0,在server1上查看partion1,以此類推。

而紅色部分是日志內容,由於在創建Topic時准備將20條日志分成4個區存儲,可以很清楚的看到,這20條日志確實是很均勻的存儲在了幾個partion上。

摘一點Infoq上的話:每個日志文件都是一個log entrie序列,每個log entrie包含一個4字節整型數值(值為N+5),1個字節的"magic value",4個字節的CRC校驗碼,其后跟N個字節的消息體。每條消息都有一個當前Partition下唯一的64字節的offset,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:

message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte 
crc : 4 bytes 
payload : n bytes

這里我們看到的日志文件的每一行,就是一個log entrie,每一行前面無法顯示的字符(藍色選中部分),就是(message length + magic value + crc)了。而log entrie的后部分,則是消息體的內容了。

本文轉自:https://my.oschina.net/itblog/blog/540918


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM