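Our team recently ran into a question: is Microsoft's Azure Service Bus Queue reliable, or can it lose messages?
The background is as follows.
Our product is a SaaS product, and to avoid losing messages we pass messages between modules through a Microsoft Azure message queue (Service Bus Queue). Then a problem appeared: one module sent a message to the queue, but the other module never received it. The send itself raised no exception, so people began to suspect that Azure Service Bus Queue was unreliable and had lost some of our messages.
The official statement is that a message will not be lost with a probability of 99.5%.
But I did not think we could be that unlucky. We are still in the testing phase and our message volume is small, so we should not be hitting the failure case this easily. So, following a colleague's suggestion, I wrote a test program to find out whether Service Bus Queue loses messages, or loses them easily.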
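1. Overview of the test program
Principle: send a fixed number of messages to the queue and check whether all of them can be received. If they all arrive, the queue can be considered basically reliable and the problem is on our side.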
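To see why a test of this size is informative, here is a rough back-of-the-envelope sketch. It assumes, purely for illustration, that the 99.5% figure applies independently to every single message; that is only one possible reading of the official statement. `LossEstimate` and its methods are names made up for this example.

```csharp
using System;

// Back-of-the-envelope estimate. Assumes each message is lost independently
// with the same probability (an assumption, not a documented property).
public static class LossEstimate
{
    // Expected number of lost messages out of `sent`.
    public static double ExpectedLosses(int sent, double deliveryRate)
    {
        return sent * (1.0 - deliveryRate);
    }

    // Probability that all `sent` messages arrive.
    public static double ProbabilityOfNoLoss(int sent, double deliveryRate)
    {
        return Math.Pow(deliveryRate, sent);
    }
}
```

For 10000 messages and a delivery rate of 0.995, `ExpectedLosses` gives about 50 and `ProbabilityOfNoLoss` is vanishingly small. Under that reading, a run that loses nothing would suggest the real per-message loss rate is far lower than 0.5%.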
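Procedure:
First, create a queue. The program uses the Azure .NET SDK to send messages to the queue and to receive messages from it (after receiving a message, it calls a method to delete that message from the queue; the receive counts as successful only if the delete succeeds).
When the main program runs, it starts two threads:
Thread 1 keeps sending messages to the queue. The total is fixed, assumed here to be 10000. The SDK's Send method has no return value to report success, so a send is considered successful if no exception is thrown.
Thread 2 keeps receiving messages from the queue. Once a message has been fetched locally, it deletes that message from the queue. Fetching and successfully deleting a message counts as one successful receive, and the counter is incremented by 1.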
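Comparing two counters works, but a lost message and a duplicated one would cancel each other out. Since every message body in this test is a fresh GUID, one possible refinement is to record the bodies themselves and compare the two sets at the end. The sketch below is a hypothetical helper, not part of the test program described here; `DeliveryLedger` and its member names are my own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the test program): records the body of
// every message sent and received so that the two sets can be compared.
public class DeliveryLedger
{
    // One lock guards both collections, because the sender and receiver
    // threads call into the ledger concurrently.
    private readonly object _gate = new object();
    private readonly HashSet<string> _sent = new HashSet<string>();
    private readonly Dictionary<string, int> _received = new Dictionary<string, int>();

    public void RecordSent(string body)
    {
        lock (_gate) { _sent.Add(body); }
    }

    public void RecordReceived(string body)
    {
        lock (_gate)
        {
            int timesSeen;
            _received.TryGetValue(body, out timesSeen); // 0 when not yet seen
            _received[body] = timesSeen + 1;
        }
    }

    // Bodies that were sent but never received.
    public List<string> Missing()
    {
        lock (_gate) { return _sent.Where(b => !_received.ContainsKey(b)).ToList(); }
    }

    // Bodies that were received more than once.
    public List<string> Duplicates()
    {
        lock (_gate)
        {
            return _received.Where(kv => kv.Value > 1).Select(kv => kv.Key).ToList();
        }
    }
}
```

The sender thread would call `RecordSent` after each successful send and the receiver thread would call `RecordReceived` after each successful delete. An empty `Missing()` list at the end is a stronger result than two equal counters.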
Logging module:
log4net is used for logging.
2. Code
The class ServiceBusQueueHandler wraps the .NET SDK calls for sending and receiving messages.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class ServiceBusQueueHandler
{
    public static readonly log4net.ILog log =
        log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

    public ServiceBusQueueHandler()
    {
        /* For most scenarios, it is recommended that you keep Mode set to AutoDetect.
         * This indicates that your application will attempt to use TCP to connect to the Windows Azure Service Bus,
         * but will use HTTP if unable to do so. In general, this allows your connection to be more efficient.
         * However, if TCP is always unavailable to your application,
         * you can save some time on your connection if you globally set the mode to HTTP. */
        ServiceBusEnvironment.SystemConnectivity.Mode = ConnectivityMode.AutoDetect;
    }

    // Sends a single message. idelayTime is the delay, in seconds, before the message becomes visible.
    public bool SendMessage(string strMessageBody, QueueClient client, int idelayTime = 0)
    {
        bool bRet = TrySend(() =>
        {
            BrokeredMessage message = new BrokeredMessage(strMessageBody);
            // Set the time at which this message becomes visible.
            message.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddSeconds(idelayTime);
            // http://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.send.aspx
            client.Send(message);
        });
        if (bRet)
        {
            // Log the original string rather than reading the body back from a message that has already been sent.
            log.Debug(string.Format("Success send! Send Time = {0}, Body = {1}", DateTime.UtcNow.ToString(), strMessageBody));
        }
        return bRet;
    }

    // Sends a batch of messages. The maximum size of the batch is the same as
    // the maximum size of a single message (currently 256 KB).
    public bool SendMessages(List<string> arrayMessages, QueueClient client, int idelayTime = 0)
    {
        // Prepare the data.
        List<BrokeredMessage> arrayBrokedMessages = new List<BrokeredMessage>();
        DateTime utcEnqueueTime = DateTime.UtcNow.AddSeconds(idelayTime);
        log.Debug(string.Format("DateTime.UtcNow = {0}", DateTime.UtcNow.ToString()));
        log.Debug(string.Format("EnqueuedTimeUtc = {0}", utcEnqueueTime.ToString()));
        foreach (string strMessageBody in arrayMessages)
        {
            BrokeredMessage message = new BrokeredMessage(strMessageBody);
            // The message id must be unique across all sends. A counter that restarts at 1
            // on every call ("Message_1", "Message_2", ...) repeats ids between batches, and
            // a queue with duplicate detection enabled discards the repeats.
            message.MessageId = Guid.NewGuid().ToString();
            message.ScheduledEnqueueTimeUtc = utcEnqueueTime;
            arrayBrokedMessages.Add(message);
        }

        // Send the messages.
        bool bRet = TrySend(() => client.SendBatch(arrayBrokedMessages));
        if (bRet)
        {
            log.Debug("Success send batch messages!");
            log.Debug("<=SendMessages");
        }
        return bRet;
    }

    // Runs a send operation. Returns true if no exception was thrown; otherwise logs the exception and returns false.
    private bool TrySend(Action send)
    {
        try
        {
            send();
            return true;
        }
        catch (TimeoutException e)
        {
            // Thrown when the operation times out. The timeout period is initialized through
            // MessagingFactorySettings; increase OperationTimeout if the value is relatively low.
            log.Debug(string.Format("TimeoutException: {0}", e.Message));
        }
        catch (ArgumentException e)
        {
            // Thrown when the BrokeredMessage is null.
            log.Debug(string.Format("ArgumentException: {0}", e.Message));
        }
        catch (InvalidOperationException e)
        {
            // Thrown if the message has already been sent by a QueueClient or MessageSender.
            log.Debug(string.Format("InvalidOperationException: {0}", e.Message));
        }
        catch (OperationCanceledException e)
        {
            // Thrown if the client entity has been closed or aborted.
            log.Debug(string.Format("OperationCanceledException: {0}", e.Message));
        }
        catch (UnauthorizedAccessException e)
        {
            // Thrown if there is an I/O or security error.
            log.Debug(string.Format("UnauthorizedAccessException: {0}", e.Message));
        }
        catch (SerializationException e)
        {
            // Thrown when an error occurs during serialization or deserialization.
            log.Debug(string.Format("SerializationException: {0}", e.Message));
        }
        catch (MessagingEntityNotFoundException e)
        {
            // Thrown if the queue does not exist.
            log.Debug(string.Format("MessagingEntityNotFoundException: {0}", e.Message));
        }
        catch (MessagingException e)
        {
            log.Debug(string.Format("MessagingException: {0}", e.Message));
            // IsTransient indicates whether the operation is worth retrying.
            if (e.IsTransient)
            {
                HandleTransientErrors(e);
            }
        }
        catch (Exception e)
        {
            log.Debug(string.Format("Exception: {0}", e.Message));
        }
        return false;
    }

    // Receives up to iMaxNumMsg messages from the queue.
    // iWaitTimeout: the time, in seconds, that the server waits for the message batch to arrive before it times out.
    // Returns an empty list (never null) when nothing arrives or an error occurs.
    public List<BrokeredMessage> GetMessages(int iMaxNumMsg, int iWaitTimeout, QueueClient client)
    {
        List<BrokeredMessage> list = new List<BrokeredMessage>();
        try
        {
            list = client.ReceiveBatch(iMaxNumMsg, TimeSpan.FromSeconds(iWaitTimeout)).ToList();
        }
        catch (MessagingException e)
        {
            log.Debug(string.Format("ReceiveMessages, MessagingException: {0}", e.Message));
            // IsTransient indicates whether the operation is worth retrying.
            if (e.IsTransient)
            {
                HandleTransientErrors(e);
            }
            return new List<BrokeredMessage>();
        }
        catch (Exception e)
        {
            log.Debug(string.Format("ReceiveMessages, Exception: {0}", e.Message));
            return new List<BrokeredMessage>();
        }
        return list;
    }

    // Completes the message, which deletes it from the queue.
    public bool DeleteMessage(BrokeredMessage message)
    {
        bool bRet = false;
        try
        {
            message.Complete();
            bRet = true;
            log.Debug("Delete Message Successfully");
        }
        catch (Exception e)
        {
            log.Debug(e.Message);
            return bRet;
        }
        return bRet;
    }

    // On a transient error, back off for 2 seconds. This method does not retry; the caller decides whether to try again.
    private void HandleTransientErrors(MessagingException e)
    {
        log.Debug(e.Message);
        log.Debug("Transient error happened! Backing off for 2 seconds");
        Thread.Sleep(2000);
    }
}
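Note that the handler only pauses for 2 seconds after a transient error and then reports failure; it never repeats the operation itself, so any retry has to happen in the caller. A minimal sketch of such a caller-side retry might look like this. `Retry` is a hypothetical helper of my own, not part of the Service Bus SDK, and it knows nothing about Service Bus: it simply repeats any operation that reports success as a bool.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the Service Bus SDK: retries an operation
// that reports success as a bool, pausing between attempts.
public static class Retry
{
    public static bool Execute(Func<bool> operation, int maxAttempts, TimeSpan delay)
    {
        if (operation == null) throw new ArgumentNullException("operation");
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException("maxAttempts");

        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (operation())
            {
                return true;
            }
            // Do not sleep after the final failed attempt.
            if (attempt < maxAttempts)
            {
                Thread.Sleep(delay);
            }
        }
        return false;
    }
}
```

A sender could then wrap a call as `Retry.Execute(() => handler.SendMessage(body, client, 10), 3, TimeSpan.FromSeconds(2))`. Two caveats: this retries on every failure, not only transient ones, and a retried send may deliver the same message twice if the first attempt actually reached the queue, so the receiver should be prepared for duplicates.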
The Main method and the implementations of thread 1 and thread 2:
// Fragment of the Program class. The static members log, connectionString and queueName
// are defined elsewhere in the class and are not shown here.

// Thread 1: sends a fixed number of messages to the queue.
public static void SendMessageToQueue()
{
    int sendMessageNum = 10000;
    log.Debug(string.Format("=> SendMessageToQueue, send message number = {0}", sendMessageNum));

    // Prepare the handler and the client.
    ServiceBusQueueHandler handler = new ServiceBusQueueHandler();
    QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);

    // Number of messages sent successfully.
    int count = 0;
    for (int i = 0; i < sendMessageNum; i++)
    {
        // Send one message whose body is a new GUID; it becomes visible after 10 seconds.
        string strMessageBody = System.Guid.NewGuid().ToString();
        bool bRet = handler.SendMessage(strMessageBody, client, 10);
        if (bRet)
        {
            count++;
        }
        // Wait 2s, then send the next message.
        Thread.Sleep(2000);
    }
    log.Debug(string.Format("<= SendMessageToQueue, success sent message number = {0}", count));
}

// Thread 2: receives messages from the queue and deletes each one.
public static void ReceiveMessageFromQueue()
{
    log.Debug("=> ReceiveMessageFromQueue");

    // Prepare the handler and the client.
    ServiceBusQueueHandler handler = new ServiceBusQueueHandler();
    QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);

    // Number of messages received (and deleted) successfully.
    int count = 0;
    // If 30 consecutive polls of up to 120s each return nothing (30 * 120s = 1 hour),
    // we assume there are no more messages in the queue.
    int failCount = 0;
    while (failCount < 30)
    {
        List<BrokeredMessage> list = handler.GetMessages(10, 120, client);
        // Treat a null result (returned after an error) as an empty poll.
        if (list != null && list.Count > 0)
        {
            // Reset so that only consecutive empty polls count towards the limit.
            failCount = 0;
            foreach (BrokeredMessage msg in list)
            {
                log.Debug(string.Format("Received 1 Message, Time = {0}, Message Body = {1}", DateTime.UtcNow.ToString(), msg.GetBody<string>()));
                // Delete the message; only a successful delete counts as a receive.
                bool bRet = handler.DeleteMessage(msg);
                if (bRet)
                {
                    count++;
                }
            }
            log.Debug(string.Format("Current Count Number = {0}", count));
        }
        else
        {
            failCount++;
            log.Debug(string.Format("Didn't Receive any Message this time, fail count number = {0}", failCount));
        }
        // Wait 1s before polling again.
        Thread.Sleep(1000);
    }
    log.Debug(string.Format("<= ReceiveMessageFromQueue, success received message number = {0}", count));
}

static void Main(string[] args)
{
    log4net.GlobalContext.Properties["LogName"] = "TestServiceBus.log";
    log4net.Config.XmlConfigurator.Configure();
    Console.WriteLine("Start");

    Thread threadSendMessage = new Thread(SendMessageToQueue);
    Thread threadReceMessage = new Thread(ReceiveMessageFromQueue);
    threadSendMessage.Start();
    threadReceMessage.Start();

    // Keep the console open while both threads run.
    Console.ReadLine();
}
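One small detail: thread 1 sends only 10000 messages while thread 2 keeps receiving, so once no message has been received for an hour, the receiver assumes the queue holds no more messages and stops.
3. Test results
According to the log, the program ran for nearly 8 hours. The final results were:
10000 messages sent successfully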
2015-04-30 15:01:49,576 [3] DEBUG TestServiceBus.Program <= SendMessageToQueue, success sent message number = 10000
10000 messages received successfully
2015-04-30 15:02:03,638 [4] DEBUG TestServiceBus.Program Current Count Number = 10000
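So, judging from this test alone, Service Bus Queue did not lose any messages. For the message problem our team ran into, I recommend starting with our own code and checking whether the fault is ours rather than Service Bus Queue's.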
---------------------------------------------------------------
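Update, May 5, 2015: we finally found why messages were being lost with Service Bus, and the problem was indeed on our side. When sending, message ids could be duplicated, which could cause messages to be lost. Message ids must be unique.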
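Why would a repeated message id make a message vanish without any exception on the sender side? One plausible mechanism, assuming the queue had duplicate detection enabled (I have not stated that above, so treat it as an assumption), is that the broker accepts the send but silently discards a message whose id it has already seen within a time window. The toy in-memory model below illustrates only that effect; it is not the Service Bus implementation, and `DedupQueueModel` is a made-up name.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of a queue with duplicate detection. It is NOT the
// Service Bus implementation; it only illustrates the effect of id reuse.
public class DedupQueueModel
{
    private readonly TimeSpan _window;
    private readonly Dictionary<string, DateTime> _seen = new Dictionary<string, DateTime>();
    private readonly Queue<string> _bodies = new Queue<string>();

    public DedupQueueModel(TimeSpan window)
    {
        _window = window;
    }

    // Number of messages actually stored in the queue.
    public int Count
    {
        get { return _bodies.Count; }
    }

    // Never throws for a duplicate: the message is simply dropped when the
    // same id was accepted within the detection window.
    public void Send(string messageId, string body, DateTime nowUtc)
    {
        DateTime acceptedAt;
        if (_seen.TryGetValue(messageId, out acceptedAt) && nowUtc - acceptedAt < _window)
        {
            return; // duplicate id: dropped, and the sender is not told
        }
        _seen[messageId] = nowUtc;
        _bodies.Enqueue(body);
    }
}
```

In this model, two sends with the id "Message_1" a few seconds apart leave only one message in the queue, while ids produced by `Guid.NewGuid().ToString()` never collide in practice and every message is kept.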
I offer this only as a starting point for discussion. Thanks :-)
Kevin Song
May 2, 2015