flink引出的kafka不同版本的兼容性


參考:

官網協議介紹:http://kafka.apache.org/protocol.html#The_Messages_Fetch

kafka協議兼容性  http://www.cnblogs.com/huxi2b/p/6784795.html 

 最近在使用flink的時候,在flink的官網對flink-connect-kafka有這樣的一個版本對照:

Maven Dependency Supported since Consumer and 
Producer Class name
Kafka version Notes
flink-connector-kafka-0.8_2.11 1.0.0 FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink.
flink-connector-kafka-0.9_2.11 1.0.0 FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x Uses the new Consumer API Kafka.
flink-connector-kafka-0.10_2.11 1.2.0 FlinkKafkaConsumer010
FlinkKafkaProducer010
0.10.x This connector supports Kafka messages with timestamps both for producing and consuming.
flink-connector-kafka-0.11_2.11 1.4.0 FlinkKafkaConsumer011
FlinkKafkaProducer011
0.11.x

Since 0.11.x Kafka does not support scala 2.10. This connector

supports Kafka transactional messaging to provide exactly once semantic for the producer.

flink-connector-kafka_2.11 1.7.0 FlinkKafkaConsumer
FlinkKafkaProducer
>= 1.0.0

This universal Kafka connector attempts to track the latest version of the Kafka client.

The version of the client it uses may change between Flink releases.

Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.

However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated

flink-connector-kafka-0.11_2.11 and flink-connector-kafka-0.10_2.11 respectively.

Attention: as of Flink 1.7 the universal Kafka connector is considered to be in a BETA 
status and might not be as stable as the 0.11 connector. In case of problems with
the universal connector, you can try to use flink-connector-kafka-0.11_2.11 which s
hould be compatible with all of the Kafka versions starting from 0.11.

就是每個版本的flink,對應的flink-connector-kafka(010 是kafka版本,2.11 是scala版本,這里忽略scala版本) 的版本,和connector 對應的kafka 版本。

最后一行的意思是,flin 1.7.0 對應 flink-connector-kafka,然后對應的kafka 版本是 >= 1.0.0。

這樣有個問題是:我們使用flink 1.7.2 ,connector 是 flink-connector-kafka_2.11,maven 下 connector 自動就拉了 kafka-client-2.0.1 下來,但是我們的kafka版本是1.0.0,所以就想用kafka-clents-1.0.0,高版本的當然也是可以的,不過總是覺得對應版本最好了(就像,小孩子,穿自己的衣服和穿他爸爸的衣服一樣)。

就對connector處理一下,排除了kafka-clients,重新引用了一個kafka-clents-1.0.0 的進來。如下:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clents</artifactId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clents</artifactId>
            <version>1.0.0</version>
        </dependency>

這樣一個問題,就這么機智的解決了。

-----------------分割線------------------------

 以為這樣就結束的看官,可能比較年輕,問題是永遠解決不完的。

新的問題是這樣的,我們需要讀其他的kafka,版本是0.10.x,同時使用兩個不同版本的kafka, WTF???

回憶一下整個事件:flink 1.7.2 對應的kafka connector 的kafka 默認版本是2.0.1,我們換成了 1.0.0,同樣去讀kafka 0.10.x,也是可以的(實測可以),這樣一來,把kafka-clents 版本換了1.0.0 就沒有太大的意義了(穿他爸的衣服和穿他哥的衣服,都差不多了),所以我們把之前的改動去掉了。

然后,這個事情看起來就很奇怪了:

  高版本的kafka-clent 可以讀寫 低版本的broker(2.0.1 -> 1.0.0)

  低版本的kafka-clent 可以讀寫 高版本的broker(0.10.x -> 1.0.0)

不得不說,kafka這么通用,唯一的理由,就是“好用

這個時候,看到這個博客: Kafka協議兼容性改進,里面有說到點版本的問題: 0.10.2.0 版本的broker 支持的 Fetch(1)的 版本是: 0 to 3 【usable: 3 】

 

kafka client 和broker 的交互都是http協議(為什么是http可以看這里:Some Common Philosophical Questions (一些哲學問題)),kafka的每個api,都有個api key

The following are the numeric codes that the ApiKey in the request can take for each of the below request types.

NAME KEY
Produce 0
Fetch 1
ListOffsets 2
Metadata 3
LeaderAndIsr 4
StopReplica 5
UpdateMetadata 6
ControlledShutdown 7
OffsetCommit 8
OffsetFetch 9
FindCoordinator 10
JoinGroup 11
Heartbeat 12
LeaveGroup 13
SyncGroup 14
DescribeGroups 15
ListGroups 16
SaslHandshake 17
ApiVersions 18
CreateTopics 19
DeleteTopics 20
DeleteRecords 21
InitProducerId 22
OffsetForLeaderEpoch 23
AddPartitionsToTxn 24
AddOffsetsToTxn 25
EndTxn 26
WriteTxnMarkers 27
TxnOffsetCommit 28
DescribeAcls 29
CreateAcls 30
DeleteAcls 31
DescribeConfigs 32
AlterConfigs 33
AlterReplicaLogDirs 34
DescribeLogDirs 35
SaslAuthenticate 36
CreatePartitions 37
CreateDelegationToken 38
RenewDelegationToken 39
ExpireDelegationToken 40
DescribeDelegationToken 41
DeleteGroups 42
ElectPreferredLeaders 43

其中 fetch 的key 是: 1

fetch 請求和返回都是有固定格式的(不然也不認識),這就是kafka 內部的fetch 協議,不同的格式,就是不同的版本,以 fetch request (有請求協議,就有響應協議 )v0 和v1 舉例

Fetch API (Key: 1):
Requests:
Fetch Request (Version: 0) => replica_id max_wait_time min_bytes [topics] 
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  topics => topic [partitions] 
    topic => STRING
    partitions => partition fetch_offset partition_max_bytes 
      partition => INT32
      fetch_offset => INT64
      partition_max_bytes => INT32
Fetch Request (Version: 1) => replica_id max_wait_time min_bytes [topics] replica_id => INT32 max_wait_time => INT32 min_bytes => INT32 topics => topic [partitions] topic => STRING partitions => partition fetch_offset partition_max_bytes partition => INT32 fetch_offset => INT64 partition_max_bytes => INT32 Responses: Fetch Response (Version: 0) => [responses] responses => topic [partition_responses] topic => STRING partition_responses => partition_header record_set partition_header => partition error_code high_watermark partition => INT32 error_code => INT16 high_watermark => INT64 record_set => RECORDS Fetch Response (Version: 1) => throttle_time_ms [responses] throttle_time_ms => INT32 responses => topic [partition_responses] topic => STRING partition_responses => partition_header record_set partition_header => partition error_code high_watermark partition => INT32 error_code => INT16 high_watermark => INT64 record_set => RECORDS

注: 不要找了,我也沒找到reques 0 和 1 有什么區別。

  更多協議詳情 查看官網介紹

執行  kafka-broker-api-versions.sh 查看服務端的版本:

./bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 (我的kafka版本是:kafka_2.11-2.2.0)
localhost:9092 (id: 0 rack: null) -> (
    Produce(0): 0 to 7 [usable: 7],
    Fetch(1): 0 to 10 [usable: 10],
    ListOffsets(2): 0 to 5 [usable: 5],
    Metadata(3): 0 to 7 [usable: 7],
    LeaderAndIsr(4): 0 to 2 [usable: 2],
    StopReplica(5): 0 to 1 [usable: 1],
    UpdateMetadata(6): 0 to 5 [usable: 5],
    ControlledShutdown(7): 0 to 2 [usable: 2],
    OffsetCommit(8): 0 to 6 [usable: 6],
    OffsetFetch(9): 0 to 5 [usable: 5],
    FindCoordinator(10): 0 to 2 [usable: 2],
    JoinGroup(11): 0 to 4 [usable: 4],
    Heartbeat(12): 0 to 2 [usable: 2],
    LeaveGroup(13): 0 to 2 [usable: 2],
    SyncGroup(14): 0 to 2 [usable: 2],
    DescribeGroups(15): 0 to 2 [usable: 2],
    ListGroups(16): 0 to 2 [usable: 2],
    SaslHandshake(17): 0 to 1 [usable: 1],
    ApiVersions(18): 0 to 2 [usable: 2],
    CreateTopics(19): 0 to 3 [usable: 3],
    DeleteTopics(20): 0 to 3 [usable: 3],
    DeleteRecords(21): 0 to 1 [usable: 1],
    InitProducerId(22): 0 to 1 [usable: 1],
    OffsetForLeaderEpoch(23): 0 to 2 [usable: 2],
    AddPartitionsToTxn(24): 0 to 1 [usable: 1],
    AddOffsetsToTxn(25): 0 to 1 [usable: 1],
    EndTxn(26): 0 to 1 [usable: 1],
    WriteTxnMarkers(27): 0 [usable: 0],
    TxnOffsetCommit(28): 0 to 2 [usable: 2],
    DescribeAcls(29): 0 to 1 [usable: 1],
    CreateAcls(30): 0 to 1 [usable: 1],
    DeleteAcls(31): 0 to 1 [usable: 1],
    DescribeConfigs(32): 0 to 2 [usable: 2],
    AlterConfigs(33): 0 to 1 [usable: 1],
    AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
    DescribeLogDirs(35): 0 to 1 [usable: 1],
    SaslAuthenticate(36): 0 to 1 [usable: 1],
    CreatePartitions(37): 0 to 1 [usable: 1],
    CreateDelegationToken(38): 0 to 1 [usable: 1],
    RenewDelegationToken(39): 0 to 1 [usable: 1],
    ExpireDelegationToken(40): 0 to 1 [usable: 1],
    DescribeDelegationToken(41): 0 to 1 [usable: 1],
    DeleteGroups(42): 0 to 1 [usable: 1],
    ElectPreferredLeaders(43): 0 [usable: 0]
)

 

 可以看到 Fetch(1): 0 to 10 [usable: 10] , 支持的 fetch版本是 0 到 10(最新版本)(請求和響應協議是一一對應的).

 服務端的協議版本找到了,就該找客戶端的協議版本了。

直接打開kafka-clents 的源碼:

進入 方法: FetchRequest.schemaVersions()

public static Schema[] schemaVersions() {
        return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4,
            FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8};
    };
FetchResponse.schemaVersions()
public static Schema[] schemaVersions() {
        return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2,
            FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6,
            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8};
    }

可以看到,kafka-clients 2.0.1(flink 1.7.2 默認的 kafka client 版本) 支持的 fetch reques 和 response 的協議版本 是 0 - 8.

kafka 0.10.x 支持的版本是 0 - 3

kafka 1.0.0 支持的版本是: 0 - 6

更多請參考官網:http://kafka.apache.org/protocol.html#The_Messages_Fetch

總結: kafka 做的真的是好,服務的和客戶端的各個版本的完全兼容性非常好,新版本支持舊版本,舊版本也可以和新版一起使用,只有支持的協議版本又交叉


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM