Problem description
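An Event Hub consumer application written with the Java SDK intermittently stopped consuming messages from one of its partitions. The logs for that period contained IdleTimerExpired errors. After the application was restarted, it connected to Event Hub normally and consumed data again. The open question is this: with the retry mechanism enabled, why did the client not reconnect after the partition connection was dropped?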
Error message:
{"time":"2020-09-21 05:11:19.578", "level":"ERROR", "thread":"bounded-71", "appName":"events-service", "traceId":"", "spanId":"", "url":"", "clientIp":"",
"method":"", "elapse":"", "code":"", "message":"", "class":"c.h.socialhub.eventhub.EventHub",
"line":"EventHub.java:150",
"msg":"Error occurred while processing events The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'cd8a74181e68151dde4_G28'.,
errorContext[NAMESPACE: shprod-member.servicebus.chinacloudapi.cn,
PATH: xxxx/ConsumerGroups/$default/Partitions/1, REFERENCE_ID: 2_xxxxxxxx LINK_CREDIT: 253]"}
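When many such records have to be triaged, it helps to pull the idle timeout and the partition id out of each message so the affected partitions can be counted. The following is a minimal sketch using only the Java standard library. The class name IdleErrorParser and its regular expressions are assumptions made for illustration, derived from the single log record shown above, and are not part of the Event Hubs SDK.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of the Azure SDK): extracts the idle timeout
// and the partition id from an error message shaped like the one above.
public class IdleErrorParser {

    /** Idle timeout and partition id found in one error message. */
    public static final class IdleError {
        public final long idleMillis;
        public final String partitionId;

        IdleError(long idleMillis, String partitionId) {
            this.idleMillis = idleMillis;
            this.partitionId = partitionId;
        }
    }

    // Both patterns are assumptions based on the single sample message.
    private static final Pattern IDLE =
        Pattern.compile("inactive for more than the allowed (\\d+) milliseconds");
    private static final Pattern PARTITION =
        Pattern.compile("/Partitions/(\\d+)");

    /** Returns the parsed values, or empty if the message has a different shape. */
    public static Optional<IdleError> parse(String message) {
        Matcher idle = IDLE.matcher(message);
        Matcher partition = PARTITION.matcher(message);
        if (idle.find() && partition.find()) {
            return Optional.of(
                new IdleError(Long.parseLong(idle.group(1)), partition.group(1)));
        }
        return Optional.empty();
    }
}
```

Calling IdleErrorParser.parse on the "msg" field of each ERROR record gives the partition id per incident, which makes it easy to see whether the same partition is affected every time.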
Consumer code:
eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .connectionString(currentEventHubProperty.getConnectionString(), this.topic)
    .retry(retryOptions)
    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
    .processEvent(eventContext -> {
        String currentData = "";
        try {
            EventData event = eventContext.getEventData();
            PartitionContext partitionContext = eventContext.getPartitionContext();
            EventMessage eventMessage = new EventMessage();
            currentData = new String(event.getBody(), Charset.defaultCharset());
            eventMessage.setContent(currentData);
            eventMessage.setPartitionId(partitionContext.getPartitionId());
            eventMessage.setSequenceNumber(event.getSequenceNumber());
            log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}",
                this.topic, partitionContext.getPartitionId(),
                event.getSequenceNumber(), event.getEnqueuedTime());
            // Checkpoint after each event so a restart resumes from this position.
            eventContext.updateCheckpoint();
        } catch (Exception e) {
            String msg = e.getMessage();
            if (StringUtils.isBlank(msg)) {
                // java.util.Arrays.toString prints the frames; calling toString()
                // on the array itself would only print its type and hash code.
                msg = Arrays.toString(e.getStackTrace());
            }
            log.error("Error occurred while do works with events[{}] : {}, data: {} ",
                this.topic, msg, currentData);
        }
    })
    // Partition-level errors, such as the idle timeout above, are reported here.
    .processError(errorContext -> log.error("Error occurred while processing events "
        + errorContext.getThrowable().getMessage()))
    .buildEventProcessorClient();
Root cause analysis
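The first step is to use the logs to determine whether the partition really was idle for 240 seconds (the 240000 milliseconds in the error) at the time of the problem, that is, whether no data entered the partition during that period. If the logs already record the enqueued time of every message, the answer can be read from them. If not, add the following statement to the consumer code so that the logs can be analyzed directly the next time the problem occurs: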
log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic, partitionContext.getPartitionId(), event.getSequenceNumber(),event.getEnqueuedTime());
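Once the enqueued times of one partition have been collected from those log lines, the idle check itself is simple arithmetic: look for consecutive events that are more than 240000 milliseconds apart. The following is a minimal sketch of that check using only the Java standard library. The class name IdleGapFinder and its Gap type are hypothetical, and the sketch assumes the timestamps have already been parsed from the logs into Instant values.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: finds stretches in which no event was enqueued
// to a partition for longer than a given threshold.
public class IdleGapFinder {

    /** A span between two consecutive events that exceeded the threshold. */
    public static final class Gap {
        public final Instant from;
        public final Instant to;

        Gap(Instant from, Instant to) {
            this.from = from;
            this.to = to;
        }

        public Duration length() {
            return Duration.between(from, to);
        }
    }

    /**
     * Sorts a copy of the enqueued times and returns every pair of
     * neighbouring events that are more than the threshold apart.
     */
    public static List<Gap> findGaps(List<Instant> enqueuedTimes, Duration threshold) {
        List<Instant> sorted = new ArrayList<>(enqueuedTimes);
        Collections.sort(sorted);
        List<Gap> gaps = new ArrayList<>();
        for (int i = 1; i < sorted.size(); i++) {
            Instant previous = sorted.get(i - 1);
            Instant current = sorted.get(i);
            if (Duration.between(previous, current).compareTo(threshold) > 0) {
                gaps.add(new Gap(previous, current));
            }
        }
        return gaps;
    }
}
```

Calling IdleGapFinder.findGaps(times, Duration.ofMillis(240000)) and comparing the returned spans with the timestamp of the error record shows whether the partition was quiet when the connection was closed. A gap that covers the error time is consistent with a genuinely idle link. No gap suggests that events were arriving and the consumer simply was not receiving them.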
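For the incident that has already occurred, the answer depends on the Event Hub retention setting. If the events are still within the retention period, the SDK's receiveFromPartition method can be used to read the relevant range of data and check when each event was enqueued. (Note: create a separate consumer group for this instead of using $Default, otherwise the connection may fail.) Sample code: https://azuresdkdocs.blob.core.windows.net/$web/java/azure-messaging-eventhubs/5.2.0/index.html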
Consume events from an Event Hub partition
To consume events, create an EventHubConsumerAsyncClient or EventHubConsumerClient for a specific consumer group. In addition, a consumer needs to specify where in the event stream to begin receiving events.
Consume events with EventHubConsumerAsyncClient
In the snippet below, we create an asynchronous consumer that receives events from partitionId and only listens to newest events that get pushed to the partition. Developers can begin receiving events from multiple partitions using the same EventHubConsumerAsyncClient by calling receiveFromPartition(String, EventPosition) with another partition id.
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Receive newly added events from partition with id "0". EventPosition specifies the position
// within the Event Hub partition to begin consuming events.
consumer.receiveFromPartition("0", EventPosition.latest()).subscribe(event -> {
    // Process each event as it arrives.
});
// add sleep or System.in.read() to receive events before exiting the process.
Consume events with EventHubConsumerClient
Developers can create a synchronous consumer that returns events in batches using an EventHubConsumerClient. In the snippet below, a consumer is created that starts reading events from the beginning of the partition's event stream.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

String partitionId = "<< EVENT HUB PARTITION ID >>";

// Get the first 15 events in the stream, or as many events as can be received within 40 seconds.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 15,
    EventPosition.earliest(), Duration.ofSeconds(40));
for (PartitionEvent event : events) {
    System.out.println("Event: " + event.getData().getBodyAsString());
}
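That covers the analysis steps. They did not establish whether the problem lies in the application logic or in the SDK. After reviewing many similar reports on GitHub, most of the evidence points to the Java SDK, so the next step is to wait for further updates on the following GitHub issue:
AmqpEventHubConsumer.IdleTimerExpired in Java EventHubConsumer SDK: https://github.com/Azure/azure-sdk-for-java/issues/11233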