Microsoft's cloud message queue: Azure Service Bus queue


-- Update: bug fix

The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue

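  1. Set AutoComplete explicitly when dequeuing messages, so that each message is completed only once (by the callback, not also by the message pump).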

    public static async Task MessageDequeueAsync()
    {
        // Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
        // ExceptionReceivedHandler is assumed to be defined elsewhere as
        // Task ExceptionReceivedHandler(ExceptionReceivedEventArgs args).
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Maximum number of Concurrent calls to the callback ProcessMessagesAsync, set to 1 for simplicity.
            // Set it according to how many messages the application wants to process in parallel.
            MaxConcurrentCalls = 1,

            // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
            // False below indicates the Complete will be handled by the User Callback (the lambda registered below).
            AutoComplete = false
        };

        // Register the queue message handler and receive messages in a loop
        queueClient.RegisterMessageHandler(
            async (message, token) =>
            {
                // Process the message
                await PostMessageToBot(message);
                loggerCore.Information($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
                // Complete the message so that it is not received again.
                // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
                await queueClient.CompleteAsync(message.SystemProperties.LockToken);
            },
            //async (exceptionEvent) =>
            //{
            //    // Process the exception
            //    loggerCore.Error($"WeChat message dequeue exception:{exceptionEvent.Exception.Message}");
            //}
            messageHandlerOptions);
    }
    

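2. In the Azure portal, set the duplicate detection time (the seconds field accepts 1 to 59) to 55 s. The value being tuned here appears to be the queue's lock duration: a longer lock gives the consumer more time to process a message before the lock expires.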
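
The two causes named in the error message (lock expired, or message already removed) can be pictured with a small in-memory model. This is only a sketch and not the Service Bus SDK: `PeekLockModel`, `Receive`, and `Complete` are hypothetical names, and time is passed in explicitly so the behavior is deterministic. In Microsoft.Azure.ServiceBus, `MessageHandlerOptions.AutoComplete` defaults to true, so a callback that also calls `CompleteAsync` ends up in the first case below.

```csharp
using System;
using System.Collections.Generic;

// Simplified in-memory model of peek-lock settlement.
// NOT the Service Bus SDK; all names are hypothetical.
public sealed class PeekLockModel
{
    private readonly TimeSpan lockDuration;
    // lock token -> time at which the lock expires
    private readonly Dictionary<Guid, DateTime> activeLocks = new Dictionary<Guid, DateTime>();

    public PeekLockModel(TimeSpan lockDuration)
    {
        this.lockDuration = lockDuration;
    }

    // Receiving in peek-lock mode hands out a lock token valid for lockDuration.
    public Guid Receive(DateTime now)
    {
        var token = Guid.NewGuid();
        activeLocks[token] = now + lockDuration;
        return token;
    }

    // Completing removes the message. It fails if the message was already
    // completed (token unknown) or if the lock has expired.
    public void Complete(Guid token, DateTime now)
    {
        if (!activeLocks.TryGetValue(token, out var expiresAt))
            throw new InvalidOperationException("Lock invalid: message already removed.");

        activeLocks.Remove(token);
        if (now > expiresAt)
            throw new InvalidOperationException("Lock invalid: lock expired.");
    }
}

public static class PeekLockDemo
{
    public static void Main()
    {
        var start = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var queue = new PeekLockModel(TimeSpan.FromSeconds(55));

        // Case 1: the callback completes the message, then a second complete is attempted.
        var first = queue.Receive(start);
        queue.Complete(first, start.AddSeconds(1));
        try { queue.Complete(first, start.AddSeconds(2)); }
        catch (InvalidOperationException e) { Console.WriteLine(e.Message); }

        // Case 2: processing takes longer than the lock duration.
        var second = queue.Receive(start);
        try { queue.Complete(second, start.AddSeconds(60)); }
        catch (InvalidOperationException e) { Console.WriteLine(e.Message); }
    }
}
```

Under this model, fix 1 removes the second completion attempt, and fix 2 widens the window in which the single completion is still accepted.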

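Preface

This was my first time using a message queue, and I ran into a problem: the same message was dequeued several times. Was the message enqueued once, or more than once? Or was the problem on the dequeue side, with one message being delivered multiple times?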

Microsoft Azure service bus queue

Create a Service Bus namespace on Azure, then create a queue inside it. When creating the queue, make sure the "Enable duplicate detection" option is selected, so that the same message is not enqueued more than once.

Find the value of the SAS policy RootManageSharedAccessKey and copy it; it is used as the connection string.

Azure service bus sample on github

Configuring the Service Bus queue

Namespace: Microsoft.Azure.ServiceBus

Initialize the queueClient:

private static readonly Serilog.ILogger loggerCore = LoggerCoreFactory.GetLoggerCore();
const string ServiceBusConnectionString = "Endpoint=sb://{YOUR-NAMESPACE-ON-Azure}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR-KEYS}";
const string QueueName = "{YOUR-QUEUE-NAME}";
static IQueueClient queueClient;

static MessageQueueHandler()
{
    queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
}

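Enqueuing messages

Here the MessageId is set as the enqueue key, so that the same message is not enqueued more than once. Also, do not close the queue connection frequently, because that hurts performance.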

public static async Task MessageEnqueueAsync(string message)
{
    // Send messages.
    await SendMessagesAsync(message);
    // don't close frequently 
    //await queueClient.CloseAsync();
}

static async Task SendMessagesAsync(string messageXML)
{
    try
    {
        // Create a new message to send to the queue.
        var messageStr = ParseMessageType.Parse(messageXML);
        var msgId = messageStr.Body.MsgId.Value;
        var message = new Microsoft.Azure.ServiceBus.Message
        {
            MessageId = msgId, // with duplicate detection enabled, prevents the same message from being enqueued twice
            Body = Encoding.UTF8.GetBytes(messageXML),
        };
        loggerCore.Information($"message enqueue:{messageXML}");
        // Send the message to the queue.
        await queueClient.SendAsync(message);
    }
    catch (Exception exception)
    {
        loggerCore.Error($"message enqueue error:{exception.ToString()}");
    }
}
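
The MessageId-based deduplication described above can be pictured with a small in-memory model. This is a sketch under simplifying assumptions, not the Service Bus SDK or its actual implementation: `DuplicateDetectingQueue` and its members are hypothetical names, and the real service accepts a duplicate send without error and silently drops the message, whereas this model returns false so the effect is visible.

```csharp
using System;
using System.Collections.Generic;

// Simplified in-memory model of duplicate detection keyed on MessageId.
// NOT the Service Bus SDK; all names are hypothetical.
public sealed class DuplicateDetectingQueue
{
    private readonly TimeSpan detectionWindow;
    // MessageId -> time the id was first accepted
    private readonly Dictionary<string, DateTime> seenMessageIds = new Dictionary<string, DateTime>();
    private readonly Queue<string> bodies = new Queue<string>();

    public DuplicateDetectingQueue(TimeSpan detectionWindow)
    {
        this.detectionWindow = detectionWindow;
    }

    public int Count => bodies.Count;

    // Returns true if the message was enqueued, false if it was dropped
    // because the same MessageId was seen within the detection window.
    public bool Send(string messageId, string body, DateTime now)
    {
        if (seenMessageIds.TryGetValue(messageId, out var firstSeen)
            && now - firstSeen < detectionWindow)
        {
            return false;
        }

        seenMessageIds[messageId] = now;
        bodies.Enqueue(body);
        return true;
    }
}

public static class DuplicateDetectionDemo
{
    public static void Main()
    {
        var start = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var queue = new DuplicateDetectingQueue(TimeSpan.FromMinutes(10));

        // Same MessageId twice inside the window: only the first is kept.
        Console.WriteLine(queue.Send("1001", "<xml>hello</xml>", start));
        Console.WriteLine(queue.Send("1001", "<xml>hello</xml>", start.AddSeconds(5)));

        // Same MessageId after the window has passed: accepted again.
        Console.WriteLine(queue.Send("1001", "<xml>hello</xml>", start.AddMinutes(11)));
        Console.WriteLine(queue.Count);
    }
}
```

The model also shows why the MessageId has to come from the message content (here the MsgId parsed from the XML): a randomly generated id would never match an earlier one, and nothing would be detected as a duplicate.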

Dequeuing messages

public static async Task MessageDequeueAsync()
{
    // Use PeekLock mode so that a message is removed only after it is completed
    queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);

    // Register the queue message handler and receive messages in a loop.
    // Note: this overload uses the default MessageHandlerOptions, where AutoComplete is true,
    // so the pump also tries to complete the message after the callback returns.
    // See the update at the top of this post, which sets AutoComplete = false.
    queueClient.RegisterMessageHandler(
    async (message, token) =>
    {
        // Process the message
        await PostMessageToBot(message);
        loggerCore.Information($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
        // Complete the message so that it is not received again.
        // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
        await queueClient.CompleteAsync(message.SystemProperties.LockToken);
    },
    exceptionEvent =>
    {
        // Log the exception reported by the message pump
        loggerCore.Error($"WeChat message dequeue exception:{exceptionEvent.Exception}");
        return Task.CompletedTask;
    });
}

With this in place, messages are dequeued one at a time, in the order in which they were enqueued.

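Postscript

A few helpful links:
1. Read best practice: how to make the best use of Azure queues and avoid unnecessary overhead.
2. Enable queue duplicate detection: turn on duplicate detection so that the same message is not enqueued more than once.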

