20140310補充:
rabbitmq有requeue屬性,可以選擇消息是否返回隊列,另,本文的解決方式非常之山寨,只能應用於發送和接收方式。
這幾天在折騰消息隊列,在.Net環境下有基於RabbitMQ有很多有API的選擇,最后選擇了比較簡單的EasyNetQ(http://easynetq.com/)
在測試使用的時候發現一個問題,對於處理錯誤的消息,EasyNetQ默認會放到一個錯誤隊列中並提供了一個工具可以對錯誤隊列的消息進行重新分發。RabiitMQ是有一個事務功能的,但是貌似在EasyNetQ的實現中,沒有發現相關的操作方式
現在的需求是,在某種特定的情況下,需要將處理不成功的消息,保留在消息原隊列
做了一個變通處理,出現錯誤消息的時候,將其再送回原隊列。從客戶端上可以直接send回隊列,但這樣不是一個很好的方式
研究了一下EasyNetQ的相關代碼,發現其定義了一個IConsumerErrorStrategy接口,我們只要自己自行實現,並注冊一個自定義方法就可以
ComponentRegistration.cs原注冊方法相關,默認錯誤消息處理方式在DefaultConsumerErrorStrategy.cs中
1 public class ComponentRegistration 2 { 3 public static void RegisterServices(IContainer container) 4 { 5 Preconditions.CheckNotNull(container, "container"); 6 7 // Note: IConnectionConfiguration gets registered when MQContext.CreateBus(..) is run. 8 #region DefaultConsumerErrorStrategy 9 container 10 .Register(_ => container) 11 .Register<IMessageQueueQLogger, ConsoleLogger>() 12 .Register<ISerializer, JsonSerializerN>() 13 .Register<IConventions, Conventions>() 14 .Register<IEventBus, EventBus>() 15 .Register<ITypeNameSerializer, TypeNameSerializer>() 16 .Register<Func<string>>(x => CorrelationIdGenerator.GetCorrelationId) 17 .Register<IClusterHostSelectionStrategy<ConnectionFactoryInfo>, DefaultClusterHostSelectionStrategy<ConnectionFactoryInfo>>() 18 .Register<IConsumerDispatcherFactory, ConsumerDispatcherFactory>() 19 .Register<IPublishExchangeDeclareStrategy, PublishExchangeDeclareStrategy>() 20 .Register<IPublisherConfirms, PublisherConfirms>() 21 .Register<IConsumerErrorStrategy, DefaultConsumerErrorStrategy>() 22 .Register<IHandlerRunner, HandlerRunner>() 23 .Register<IInternalConsumerFactory, InternalConsumerFactory>() 24 .Register<IConsumerFactory, ConsumerFactory>() 25 .Register<IConnectionFactory, ConnectionFactoryWrapper>() 26 .Register<IPersistentChannelFactory, PersistentChannelFactory>() 27 .Register<IClientCommandDispatcherFactory, ClientCommandDispatcherFactory>() 28 .Register<IHandlerCollectionFactory, HandlerCollectionFactory>() 29 .Register<IAdvancedBus, RabbitAdvancedBus>() 30 .Register<IRpc, Rpc>() 31 .Register<ISendReceive, SendReceive>() 32 .Register<IBus, RabbitBus>(); 33 #endregion 34 } 35 }
對DefaultConsumerErrorStrategy的代碼進行研究后發現,EasyNetQ是產生一個處理錯誤消息隊列的Exchanges,將消息二次封裝后推送到默認的錯誤隊列
只要將消息隊列改為源消息隊列,取消消息二次封裝,直接推送到隊列
隊列綁定
1 private string DeclareErrorExchangeAndBindToDefaultErrorQueue(IModel model, ConsumerExecutionContext context) 2 { 3 var originalRoutingKey = context.Info.RoutingKey; 4 5 return errorExchanges.GetOrAdd(originalRoutingKey, _ => 6 { 7 var exchangeName = conventions.ErrorExchangeNamingConvention(context.Info); 8 model.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: true); 9 //更改第一個參數 10 model.QueueBind(originalRoutingKey, exchangeName, originalRoutingKey); 11 return exchangeName; 12 }); 13 }
消息處理
1 public virtual PostExceptionAckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception) 2 { 3 Preconditions.CheckNotNull(context, "context"); 4 Preconditions.CheckNotNull(exception, "exception"); 5 6 try 7 { 8 Connect(); 9 10 using (var model = connection.CreateModel()) 11 { 12 var errorExchange = DeclareErrorExchangeQueueStructure(model, context); 13 var messageBody = context.Body; 14 var properties = model.CreateBasicProperties(); 15 context.Properties.CopyTo(properties); 16 properties.Type = context.Properties.Type; 17 //消息持久化 18 properties.SetPersistent(true); 19 20 model.BasicPublish(errorExchange, context.Info.RoutingKey, properties, messageBody); 21 22 } 23 } 24 catch (Exception unexpectedException) 25 { 26 // Something else unexpected has gone wrong :( 27 logger.ErrorWrite("EasyNetQMessageQueue Consumer Error Handler: Failed to publish error message\nException is:\n" 28 + unexpectedException); 29 } 30 return Consumer.PostExceptionAckStrategy.ShouldAck; 31 }
因為EasyNetQ在CreateBus的時候就會將相關事件注冊,所以我們只需最后自行在config中加入有關配置,在注冊事件的位置加入判斷即可讓異常拋回
由於剛接觸RabbitMQ,對於一些概念的理解可能還不是非常到位,這是我目前所能找到的解決方案
當然可能以后的版本中,EasyNetQ會加入對事務的操作,現在EasyNetQ支持了一種叫Publisher Confirms的方法,貌似還是不是我想要的
