問題描述
事件中心提供 Kafka 終結點,現有的基於 Kafka 的應用程序可將該終結點用作運行你自己的 Kafka 群集的替代方案。 事件中心可與許多現有 Kafka 應用程序配合使用。在Azure官方提供的Demo中,都是針對Global Azure。以下內容通過實驗來一步一步調試並在Azure中國區連接Event Hub成功。
操作步驟
- 准備好Event Hub的連接字符串,可以是Namespace級別的SAS connection string,也可以是Event Hub Instance(Topic)級的連接字符串
- Event Hub Namespace 級別的連接字符串格式為:
Endpoint=sb://
mynamespace.servicebus.chinacloudapi.cn
/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX
- Event Hub Instance 級別的連接字符串格式為:
Endpoint=sb://
mynamespace.servicebus.chinacloudapi.cn
/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX
;EntityPath=XXXXXX - 注:如果您使用的是Global Azure,Event Hub的域名地址為 *
.servicebus.windows.net
.
- 從Github中下載Java版本Demo代碼:https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java
-
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git cd azure-event-hubs-for-kafka/quickstart/java
-
在Demo代碼中,有兩部分代碼發送消息的Producer和消費消息的Consumer. 他們的文件結構如下:
- Producer:使用Demo中的Producer項目代碼,發送消息到事件中心,如果需要修改kafka的版本,可以在pom.xml文件中修改kafka依賴的版本信息。
- Consumer:使用Demo中的Consumer項目代碼,從開啟Kafka終結點的事件中心接受消息。如果需要修改kafka的版本,可以在pom.xml文件中修改kafka依賴的版本信息。
第一步:修改發送端的Kafka連接字符串和TOPIC名稱
在producer.config文件中修改bootstrap.servers
和 sasl.jaas.config 的值。使用事件中心的kafka終結點(Event Hubs Kafka endpoint)。
bootstrap.servers=youreventhubnamespacename.servicebus.chinacloudapi.cn:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://youreventhubnamespacename.servicebus.chinacloudapi.cn/;SharedAccessKeyName=manage;SharedAccessKey=xxxxxx;EntityPath=testmessage";
同時也在TestProducer.java文件中修改TOPIC值。如這次測試中使用的是testmessage
第二步:修改消費端的Kafka連接字符串和TOPIC名稱
在consumer.config文件中修改bootstrap.servers
和 sasl.jaas.config 的值。使用事件中心的kafka終結點(Event Hubs Kafka endpoint)。
bootstrap.servers=youreventhubnamespacename.servicebus.chinacloudapi.cn:9093 group.id=$Default request.timeout.ms=60000 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://youreventhubnamespacename.servicebus.chinacloudapi.cn/;SharedAccessKeyName=manage;SharedAccessKey=xxxxxx;EntityPath=testmessage";
同時也在TestConsumer.java文件中修改TOPIC值。如這次測試中使用的是testmessage
第三步:調試發送端和消費端代碼
在VS Code中直接調試代碼,點擊F5啟動或者在Mian方法之上點擊run or debug linkbutton。測試效果如:
錯誤一:Invalid SASL mechanism response, server may be expecting a different protocol / Error reading field 'sasl_auth_bytes': Bytes size -1 cannot be negative
在開始Debug Producer代碼時,出現了無效的SASL響應,Event Hub服務端不支持當前使用的kafka協議錯誤 (Invalid SASL mechanism response, server may be expecting a different protocol)。原來是由於使用的Event Hub定價層為基本層,而Azure支持Apache Kafka協議是在標准版和專用版。所以回到Azure Event Hub的定價層頁面,升級到標准版后就可以成功連接到事件中心(Event Hub)。
This error occurs when publishing to a basic plan Event Hub, as the basic plan does not support interaction via Kafka protocol.An upgrade to a standard plan should resolve this. https://azure.microsoft.com/en-au/pricing/details/event-hubs/
參考資料
Send and Receive Messages in Java using Azure Event Hubs for Apache Kafka Ecosystems: https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java
針對 Azure 事件中心的 Apache Kafka 開發人員指南: https://docs.azure.cn/zh-cn/event-hubs/apache-kafka-developer-guide
Not able connect to EventHub via KAFKA api: https://stackoverflow.com/questions/59891094/not-able-connect-to-eventhub-via-kafka-api