【Azure 事件中心】在微軟雲中國區 (Mooncake) 上實驗以Apache Kafka協議方式發送/接受Event Hubs消息 (Java版)


問題描述

事件中心提供 Kafka 終結點,現有的基於 Kafka 的應用程序可將該終結點用作運行你自己的 Kafka 群集的替代方案。 事件中心可與許多現有 Kafka 應用程序配合使用。在Azure官方提供的Demo中,都是針對Global Azure。以下內容通過實驗來一步一步調試並在Azure中國區連接Event Hub成功。

 

操作步驟

  • 准備好Event Hub的連接字符串,可以是Namespace級別的SAS connection string,也可以是Event Hub Instance(Topic)級的連接字符串
    • Event Hub Namespace 級別的連接字符串格式為:Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX
    • Event Hub Instance 級別的連接字符串格式為:Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX;EntityPath=XXXXXX
    • 注:如果您使用的是Global Azure,Event Hub的域名地址為 *.servicebus.windows.net.

在Demo代碼中,有兩部分代碼發送消息的Producer和消費消息的Consumer. 他們的文件結構如下:

  • Producer:使用Demo中的Producer項目代碼,發送消息到事件中心,如果需要修改kafka的版本,可以在pom.xml文件中修改kafka依賴的版本信息。
  • Consumer:使用Demo中的Consumer項目代碼,從開啟Kafka終結點的事件中心接受消息。如果需要修改kafka的版本,可以在pom.xml文件中修改kafka依賴的版本信息。

 

第一步:修改發送端的Kafka連接字符串和TOPIC名稱

在producer.config文件中修改bootstrap.servers 和 sasl.jaas.config 的值。使用事件中心的kafka終結點(Event Hubs Kafka endpoint)。

bootstrap.servers=youreventhubnamespacename.servicebus.chinacloudapi.cn:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://youreventhubnamespacename.servicebus.chinacloudapi.cn/;SharedAccessKeyName=manage;SharedAccessKey=xxxxxx;EntityPath=testmessage";

同時也在TestProducer.java文件中修改TOPIC值。如這次測試中使用的是testmessage

 

第二步:修改消費端的Kafka連接字符串和TOPIC名稱

在consumer.config文件中修改bootstrap.servers 和 sasl.jaas.config 的值。使用事件中心的kafka終結點(Event Hubs Kafka endpoint)。

bootstrap.servers=youreventhubnamespacename.servicebus.chinacloudapi.cn:9093
group.id=$Default
request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://youreventhubnamespacename.servicebus.chinacloudapi.cn/;SharedAccessKeyName=manage;SharedAccessKey=xxxxxx;EntityPath=testmessage";

同時也在TestConsumer.java文件中修改TOPIC值。如這次測試中使用的是testmessage

 

第三步:調試發送端和消費端代碼

在VS Code中直接調試代碼,點擊F5啟動或者在Mian方法之上點擊run or debug linkbutton。測試效果如:

 

錯誤一:Invalid SASL mechanism response, server may be expecting a different protocol   /   Error reading field 'sasl_auth_bytes': Bytes size -1 cannot be negative

在開始Debug Producer代碼時,出現了無效的SASL響應,Event Hub服務端不支持當前使用的kafka協議錯誤 (Invalid SASL mechanism response, server may be expecting a different protocol)。原來是由於使用的Event Hub定價層為基本層,而Azure支持Apache Kafka協議是在標准版和專用版。所以回到Azure Event Hub的定價層頁面,升級到標准版后就可以成功連接到事件中心(Event Hub)。

This error occurs when publishing to a basic plan Event Hub, as the basic plan does not support interaction via Kafka protocol.An upgrade to a standard plan should resolve this. https://azure.microsoft.com/en-au/pricing/details/event-hubs/

 

參考資料

Send and Receive Messages in Java using Azure Event Hubs for Apache Kafka Ecosystems: https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java

針對 Azure 事件中心的 Apache Kafka 開發人員指南: https://docs.azure.cn/zh-cn/event-hubs/apache-kafka-developer-guide

Not able connect to EventHub via KAFKA api: https://stackoverflow.com/questions/59891094/not-able-connect-to-eventhub-via-kafka-api

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM