Introduction
Kafka is a high-performance, highly available, and reliable message queue system built around event data streams, and a dependable data source for real-time computation.
This article presents basic examples of using Kafka. For more on how Kafka works internally, see the book Kafka: The Definitive Guide.
Basics
Basic Concepts
- Event: a data object containing [key, value, timestamp, headers] that is written to or read from Kafka. Events are usually triggered by some upstream data source or device; an event can be thought of as a message or record.
- Topic: a namespace that groups events of the same kind. Producers write to a given topic; consumers subscribe to a topic and read from it.
- Partition: the storage unit for events within a topic. The events of one topic are spread across multiple partitions, which is what gives Kafka its availability and scalability. Within a single partition, Kafka guarantees that events are read in the same order they were written.
- Offset: as consumers read messages, they commit offsets to a special topic named __consumer_offsets, which tracks read progress. When a partition rebalance occurs (a consumer in the group comes online or goes offline), the committed offsets keep consumption consistent and prevent message loss.
Kafka follows the producer-consumer (P-C) model:
- Producer: writes data into Kafka partitions.
- Consumer: reads data from Kafka partitions; a consumer usually belongs to a consumer group.
For the fundamentals of messaging systems, see: "【整理】互聯網服務端技術體系:服務解耦之消息系統"
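To make the partition concept concrete, here is a minimal sketch of how a key-based partitioner deterministically maps a key to one of a topic's partitions. This is not Kafka's actual implementation (the default partitioner hashes the serialized key with murmur2); a plain byte-array hash illustrates the same property: the same key always lands on the same partition, which is what preserves per-key ordering.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    /** Map a key to a partition index in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // Kafka's default partitioner uses murmur2 over the serialized key;
        // any deterministic hash shows the idea: same key -> same partition.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Repeated calls with the same key yield the same partition.
        System.out.println(partitionFor("order-1001", 3));
        System.out.println(partitionFor("order-1001", 3));
    }
}
```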
Flow Diagrams
Kafka producer write path:
Kafka consumer read path:
Setup
Zookeeper and Kafka
Download the Zookeeper tarball from "Zookeeper Download" and the Kafka tarball from "Kafka Download", then extract each with tar xzvf xxx.tar.gz.
Starting the Services
Start the Zookeeper service. From the extracted Zookeeper directory, run:
bin/zkServer.sh start-foreground
Start the Kafka service. From the extracted Kafka directory, run:
bin/kafka-server-start.sh config/server.properties
Creating and Inspecting a Topic
Create a topic named order-events:
bin/kafka-topics.sh --create --topic order-events --bootstrap-server localhost:9092
Describe the order-events topic:
bin/kafka-topics.sh --describe --topic order-events --bootstrap-server localhost:9092
Java Examples
Step 1: Add the POM dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>2.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>
Step 2: Create the Kafka message producer component
package cc.lovesq.kafkamsg;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Kafka message producer.
 *
 * @Date 2021/2/4 10:47
 * @Created by qinshu
 */
@Component
public class KafkaMessageProducer {

    private static final Log log = LogFactory.getLog(KafkaMessageProducer.class);

    private KafkaProducer<String, String> producer;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");  // broker address
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");    // serialize keys to byte arrays
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  // serialize values to byte arrays
        properties.put("acks", "1");     // a send succeeds once the partition leader has written the record
        properties.put("retries", "5");  // retry a failed send up to 5 times
        producer = new KafkaProducer<>(properties);
    }

    /**
     * Send a message synchronously, waiting up to 200 ms for the result.
     */
    public void send(ProducerRecord<String, String> record) {
        try {
            producer.send(record).get(200, TimeUnit.MILLISECONDS);
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
    }

    /**
     * Send a message asynchronously; the callback fires when the send completes.
     */
    public void sendAsync(ProducerRecord<String, String> record, Callback callback) {
        try {
            producer.send(record, callback);
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
    }
}
Step 3: Create the Kafka message consumer component
package cc.lovesq.kafkamsg;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka message consumer.
 *
 * @Date 2021/2/4 11:04
 * @Created by qinshu
 */
@Component
public class KafkaMessageConsumer {

    private static final Log log = LogFactory.getLog(KafkaMessageConsumer.class);

    private KafkaConsumer<String, String> consumer;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");  // broker address
        properties.put("group.id", "experiment");               // consumer group id
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    // deserialize keys from byte arrays
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // deserialize values from byte arrays
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("order-events")); // subscribe to the order-events topic
        new Thread(this::consume).start();
    }

    public void consume() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    log.info("Received:" + info);
                }
            }
        } finally {
            consumer.close();
        }
    }
}
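The consumer above relies on the client's auto-commit defaults. When exact control over offsets matters (for example, committing only after a batch has been fully processed, so a rebalance never skips unprocessed records), one variant is to disable auto-commit and commit explicitly. The sketch below only builds the configuration; the property names are standard Kafka client settings, and the commit call is shown as a comment since it needs a running broker:

```java
import java.util.Properties;

public class ManualCommitConfig {

    /** Consumer properties with auto-commit disabled, so offsets are committed explicitly. */
    static Properties manualCommitProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "experiment");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false"); // commit offsets explicitly after processing
        return props;
    }

    // In the poll loop, after a batch has been processed successfully, call:
    //     consumer.commitSync();
    // so the committed offset never runs ahead of the actual processing progress.
}
```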
Step 4: Create the business-side message sender. This reuses the "后端簡易實驗框架" (a simple back-end experiment framework); the message objects here can be swapped for the ones in your own project with minor changes.
package cc.lovesq.experiments;

import cc.lovesq.constants.DeliveryType;
import cc.lovesq.controller.GoodsSnapshotController;
import cc.lovesq.kafkamsg.KafkaMessageProducer;
import cc.lovesq.model.BookInfo;
import cc.lovesq.model.GoodsInfo;
import cc.lovesq.model.Order;
import cc.lovesq.model.transfer.BookInfoToMessageTransfer;
import cc.lovesq.result.BaseResult;
import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Order placement experiment.
 *
 * @Date 2021/1/4 10:50
 * @Created by qinshu
 */
@Component
public class BookExperiment implements IExperiment {

    private static final Log log = LogFactory.getLog(BookExperiment.class);

    @Resource
    private GoodsSnapshotController goodsSnapshotController;

    @Resource
    private KafkaMessageProducer producer;

    private final ExecutorService es = Executors.newFixedThreadPool(10);

    private final Random random = new Random(System.currentTimeMillis());

    @Override
    public void test() {
        generateOrders();
    }

    // Simulate concurrent order placement.
    public void generateOrders() {
        for (int i = 1; i < 1000; i++) {
            es.submit(this::book);
        }
    }

    private BaseResult book() {
        BookInfo bookInfo = new BookInfo();
        Order order = new Order();
        Long shopId = 654321L + random.nextInt(10000);
        Long userId = 1234L + random.nextInt(1000);
        Long goodsId = 5678L + random.nextInt(4000);
        order.setShopId(shopId);
        order.setUserId(userId);
        order.setDeliveryType(DeliveryType.express);
        order.setIsCodPay(false);
        bookInfo.setOrder(order);

        GoodsInfo goods = new GoodsInfo();
        goods.setGoodsId(goodsId);
        goods.setShopId(shopId);
        goods.setTitle("認養一頭牛");
        goods.setDesc("2箱*250g");
        bookInfo.setGoods(goods);

        BaseResult bookResult = goodsSnapshotController.save(bookInfo);
        log.info("Order result: " + JSON.toJSONString(bookResult));

        // Send a message once the order has been placed.
        producer.sendAsync(
                BookInfoToMessageTransfer.transfer(bookInfo),
                (metadata, exception) -> callback(bookInfo, metadata, exception));
        return bookResult;
    }

    // Callback invoked after the message send completes.
    private void callback(BookInfo bookInfo, RecordMetadata metadata, Exception ex) {
        if (metadata != null) {
            log.info("Sent order message: " + bookInfo.getOrder().getOrderNo()
                    + " offset: " + metadata.offset() + " topic: " + metadata.topic());
        } else {
            log.error("Failed to send order message: " + ex.getMessage(), ex);
        }
    }
}
That completes the basic examples of sending and consuming Kafka messages.
Kafka can also serve as a reliable event-stream source for real-time computation, as in the following code:
package cc.lovesq.kafkamsg;

import cc.lovesq.model.BookInfo;
import cc.lovesq.util.TimeUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

/**
 * Kafka event stream.
 *
 * @Date 2021/2/4 20:17
 * @Created by qinshu
 */
@Component
public class KafkaMessageStream {

    private static final Log log = LogFactory.getLog(KafkaMessageStream.class);

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "orderCount");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder streamBuilder = new StreamsBuilder();
        KStream<String, String> source = streamBuilder.stream("order-events");

        // Count how many orders each goodsId appears in.
        KStream<String, String> result = source
                .filter((key, value) -> value.startsWith("{") && value.endsWith("}"))
                .mapValues(value -> JSONObject.parseObject(value, BookInfo.class))
                .mapValues(bookInfo -> bookInfo.getGoods().getGoodsId().toString())
                .groupBy((key, value) -> value)
                .count(Materialized.as("goods-order-count"))
                .mapValues(value -> Long.toString(value))
                .toStream();
        result.print(Printed.toSysOut());

        new Thread(() -> {
            TimeUtil.sleepInSecs(10);
            KafkaStreams streams = new KafkaStreams(streamBuilder.build(), properties);
            streams.start();
            log.info("stream-start ...");
            TimeUtil.sleepInSecs(10);
            streams.close();
        }).start();
    }
}
The original setup also pre-creates a topic named goods-order-count, matching the state store name:
bin/kafka-topics.sh --create --topic goods-order-count --bootstrap-server localhost:9092
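The topology above is essentially a per-goodsId counter keyed by the grouped value. Stripped of Kafka, the same aggregation can be sketched in plain Java, which may help when reasoning about what the goods-order-count state store holds:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderCountSketch {

    /** Count occurrences of each goodsId, mirroring groupBy(goodsId).count(). */
    static Map<String, Long> countByGoodsId(List<String> goodsIds) {
        Map<String, Long> counts = new HashMap<>();
        for (String goodsId : goodsIds) {
            counts.merge(goodsId, 1L, Long::sum); // increment this goodsId's running count
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two orders for goods 5678 and one for goods 9012.
        Map<String, Long> counts = countByGoodsId(List.of("5678", "5678", "9012"));
        System.out.println(counts);
    }
}
```

Unlike this one-shot version, the Kafka Streams count is continuously updated as new order events arrive, and each update is emitted downstream as a changelog record.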
Summary
Kafka is a promising messaging system for both business systems and big-data systems. This article has walked through basic examples of producing, consuming, and stream-processing with Kafka, to help beginners (myself included) get up to speed and explore Kafka further.