kafka-clients 1.0 高階API消費消息(未完)


消費消息的請求(按序)

  • org/apache/kafka/common/requests/RequestHeader
  • org/apache/kafka/common/requests/ApiVersionsRequest
  • org/apache/kafka/common/requests/MetadataRequest 批量查詢topic的元數據信息
  • org/apache/kafka/common/requests/FindCoordinatorRequest 從拿到的topic的元數據中取出leader節點 作為組協調者
  • org/apache/kafka/common/requests/JoinGroupRequest
  • org/apache/kafka/common/requests/SyncGroupRequest
  • org/apache/kafka/common/requests/OffsetFetchRequest
  • org/apache/kafka/common/requests/ListOffsetRequest
  • org/apache/kafka/common/requests/FetchRequest
  • org/apache/kafka/common/requests/HeartbeatRequest

請求接口文檔參考
響應接口文檔參考

RequestHeader

請求頭

name type defaultValue docString
api_key INT16 null 請求接口編號
api_version INT16 null api版本
correlation_id INT32 null 用戶提供的一個整數id,用於響應時由響應體帶回來
client_id NULLABLE_STRING null 用戶提供的client id

ApiVersionsRequest

查詢API版本信息

請求 version:1

僅僅有請求頭

響應 version:1

name type defaultValue docString
error_code INT16 null 錯誤碼
api_versions ARRAY({api_key:INT16,min_version:INT16,max_version:INT16}) null broker能支持的api各版本列表。含最低版本,最高版本.
throttle_time_ms INT32 0 由於配額沖突而阻止請求的持續時間(毫秒)(如果請求未違反任何配額,則為零

雖是請求broker端,但是實際還是用的client中的API完成的邏輯:
ApiVersionsResponse.apiVersionsResponse
根據messageFormatVersion 消息格式版本推導出各API版本情況。
API版本 最小的是0 。寫的固定的。 最大的是 requestSchemas的length -1 即requestSchemas最大版本。
此處不僅返回每個API的最小版本與最大版本,還返回能支持的API列表。如因版本問題不能支持的API是不會返回的。
能否支持的判斷依據是,API依賴的最小消息格式版本小於當前的消息格式版本,那么就支持。

for (ApiKeys apiKey : ApiKeys.values()) {
    if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
        versionList.add(neApiVersionsResponse.ApiVersion(apiKey));
    }
}

MetadataRequest

批量查詢topic的元數據信息

請求 version:5

name type defaultValue docString
topics ARRAY(STRING) null 需要查元數據的topic的列表,如果不送則查所有topic的元數據
allow_auto_topic_creation BOOLEAN null 在broker配置了允許自動創建topic時是否自動創建topic

響應 version:4

name type defaultValue docString
throttle_time_ms INT32 0 由於配額沖突而阻止請求的持續時間(毫秒)(如果請求未違反任何配額,則為零)
brokers ARRAY({node_id:INT32,host:STRING,port:INT32,rack:NULLABLE_STRING}) null 所有 活着的 broker的id ip port的信息
cluster_id NULLABLE_STRING null 集群id
controller_id INT32 null controller角色的broker的id
topic_metadata ARRAY({error_code:INT16,topic:STRING,is_internal:BOOLEAN,partition_metadata:ARRAY({error_code:INT16,partition:INT32,leader:INT32,replicas:ARRAY(INT32),isr:ARRAY(INT32)})}) null topic元數據,分區數、leader broker的id、副本所在broker id列表、isr broker id列表

broker端處理
在broker端

  1. 過濾出授權的topics KafkaApis.handleTopicMetadataRequest
  2. 查詢出授權topics的元數據 KafkaApis.getTopicMetadata
    2.1 從緩存中拿,拿到(跟topics的size相同)即返回
    2.2 處理沒拿到的topic
    2.2.1 允許創建topic的,就按默認副本數和默認分區數創建,不能創建的或者創建出錯的返回出錯信息。創建topic前提是協調者可用。否則COORDINATOR_NOT_AVAILABLE。
    2.2.2 返回創建后的metadata
    元數據信息有緩存 kafka.server.MetadataCache.cache:topic <--> [partitionNo <--> 分區狀態封裝]
    MetadataCache中一系列getxxx方法都是用來讀取檢索的。
    元數據緩存的更新參見 《MetadataCache更新》

FindCoordinatorRequest

查詢協調者

請求 version:1

name type defaultValue docString
coordinator_key STRING null 組協調時是組id
事務協調時是事務id
coordinator_type INT8 null 協調類型(0 = group, 1 = transaction)

響應 version:1

name type defaultValue docString
throttle_time_ms INT32 0 Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
error_code INT16 null Response error code
error_message NULLABLE_STRING null Response error message
coordinator {node_id:INT32,host:STRING,port:INT32} null 協調者broker的id ip port

組協調與事務協調都用這個請求
coordinatorKey 組協調是組id 事務協調時是事務id
分區對應的leader節點就是組協調者

 val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
    .find(_.partition == partition)
    .map(_.leader) // SimonNote: leader節點作為協調者

這個請求的響應也就是將協調者信息(node_id,host,port)返回去

JoinGroupRequest

加入消費組的請求

請求 version:2

name type defaultValue docString
group_id STRING null 唯一的組標志
session_timeout INT32 null 會話時間,超過這個時間沒收到心跳,協調者就認為這個消費者掛了
rebalance_timeout INT32 null 協調者在重新平衡組時等待每個成員重新加入的最長時間
member_id STRING null 由組協調者分配的成員ID,如果是第一次加入,則為空。
protocol_type STRING null 組協調協議實現類的唯一名稱
group_protocols ARRAY({protocol_name:STRING,protocol_metadata:BYTES}) null 組成員能支持的組協調協議列表

響應 version:2

name type defaultValue docString
throttle_time_ms INT32 0 Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
error_code INT16 null Response error code
generation_id INT32 null 組的年代?
group_protocol STRING null 協調者選中的組協議
leader_id STRING null 組中的leader
member_id STRING null 第一次加入的時候組協調者給分的成員id
members ARRAY({member_id:STRING,member_metadata:BYTES}) null 組內成員?

kafka.coordinator.group.GroupCoordinator.handleJoinGroup

一系列的check:

  1. 協調者是否可用
  2. 是否是本分區的協調者
  3. 消費組id是否合法(是否為空)
  4. 是否協調者正在load中,GroupMetadataManager會管理當前partition是否在load中
  5. sessionTimeoutMs是否在組配置的最大最小范圍內

groupManager加入新建的GroupMetadata實例(如果沒有的話,有就直接下一步了),GroupMetadata有哪些東西,下面注釋寫了一部分,但是還包含事務消息用一些offset

/**
 * Group contains the following metadata:
 *
 *  Membership metadata:
 *  1. Members registered in this group
 *  2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
 *  3. Protocol metadata associated with group members
 *
 *  State metadata:
 *  1. group state
 *  2. generation id
 *  3. leader id
 */

doJoinGroup

一系列的check后,根據group.currentState做相應處理
group.currentState

GroupMetadata.scala中有對group狀態定義及action及走向到哪的明確詳細描述,非常重要

SyncGroupRequest

請求 version:1

name type defaultValue docString
group_id STRING null group唯一標志
generation_id INT32 null 代的標志?
member_id STRING null 第一次加入的時候組協調者給分的成員id
group_assignment ARRAY({member_id:STRING,member_assignment:BYTES}) null null

響應 version:1

name type defaultValue docString
throttle_time_ms INT32 0 Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
error_code INT16 null Response error code
member_assignment BYTES null null

OffsetFetchRequest

請求 version:3

name type defaultValue docString
group_id STRING null group id
topics ARRAY({topic:STRING,partitions:ARRAY({partition:INT32})}) null topic列表,支持多個topic

響應 version:3

name type defaultValue docString
throttle_time_ms INT32 0 Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
responses ARRAY({topic:STRING,partition_responses:ARRAY({partition:INT32,offset:INT64,metadata:NULLABLE_STRING,error_code:INT16})}) null 列表:topic-[{分區號-offset,元數據信息}]
error_code INT16 null Response error code

ListOffsetRequest

請求 version:2

name type defaultValue docString
replica_id INT32 null follower的broker的id. 正常消費用-1.
isolation_level INT8 null 事務消息可見性設置。 使用 READ_UNCOMMITTED (isolation_level = 0)能看到所有消息. 使用 READ_COMMITTED (isolation_level = 1), 非事務消息和已經提交的消息能被看到. 更具體一點, READ_COMMITTED 返回比當前 LSO (last stable offset)小的offset, 並允許返回已經取消的事務
topics ARRAY({topic:STRING,partitions:ARRAY({partition:INT32,timestamp:INT64})}) null 列表:topic,partitions{分區號,時間戳}

響應 version:2

name type defaultValue docString
throttle_time_ms INT32 0 Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
responses ARRAY({topic:STRING,partition_responses:ARRAY({partition:INT32,error_code:INT16,timestamp:INT64,offset:INT64})}) null 列表:topic 分區號 錯誤碼 時間戳 offset

FetchRequest

請求 version:6

name type defaultValue docString
replica_id INT32 null follower的broker的id. 正常消費用-1
max_wait_time INT32 null 等待響應的最大時間 單位ms.
min_bytes INT32 null 最小字節
max_bytes INT32 null 最大字節. 單條消息如果超過這個大小也將返回
isolation_level INT8 null 事務隔離級別
topics ARRAY({topic:STRING,partitions:ARRAY({partition:INT32,fetch_offset:INT64,log_start_offset:INT64,max_bytes:INT32})}) null 列表: topic 分區號 取的offset log開始的 offset?? 最大字節.

響應 version:6

name type defaultValue docString
throttle_time_ms INT32 0 Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
responses ARRAY({topic:STRING,partition_responses:ARRAY({partition_header:{partition:INT32,error_code:INT16,high_watermark:INT64,last_stable_offset:INT64,log_start_offset:INT64,aborted_transactions:ARRAY({producer_id:INT64,first_offset:INT64})},record_set:RECORDS})}) null 列表: topic 分區頭: 分區號 高水位值 LSO(上次穩定offset), log開始offset,取消事務:生產者id 第一個offset。 消息記錄集

HeartbeatRequest

請求 version:1

name type defaultValue docString
group_id STRING null group id
generation_id INT32 null group的年代
member_id STRING null 第一次加入的時候組協調者給分的成員id

響應 version:1

name type defaultValue docString
throttle_time_ms INT32 0 Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota)
error_code INT16 null 響應碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM