0 前言
要想理解某個系統是怎么運行的,首先我們可以看看它提供什么樣的API。本文從 Kafka 的協議交互流程入手,分析 Producer 和 Consumer 是如何工作的。一方面,可以用來實現自己的 kafkasdk;另一方面也能更好地理解 Kafka 的內部原理。
接下來就從以下3個方面來學習Kafka協議:
- Kafka協議格式,包括編解碼方案;
- Producer 工作流程;
- Consumer 工作流程
本文基於 Kafka 1.0 版本描述,較新版本(v2.7)肯定有出入,但核心邏輯沒有改變
1 Kafka協議格式
這里主要參考 Kafka 官方提供的 KAFKA PROTOCOL GUIDE。如果你要自己實現 kafka Client,那么建議最好把它打印出來放在手邊,一個字一個字地看 n 遍。
如果你只是想要了解 Producer 或者 Consumer 的工作流程,那么只需要看看我接下來總結的內容即可。
Kafka協議可以分為 Request 和 Response。

從某種程度來說,Kafka更多的是提供了 RPC 功能:請求只能由 Client 主動發到 Broker;Broker 針對每個請求回復一個響應給 Client。
不同的 Request 使用不同的 apikey 來區分;Request 和 Response 通過 CorrleationId 來一一對應。
編解碼方案
從編解碼角度來說,每個協議包都是由 4字節的 size 開頭,后面再跟相應字節的請求包或響應包。
在1.0及以前版本中,Kafka協議中可使用的數據類型僅有3種:
- 固定長度的整形,包括 int8, int16, int32, int64 等;
- 可變長度的字符串,包括 string, bytes 等;
- 復合類型,包括 array 等。
每種類型都有特定的編解碼方案,具體可以參考官方文檔,這里不再詳述。
從2.0版本開始,又增加了很多復雜的類型,比如 boolean, varint, varlong, uuid, float64, compact_string, compact_bytes 等等。
Request & Response
Kafka最新版本中提供的Request已經達到50多種了,但是比較核心的其實也就下面幾種:
| 請求 | 說明 |
|---|---|
| Metadata | 查詢集群當前Broker列表,以及指定的topic信息,包括partition數量以及leader/replicas信息 |
| Produce | 發布消息 |
| Fetch | 從指定偏移量開始拉取某個(些)partition的消息 |
| Offset | 查詢某個(些)partition的offset信息,可以指定時間戳 |
| Offset Commit | 提交offset,只針對ConsumerGroup |
| Offset Fetch | 查詢某個ConsumerGroup當前提交的offset信息 |
此外,從0.9版本開始,Kafka提供了消費組的概念,並相應地提供了一組管理協議,包括 GroupCoordinator/JoinGroup/SyncGroup/LeaveGroup/Heartbeat 等,具體在后面的Consumer流程中再講。
Request
每個請求都有固定的 header,具體格式如下:
struct RequestHeader {
int32_t size; // 請求總長度
int16_t apikey; // 區分請求類型
int16_t version; // 區分請求版本
int32_t correlationId; // 請求上下文,用於對應回包
std::string clientid; // 請求方標識,僅用來打日志
};
在具體發包時,header 在前(這不廢話嗎!),后面再跟具體的請求包。請求包的大小等於總的 size 減去 header 的大小。
版本兼容
注意到頭部的 version 字段,它是用來保證客戶端和服務端版本兼容的。Kafka保證的兼容策略是 bidirectional compatibility。即,新版本客戶端可以訪問舊版本的 broker;新版本 broker 可以接受舊版本的客戶端的請求。
客戶端在連接上 Broker 並實際開始工作之前,可以先發送 ApiVersionsRequest 請求到每個 Broker,以查詢 Broker 支持的 版本列表,並從中選取一個它能識別的最高版本作為后續使用版本。
並且這個版本協商是基於連接的:每次連接斷開並重連時,都要重新進行版本協商。因為斷線可能正是因為Broker升級導致的。
Response
每個回包也有固定的 header,具體格式如下:
struct ResponseHeader
{
int32_t size;
int32_t correlationId;
};
回包的 header 就很簡單了,只有一個 correlationId。所以客戶端必須要把處理回包時要用到的信息全部在發出請求時保存在請求上下文中,然后通過 correlationId 找到上下文。
C++實現
根據官方文檔中的編解碼規范,我們就可以自己寫一個C++版本的編解碼實現了。
kafkaprotocpp的關鍵類有:
Pack&Unpack。負責各種Kafka支持的數據類型的編解碼;Request。負責 Request 的編解碼;Response。負責 Response 的編解碼;Marshallable。所有具體的請求或響應,都需要繼承此抽象基類,實現自己的marshal/unmarshal方法。
以MetadataRequest為例,請求協議定義如下:
struct MetadataRequest : public Marshallable
{
enum { apikey = ApiConstants::METADATA_REQUEST_KEY, apiver = ApiConstants::API_VERSION0};
std::vector<std::string> vecTopic;
virtual void marshal(Pack &pk) const
{
pk << vecTopic;
}
virtual void unmarshal(const Unpack &up)
{
up >> vecTopic;
}
};
struct MetadataResponse : public Marshallable
{
std::vector<Broker> vecBroker;
std::vector<TopicMetadata> vecTopicMeta;
virtual void marshal(Pack &pk) const
{
pk << vecBroker << vecTopicMeta;
}
virtual void unmarshal(const Unpack &up)
{
up >> vecBroker >> vecTopicMeta;
}
};
收發請求如下:
std::string topic = "test";
MetadataRequest metareq;
metareq.vecTopic.push_back(topic);
Request req(1, "meta_test", metareq.apikey, metareq.apiver, metareq);
// 發包到指定套接口
int ret = send_request(sockfd, req.data(), req.size());
// 收包
char* buf = read_response(sockfd);
size_t len = Response::peeklen(buf);
Response resp(buf, len);
resp.head();
MetadataResponse meta;
try {
meta.unmarshall(resp.up);
} catch(PacketError& pe) {
}
// 處理Meta信息
感興趣的可以具體去我的github看它的源碼。
2 Partition存儲模型
在深入了解 Producer 和 Consumer 的交互流程之前,我們先來看下Kafka的存儲模型。
在 Kafka 中,topic 是消息的邏輯單元:不同的 topic 代表了不同的業務數據,是完全相互獨立的。partition 則是消息存儲的物理單元:每個 topic 可以分成若干個 partition,不同的 partition 可以存儲在不同的 Broker 上。
我們先來看下如何在 Kafka 中創建 topic:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test
這里我們指定了3個參數:
--topic表示名稱,必須唯一;--partitions表示分區個數,據消息並發吞吐量和客戶端處理能力設置;--replication-factor表示消息備份數,決定了數據的可靠性
接下來可以看到這個topic的具體分區信息:
$ kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 402 Replicas: 402,401 Isr: 402,401
Topic: test Partition: 1 Leader: 403 Replicas: 403,402 Isr: 403,402
Topic: test Partition: 2 Leader: 401 Replicas: 401,403 Isr: 401,403
當我們在創建topic的時候,kafka會創建指定數量的partition,並將其存儲在若干個(由ReplicationFactor決定)Broker上。
在這些Broker中,會選出一個作為這個 partition 的 leader,來負責它的生產和消費請求。
在Kafka集群中,會有一個Broker被選舉為集群的 Controller (借助Zookeeper),來負責partition的分配工作,重點是保證集群內各Broker的負載均衡。
例如,這里 Partition 0 落在了 402,401 這兩個Broker上,並且其中排在前面的那個就是它的 Leader。當我們要發布或消費這個 partition 的消息時,必須將 Producer & Fetch請求發到這台 Broker。
3 Producer 工作流程
接下來看下 Producer 發布消息到 Kafka 時的流程,看看中間都經歷了些什么:

這里,我們假定集群有2台Broker,每台Broker各自是一個partition的leader。那么Producer的具體流程可以描述為:
- Producer向 BootstrapServer 發送
MetadataRequest,查詢集群當前Broker列表,以及partition的leader信息; - Producer向 Broker1 請求發布消息到 partition-0,收到成功回復;
- Kafka集群發生 leader 轉移,partition-0 的 leader 變成了 Broker2;
- Producer再次向 Broker1 發布消息,收到錯誤碼(UNKNOWN_TOPIC_OR_PARTITION);
- Producer再次向任意一台 Broker 發送
MetadataRequest,查詢最新的 leader 信息; - Producer重新向 Broker2 請求發布消息到 partition-0,收到成功回復。
可以看到,整個發布流程只涉及到兩個協議:MetadataRequest 和 ProduceRequest。
MetadataRequest
官方格式定義:
Metadata Request (Version: 0) => [topics]
topics => name
name => STRING
Metadata Response (Version: 0) => [brokers] [topics]
brokers => node_id host port
node_id => INT32
host => STRING
port => INT32
topics => error_code name [partitions]
error_code => INT16
name => STRING
partitions => error_code partition_index leader_id [replica_nodes] [isr_nodes]
error_code => INT16
partition_index => INT32
leader_id => INT32
replica_nodes => INT32
isr_nodes => INT32
Metadata主要查詢兩種信息:
- 集群的Broker列表,包括 node_id, host, port 等信息;
- 指定的topic信息,包括 partition 數量,以及每個 partition 的 leader, replicas, isrs等。
如果查詢時沒有指定任何 topic,那么會查詢到集群所有的 topic 信息。
Kafka Client 有兩種場景下會發送MetadataRequest:
- 啟動之后定期查詢,以便感知到新的Broker信息和topic信息(例如partition擴容了);
- 當生產或消費時,提示當前Broker已經不是Leader了,需要及時更新信息
正是因為有Metadata協議的存在,Kafka Client在運行過程中總是能動態感知到集群所有的Broker信息。因此,我們在啟動 Producer 或 Consumer 時,配置的bootstrap.server只需要包含一台可用的Broker信息就可以了。
此外,我們通過MetadataResponse獲取到的 Broker 的 host 就是我們在部署 Kafka 時配置的advertised.listeners項。需要注意它和listeners的區別:
listeners配置的是 Kafka 監聽的套接口,例如我們可以配置為PLAINTEXT://0.0.0.0:9092來監聽本機所有網口;advertised.listeners是寫入到ZooKeeper進而被客戶端通過MetadataRequest拿到的地址
舉個例子,假定我們的Kafka部署在雙線或多線機房,為了保證高可用,我們通常是配置為監聽所有網口。但是advertised.listeners又不能配置為0,所以我們可以給它配置成一個域名:這個域名再綁定Broker的所有網口的具體IP。這樣KafkaClient拿到域名后就可以解析到多個IP,並在連接斷開時可以嘗試使用另外的IP來重連。
ProduceRequest
官方協議定義:
v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
RequiredAcks => int16
Timeout => int32
Partition => int32
MessageSetSize => int32
v2 (supported in 0.10.0 or later)
ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
Timestamp => int64
ThrottleTime => int32
從協議可以看出來,這是一個批量接口:每次請求可以指定發數據給多個 topic 或者多個 partition 。
這里重點看一下 RequireAcks 參數,它決定了生產者往Kafka發消息的可靠性。它表示的意思是:Leader 收到請求后需要等待多少個 replicas 的 ack,才能回包給客戶端。它的取值為:
- 0,表示Leader不會發送回包給客戶端;
- 大於0且小於 ReplicaFactor 的整數,表示將數據同步到指定數量的 Broker 上才給客戶端回包;
- -1,表示Leader會將數據同步到所有 ISR 集合中的Broker后才給客戶端回包。
所以,RequireAcks值越大,表示可靠性越高,但是效率就相應地越低了。不過要更詳細地描述可靠性的話,還得理解一個概念 ISR。
在Kafka內部,Leader 和 Follower 間的數據同步,依靠每個 Follower 通過 long-pull 模式一直不停地從 Leader 拉取數據。那自然地,Leader 會維護每個 Follower 拉取到了哪個 offset,以及與最新offset的差值是多少。當這個差值(lag值)不超過某個既定值時,就認為這個 Follower 是跟 Leader 保持同步的,屬於 In-Sync-Replicas 集合。
顯然,這個 ISR 集合是動態變化的。當某個 Follower 長時間沒有過來拉,或者 lag 值比較大時,就會被踢出 ISR;當它恢復之后,又會被重新加入 ISR 集合。只有被 ISR 集合中所有的 Broker 都同步的消息,才被認為是已提交的(committed)。只有已提交的消息,才能被消費者看到。
如果當前 leader 掛了,因為已提交的消息肯定在 ISR 集合中的其它 Broker 上都存在,所以只要ISR集合不為空,那么重新選一個作為 Leader 即可。但是如果 ISR 為空呢?這取決於另外一個參數 unclean.leader.election.enable:如果設置為 true,那么可以選擇一個不在 ISR 集合中的 Replica 作為 leader。但是這可能導致部分已提交的消息丟了,相當於是拿 可靠性 換 可用性。
此參數可全局設置,也可針對 topic 設置。
再說回 ProduceRequet。假定我們有個 partition 的 ReplicaFactor 為3,表示會存儲在 3 台 Broker(包括Leader)上。如果發布時的 RequireAcks 填了 3,那就表示每次發布都要 3 台都同步到數據才算成功。那如果 ISR 集合中沒有 3 台會怎樣呢?答案就是不可寫。
從某種程度來說,我們在創建 topic 時指定的 ReplicaFactor 就已經表示了我們對這個 topic 的數據可靠性的要求了。那如果在發布時再去設置ack值,感覺有點冗余了。而且有時候我們在發布的時候,不太好知道這個topic的具體ReplicaFactor值。所以,我們可以將 acks 填為 -1,表示等待當前ISR集合中都同步了就算成功。
在一切正常的情況下,ISR 集合就等於 Replicas 集合;當出問題時,有問題的Broker就會被踢出 ISR 集合。考慮到在不出問題的時候,除Leader之外的Replicas是發揮不出作用的,所以如果沒有其它機制保障的話,acks 填 -1,好像可靠性不太“穩定”。
所以 Kafka 提供了另外一個參數min.insync.replicas。當 acks 填 -1 時,如果 ISR 集合數量小於此值的話,拒絕寫入數據。這樣就給可靠性設置了一個底線。
此參數可全局設置,也可針對 topic 設置。
4 Consumer 工作流程
消費組的工作原理,其實可以分解成3個相互獨立的子過程:
- 組關系的維護。包括 JoinGroup, SyncGroup, LeaveGroup 以及維持組狀態的心跳包 HeartBeat;
- offset偏移量的管理。包括 FetchOffset, FetchCgroupOffset, CommitOffset 等;
- 拉取消息。包括 FetchMessage 等。
這3個過程相互獨立,從協議交互角度來說,你可以單獨調用每個過程涉及到的協議來實現特定目的。
維護消費組關系
關於Coordinator
為了保證數據的一致性,每個消費組的狀態都由某個固定的 Broker 來管理。這個 Broker 稱為該消費組的 Coordinator。從負載均衡角度來講,集群內每個Broker都是差不多數量的消費組的 Coordinator。針對特定消費組來說,它的所有的組管理相關的請求都必須發送給 Coordinator 才能被處理。
因此,Consumer 啟動后的第一件事,就是查詢它的 Coordinator。方法很簡單,發送 QueryCoordinator 請求到任意一台 Broker 即可。
關於消費組
在一個消費組里,每個 Consumer 都會被 Coordinator 分配一個唯一的 memberid。並且,Coordinator會挑選一個 Consumer 作為這個消費組的 leader。
所謂消費組,就是多個消費者共同出力來消費某個(些)topic的所有parittion。所有的這些 partition 會被均衡地分配給所有的消費者。也因此,partition 數量一般不會超過消費者的數量。當然,如果只有1個parittion,為了保證高可用,也會起2個消費者,以便當其中1個出問題的時候,另1個能立即接管過來。
那誰來負責在組內分配 partition 呢?你可能會覺得是 Coordinator,但其實不是!真正負責分配的是消費組的 leader Consumer。這也是 Coordinator 的名字的由來:Broker 只是幫忙協調和維護組關系,具體涉及到消費的活(包括分配),都是由 Consumer 自己完成。
關於消費組狀態
維護組關系,其本質就是在客戶端中維護一個消費組的狀態機,如下圖所示:

在描述狀態機轉移過程之前,我們先來看一下一個正常的消費組的狀態:
每個成員都處於 CS_UP 狀態,各自消費自己負責的 partition,並且需要每隔固定間隔(不超過配置值 heartbeat.interval.ms)發送心跳包給 Coordinator 來維持狀態,否則就會被踢出消費組。
好了,再來看 Consumer 的狀態機轉移過程:
- 消費者啟動后默認是
CS_DOWN狀態,然后發送JoinGroup給 Coordinator 來請求加入組,並轉變為CS_JOINING狀態; - 當 Coordinator 察覺到成員有變動時,它會在每個現有成員的下一個心跳包回包中告知它們,需要重新發起
JoinGroup。這樣,每個現有成員就從CS_UP狀態變為CS_JOINING; - 當 Coordinator 收到所有成員的 Join 請求后,就會從中選出新的 leader,並且通過 JoinResponse 告知所有成員誰是 leader。其中,給 leader 的回包中,會帶上所有成員的信息以及它們訂閱的topic列表;
- 當消費者收到 JoinResponse 后,就會知道自己是不是 leader。如果是 leader,就需要分配 partition 了,然后把分配結果放在
SyncGroup中發送給 Coordinator。除了 leader 之外,其它成員也都需要發送SyncGroup,只不過不用帶分配結果。這樣,所有成員都從CS_JOINING變成了CS_SYNCING; - Coordinator 收到所有成員的 SyncGroup 請求后,會將 leader 上傳的分配結果放到
SyncResponse告知給所有成員。這樣所有成員收到回包后,就知道自己該負責消費哪些 partition 了,然后狀態從CS_SYNCING變成CS_UP,就可以准備干活了。 - 此后,每個成員要繼續定期發送
Heartbeat以保持在CS_UP狀態。
以上就是整個消費組的狀態轉移過程了。為了避免一些非法的消費者進程(例如那些卡了很久然后突然又恢復了)干擾消費組狀態,Coordinator 會為每個消費組維護一個單調遞增的 generationId。每次有成員變動時,generationId都會加1。當收到generationId與當前值不一致的請求時,會拒絕。
關於消費組偏移量
當拿到分配結果后,Consumer 就准備開始干活了。
相比於單機版的消費模式,消費組除了能負載均衡之外,還有另外一個好處:Coordinator 可以幫助存儲上次消費到的偏移量,以便當某個partition從一個消費者轉移到另一個消費者時,可以接着消費,從而保證消息不丟失。
所以,Consumer 在具體拉取消息之前,要首先能夠知道該從哪個位置開始拉取。方法也很簡單,發送 OffsetFetchRequest 到 Coordinator 就可以獲取到之前提交的便宜量了。
v0 and v1 (supported in 0.8.2 or after):
OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
ConsumerGroup => string
TopicName => string
Partition => int32
v0 and v1 (supported in 0.8.2 or after):
OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
TopicName => string
Partition => int32
Offset => int64
Metadata => string
ErrorCode => int16
那如果這個消費組之前沒人提交過對應 partition 的 offset 呢?
那就需要用另外一個協議了——發送 OffsetRequest 到 partition 的 Leader Broker。
// v0
ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
ReplicaId => int32
TopicName => string
Partition => int32
Time => int64
MaxNumberOfOffsets => int32
// v1 (supported in 0.10.1.0 and later)
ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
ReplicaId => int32
TopicName => string
Partition => int32
Time => int64
這個請求可以查詢到指定 partitions 的偏移量信息。其中,參數 Time 可以指定要查詢哪個時間戳的便宜量,取值可以為:
> 0。表示查詢指定時間戳(單位ms)之前的最后一條消息的offset;-1。表示查詢最近的offset(latest);-2。表示查詢最老的offset(earlist)。
這也是我們在配置 Consumer 時經常碰到的一個參數:auto.offset.reset。只不過比較奇怪的是,明明協議層面可以支持配置一個具體的時間戳,但是所有的Client暴露出來的接口,只能配置成 earliest 或 latest。
當消費組正常消費時,可以隨時把已經消費過的偏移量提交到 Coordinator。方法就是發送 OffsetCommitRequest 到 Coordinator。
提交到 Coordinator 的offset信息也是有個有效期的,當超過規定時候沒有提交時,Broker 也會把 offset 給刪掉的。這樣也會重新觸發上面提到的沒有初始偏移量的邏輯。
這里有個點,就是 Coordinator 不會去校驗你提交的offset是否合法。換句話說,它只是提供了一個 key 為 'groupid/topic/partition',value 為 int64 的讀寫接口。
我們可以利用這個特點,來為Kafka跨集群做數據同步。Kafka跨集群同步,方法一般就是在源機房部署一套消費者,然后將消息發布到目的機房。
那這里就涉及到是采用 同機房消費,跨機房發布 還是 跨機房消費,同機房發布 的選擇了。
- 跨機房消費,意味着消費組的狀態不穩定,頻繁的網絡超時會導致消費組的 rebalance。在消費組 rebalance 時,所有消費都需要暫停。但是跨機房拉取消息本身沒有副作用;
- 跨機房發布,意味着當網絡超時時,發布端需要重發,導致目的機房消息重復。(不過新版本Kafka已經支持發布端邏輯去重了)
不過借助上面提到的特點,我們可以在目的機房部署消費組,但是把消費組的組管理和offset管理放在目的機房Kafka(即在目的機房加入消費組),但是提交的offset其實是源機房partition的offset, 消息還是從源機房拉取。
這樣就綜合了兩種方案的優點,並且規避了各自的缺點。不過這種方案需要定制化Kafka的SDK。
關於拉取消息
有了 partition 列表,有了每個 partition 的初始offset,那么接下來 Consumer 的工作就很簡單了,只要通過 long-pull 模式不停地去各自 partition 的leader 拉取消息即可。
拉取消息通過發送 FetchRequest 給 leader:
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
ReplicaId => int32
MaxWaitTime => int32
MinBytes => int32
TopicName => string
Partition => int32
FetchOffset => int64
MaxBytes => int32
v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
ThrottleTime => int32
TopicName => string
Partition => int32
ErrorCode => int16
HighwaterMarkOffset => int64
MessageSetSize => int32
在拉取時,可以指定 MinBytes 和 MaxBytes,來指定本次拉取最少和最多拉取多少數據,以及最多等待時間 MaxWaitTime。在回包中,Kafka除了返回具體的消息之外,還會返回一個參數HighwaterMarkOffset,表示該 partition 目前可供消費者消費的最新的offset。通過此值我們可以知道還有多少消息待拉取。
這里需要注意的是,HighwaterMarkOffset表示的是消費者能看到的最新offset,不表示發布者最新發布的offset。這個涉及到Kafka內部同步機制,只有被所有 ISR 集合中的Broker同步的消息,才能增加 HighwaterMarkOffset。
5 總結
除了 Producer 和 Consumer 相關協議之外,Kafka還提供一些管理類的API,包括 ListGroup(列出所有消費組)、DescribeGroups(查詢消費組狀態)等等。新版kafka還提供了創建Topic之類的APi。深度利用這些協議可以用來寫一些Kafka的監控管理插件。
弄懂Kafka的協議交互流程,除了可以加深對Kafka的理解之外,還有以下的好處:
- 幫助定位生產者和消費者的問題。由於Kafka Broker不會打印任何與客戶端相關的異常日志信息,全部都是以錯誤碼的形式告知給客戶端。因此了解協議交互流程,就可以更好地從客戶端側的日志了解到具體是哪個環節出問題;
- 實現自己的KafkaSdk。由於作者在工作中需要實現Kafka的跨集群同步數據,而開源的sdk都不太適合,因此只能自己實現了一個C++版sdk;
- 兼容Kafka協議來提供服務。Kafka已經在互聯網行業得到大規模應用,如果新開發的MQ系統可以兼容Kafka協議,那么可以掠奪大量的Kafka使用者,實現無縫遷移。例如最近比較火熱的 pulsar,就是通過 Kop 插件來兼容了Kafka協議。
希望本篇文章可以讓大家更加理解Kafka的工作模式。
