[Azure Event Hubs] Exceptions encountered when consuming Event Hub events with Logstash (TimeoutException, ReceiverDisconnectedException)


Problem Description

In an EFK (Elasticsearch, Fluentd, and Kibana) log-collection solution, logs can first be sent to an Event Hub and then consumed from the Event Hub by Logstash, which routes them onward. While running Logstash, however, two problems appeared: Logstash could not connect to the Event Hub, and its receivers kept being closed over and over. The respective error messages were:

Exception while initializing stores, not starting partition manager com.microsoft.azure.eventhubs.IllegalEntityException: Failure getting partition ids for event hub

        ... ...

Caused by: com.microsoft.azure.eventhubs.TimeoutException: Opening MessagingFactory timed out.

[WARN ][com.microsoft.azure.eventprocessorhost.PartitionPump][main][cbc2dac224225cd02511820a8ee314e73f1c0800809c9c534154188acb14fbac] host logstash-fe4f6e2e-e260-4522-a3f8-f292a8902dad: 3: Receiver disconnected on create, bad epoch?
com.microsoft.azure.eventhubs.ReceiverDisconnectedException: Receiver 'nil' with a higher epoch '637360547769896558' already exists. Receiver 'nil' with epoch 0 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected. 

Root Cause

  • For the TimeoutException, first determine whether the current environment can reach the Event Hubs endpoint at all; the issue can usually be resolved by troubleshooting the network connection (a connectivity-check sketch follows this list). (Link)

  • For the ReceiverDisconnectedException, receivers keep re-establishing connections to the same partition through the same consumer group, and each newly created connection forces the old one to close. Check whether multiple processes on the client, or multiple separate clients, are consuming data from the same partition (the CLI sketch after the FAQ excerpt below can help enumerate the consumer groups in use).
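As a quick sanity check for the TimeoutException case, you can probe the ports Event Hubs uses from the machine running Logstash. A minimal sketch, assuming the namespace xxxx.servicebus.chinacloudapi.cn from the config below (substitute your own host name):

   # Event Hubs uses AMQP over TLS on port 5671; HTTPS/AMQP-over-WebSockets uses 443
   nc -vz xxxx.servicebus.chinacloudapi.cn 5671
   nc -vz xxxx.servicebus.chinacloudapi.cn 443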

For a detailed explanation, see: https://github.com/Azure/azure-event-hubs-spark/blob/master/FAQ.md

Why am I getting a ReceiverDisconnectedException?

In version 2.3.2 and above, the connector uses epoch receivers from the Event Hubs Java client. This only allows one receiver to be open per consumer group-partition combo. To be crystal clear, let's say we have receiverA with an epoch of 0 which is open within consumer group foo on partition 0. Now, if we open a new receiver, receiverB, for the same consumer group and partition with an epoch of 0 (or higher), then receiverA will be disconnected and get the ReceiverDisconnectedException.

In order to avoid this issue, please have one consumer group per Spark application being run. In general, you should have a unique consumer group for each consuming application being run.

Note that this error could happen if the same structured stream is accessed by multiple queries (writers).
Spark will read from the input source and process the dataframe separately for each defined sink. This results in having multiple readers on the same consumer group-partition combo. In order to prevent this, you can create a separate reader for each writer using a separate consumer group or use an intermediate delta table if you are using Databricks.
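To check whether another application is already attached to the consumer group Logstash uses, it can help to enumerate the consumer groups defined on the hub. A hedged sketch using the Azure CLI; the resource group, namespace, and hub names below are placeholders for your own:

   az eventhubs eventhub consumer-group list \
       --resource-group my-rg \
       --namespace-name my-namespace \
       --eventhub-name logstest \
       --output table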

Solution

The TimeoutException disappears once the client-side network problem is fixed. But how do we resolve the ReceiverDisconnectedException? Everything here is configured in Logstash, so there is no application code to modify. The fix is therefore to configure Logstash's workers so that their number never exceeds the number of partitions, and to create a dedicated consumer group for Logstash in the Event Hub. The following configuration successfully consumes an Event Hub through Logstash:

input {
   azure_event_hubs {
      # Hub-scoped connection string; EntityPath names the hub to consume
      event_hub_connections => ["Endpoint=sb://xxxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=test;SharedAccessKey=xxxxxxxx=;EntityPath=logstest"]
      threads => 8                # total processor threads for this input
      decorate_events => true     # attach hub name, partition, offset, etc. to @metadata
      consumer_group => "logs"    # dedicated consumer group created for Logstash
      # Blob storage account used to persist checkpoints and coordinate partition ownership
      storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxx;AccountKey=xxxxxxx=;EndpointSuffix=core.chinacloudapi.cn"
   }
}

output {
   stdout { }
}

Start Logstash with a single pipeline worker (-w 1):

./bin/logstash -f config/ehtest.conf -w 1
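Since the fix hinges on never running more receivers than there are partitions, it is worth confirming the hub's partition count before settling on the threads and worker settings. A sketch using the Azure CLI (the resource group and namespace names are placeholders):

   az eventhubs eventhub show \
       --resource-group my-rg \
       --namespace-name my-namespace \
       --name logstest \
       --query partitionCount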

Once started, Logstash successfully captures the events arriving in the Event Hub and prints them to stdout.

For building the connection string, refer to the Logstash documentation for the azure_event_hubs input plugin: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-azure_event_hubs.html (the same page documents the complete set of Event Hub configuration options).

Event Hub connection string

The plugin uses the connection string to access Azure Event Hubs. Find the connection string here: Azure Portal -> Event Hub -> Shared access policies. The event_hub_connections option passes the Event Hub connection strings for the basic configuration.
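Besides the portal path above, the connection string can also be read with the Azure CLI. A sketch assuming a hub-scoped shared access policy named test, matching the config above (all resource names are placeholders):

   az eventhubs eventhub authorization-rule keys list \
       --resource-group my-rg \
       --namespace-name my-namespace \
       --eventhub-name logstest \
       --name test \
       --query primaryConnectionString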

Note: do not use the default consumer group ($Default) or share a consumer group with other applications; doing so will cause Logstash to fail to connect.
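A dedicated consumer group can be created in the portal or with the Azure CLI. A sketch matching the consumer_group => "logs" setting above (resource names are placeholders):

   az eventhubs eventhub consumer-group create \
       --resource-group my-rg \
       --namespace-name my-namespace \
       --eventhub-name logstest \
       --name logs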

 

