二十分鍾快速上手Kafka開發(Java示例)


引子

Kafka 是一個高性能、高可用、高可靠的支持事件數據流的消息隊列系統,是實時計算的可靠數據源。

本文給出使用 Kafka 的基本示例。關於 Kafka 的更多原理性的信息,可閱讀《Kafka權威指南》一書。

基本知識

基本概念

  • 事件(Event):包含 [key, value, timestamp, headers] ,是寫入 Kafka 或從 Kafka 讀取的數據對象。通常是由其它數據源或設備源觸發而來;事件可以看做是消息或記錄;
  • 主題(Topic):將同一類事件對象組織在一起的名字空間。生產者寫入指定的 Topic ,消費者訂閱並從 Topic 中讀取數據;
  • 分區(Partition):分區是事件在主題中的存儲單元。同一個主題下的所有事件會存放在多個分區里;分區可以保證 Kafka 的高可用和可伸縮性。Kafka 保證數據寫入分區和從分區讀取的順序是一致的;
  • 偏移量(Offset):消費者讀取消息時會向 _consumer_offset 的特殊主題提交消息偏移量,便於追蹤消息讀取進度;如果發生分區再均衡(消費者群組中的消費者上線或下線),可以控制消息讀取的不丟失和一致性;

Kafka 遵循生產者-消費者模型(P-C):

  • 生產者(Producer): 往 Kafka 分區寫數據的數據生產者;
  • 消費者(Consumer): 從 Kafka 分區讀取數據的數據消費者;消費者通常會處於某個消費者群組里。

消息系統的基本原理見: “【整理】互聯網服務端技術體系:服務解耦之消息系統”

流程圖

Kafka 生產者寫入示意圖:

Kafka 消費者讀取示意圖:


准備工作

Zookeeper 和 Kafka

“Zookeeper Download” 下載 zookeeper 壓縮包,從 “Kafka Download” 下載 Kafka 壓縮包,使用 tar xzvf xxx.tar.gz 解壓即可。

啟動服務

啟動 Zookeeper 服務。切換到 Zookeeper 解壓目錄下,執行如下命令:

bin/zkServer.sh start-foreground

啟動 Kafka 服務。切換到 Kafka 解壓目錄下,執行如下命令:

bin/kafka-server-start.sh config/server.properties

創建和查看消息主題

執行如下命令,創建了一個 order-events 的消息主題:

bin/kafka-topics.sh --create --topic order-events --bootstrap-server localhost:9092

查看主題 order-events 的信息:

bin/kafka-topics.sh --describe --topic order-events --bootstrap-server localhost:9092

Java示例

步驟一:引入 POM 依賴

       <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>

步驟二:創建Kafka消息發送組件

package cc.lovesq.kafkamsg;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @Description kafka消息發送
 * @Date 2021/2/4 10:47 上午
 * @Created by qinshu
 */
@Component
public class KafkaMessageProducer {

    private static Log log = LogFactory.getLog(KafkaMessageProducer.class);

    private KafkaProducer producer;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers","localhost:9092");    // 指定 Broker
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  // 將 key 的 Java 對象轉成字節數組
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 將 value 的 Java 對象轉成字節數組
        properties.put("acks", "1");       // 消息至少成功發給一個副本后才返回成功
        properties.put("retries", "5");    // 消息重試 5 次

        producer = new KafkaProducer<String,String>(properties);

    }

    /**
     * 同步發送消息
     */
    public void send(ProducerRecord record) {
        try {
            producer.send(record).get(200, TimeUnit.MILLISECONDS);
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }

    }

    /**
     * 異步發送消息
     */
    public void sendAsync(ProducerRecord record, Callback callback) {
        try {
            producer.send(record, callback);
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }

    }
}

步驟三: 創建Kafka消息消費組件

package cc.lovesq.kafkamsg;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @Description kafka消息接收
 * @Date 2021/2/4 11:04 上午
 * @Created by qinshu
 */
@Component
public class KafkaMessageConsumer {

    private static Log log = LogFactory.getLog(KafkaMessageConsumer.class);

    private KafkaConsumer consumer;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers","localhost:9092");  // 指定 Broker
        properties.put("group.id", "experiment");              // 指定消費組群 ID
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 將 key 的字節數組轉成 Java 對象
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // 將 value 的字節數組轉成 Java 對象

        consumer = new KafkaConsumer(properties);
        consumer.subscribe(Collections.singleton("order-events"));  // 訂閱主題 order-events

        new Thread(this::consumer).start();
    }

    public void consumer() {
        try {
            while (true) {
                ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String,String> record: records) {
                    String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    log.info("Received:" + info);
                }
            }
        } finally {
            consumer.close();
        }

    }
}

步驟四:創建消息發送者(業務),這里借用了“后端簡易實驗框架” 的功能。這里的消息對象可以替換成自己工程里的對象哈,稍加改動即可。

package cc.lovesq.experiments;

import cc.lovesq.constants.DeliveryType;
import cc.lovesq.controller.GoodsSnapshotController;
import cc.lovesq.kafkamsg.KafkaMessageProducer;
import cc.lovesq.model.BookInfo;
import cc.lovesq.model.GoodsInfo;
import cc.lovesq.model.Order;
import cc.lovesq.model.transfer.BookInfoToMessageTransfer;
import cc.lovesq.result.BaseResult;
import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Description 下單實驗
 * @Date 2021/1/4 10:50 上午
 * @Created by qinshu
 */
@Component
public class BookExperiment implements IExperiment {

    private static Log log = LogFactory.getLog(BookExperiment.class);

    @Resource
    private GoodsSnapshotController goodsSnapshotController;

    @Resource
    private KafkaMessageProducer producer;

    private ExecutorService es = Executors.newFixedThreadPool(10);

    @Override
    public void test() {
        generateOrders();
    }

    // 模擬並發下單
    public void generateOrders() {
        for (int i=1; i < 1000; i++) {
            es.submit(() -> {
                book();
            });
        }
    }

    Random random = new Random(System.currentTimeMillis());

    private BaseResult book() {
        BookInfo bookInfo = new BookInfo();
        Order order = new Order();

        Long shopId = 654321L + random.nextInt(10000);
        Long userId = 1234L + random.nextInt(1000);
        Long goodsId = 5678L + random.nextInt(4000);
        order.setShopId(shopId);
        order.setUserId(userId);
        order.setDeliveryType(DeliveryType.express);
        order.setIsCodPay(false);
        bookInfo.setOrder(order);

        GoodsInfo goods = new GoodsInfo();
        goods.setGoodsId(goodsId);
        goods.setShopId(shopId);
        goods.setTitle("認養一頭牛");
        goods.setDesc("2箱*250g");
        bookInfo.setGoods(goods);

        BaseResult bookResult = goodsSnapshotController.save(bookInfo);
        log.info("下單結果:" + JSON.toJSONString(bookResult));

        // 下單成功后發送消息
        producer.sendAsync(
                BookInfoToMessageTransfer.transfer(bookInfo),
                (metadata, exception) -> callback(bookInfo, metadata, exception));

        return bookResult;
    }

    // 消息發送后的回調函數
    private void callback(BookInfo bookInfo, RecordMetadata metadata, Exception ex) {
        if (metadata != null) {
            log.info("發送訂單消息:" + bookInfo.getOrder().getOrderNo() + " 偏移量: " + metadata.offset() + " 主題: " + metadata.topic());
        } else {
            log.error("發送訂單消息失敗: " + ex.getMessage(), ex);
        }
    }
}

至此,就可以實現 Kafka 的消息發送和消息消費示例了。

Kafka 還可以用於可靠的數據源,為實時計算組件提供事件流,如下圖所示代碼:

package cc.lovesq.kafkamsg;

import cc.lovesq.model.BookInfo;
import cc.lovesq.util.TimeUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

/**
 * @Description Kafka 事件流
 * @Date 2021/2/4 8:17 下午
 * @Created by qinshu
 */
@Component
public class KafkaMessageStream {

    private static Log log = LogFactory.getLog(KafkaMessageStream.class);

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "orderCount");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder streamBuilder = new StreamsBuilder();
        KStream<String,String> source = streamBuilder.stream("order-events");

        // 計算下單中每個 goodsId 出現的次數
        KStream result = source.filter(
                (key, value) -> value.startsWith("{") && value.endsWith("}")
        ).mapValues(
                value -> JSONObject.parseObject(value, BookInfo.class)
        ).mapValues(
                bookInfo -> bookInfo.getGoods().getGoodsId().toString()
        ).groupBy((key,value) -> value).count(Materialized.as("goods-order-count")
        ).mapValues(value -> Long.toString(value)).toStream();

        result.print(Printed.toSysOut());

        new Thread(
                () -> {
                    TimeUtil.sleepInSecs(10);
                    KafkaStreams streams = new KafkaStreams(streamBuilder.build(), properties);
                    streams.start();
                    log.info("stream-start ...");
                    TimeUtil.sleepInSecs(10);
                    streams.close();
                }
        ).start();
    }
}

這里還必須事先創建一個 Topic = goods-order-count 的主題:

bin/kafka-topics.sh --create --topic goods-order-count --bootstrap-server localhost:9092

小結

Kafka 是一個很有潛力的用於業務系統和大數據系統的消息系統。本文給出了使用 Kafka 進行消息發送、消息消費以及事件流處理的基本示例,方便 Kafka 初學者(包括我自己)更好滴上手,進一步去探索 Kafka.

參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM