16-EasyNetQ之自動訂閱者


EasyNetQ v0.7.1.30版本有了一個簡單AutoSubscriber。你能夠使用它很容易的去掃描指定程序集中實現了IConsume 或 IConsumeAsync 接口的類,然后這個自動訂閱者讓這些消費者訂閱到你的事件總線中。IConsume 的實現將使用事件總線的Subscribe方法,同時IConsumeAsync 的實現使用事件總線的SubscribeAsync方法,詳情參看 EasyNetQ之訂閱。你當然能夠讓你的消費者處理多個消息。讓我們看一些示例。

注意:從0.13.0版本開始,所有的AutoSubscriber類都在EasyNetQ.AutoSubscribe命名空間中。因此請添加下面的using語句。

using EasyNetQ.AutoSubscribe;

讓我們定義一個簡單消費者,處理三個消息:MessageA,MessageB和MessageC。

public class MyConsumer : IConsume<MessageA>,
                          IConsume<MessageB>,
                          IConsumeAsync<MessageC>
{
    public void Consume(MessageA message) {...}

    public void Consume(MessageB message) {...}

    public Task Consume(MessageC message) {...}
}

首先創建一個新的AutoSubscriber實例,傳你的IBus實例和subscriberId前綴給AutoSubscriber的構造函數。這個subscriberId前綴會作為所有自動生成的subscriberIds的前綴。但是不能自定義subscriberId(參看下文)。

注冊在相同程序集中的所有其他消費者,我們僅僅需要去傳包含你的消費者所在的程序集給這個方法:AutoSubscriber.Subscribe(assembly)。注意!,這些事情你應該只做一次,最好在應用啟動的時候。

var subscriber = new AutoSubscriber(bus,
        "my_applications_subscriptionId_prefix");
subscriber.Subscribe(Assembly.GetExecutingAssembly());

通過主題訂閱

默認的AutoSubscriber將不帶主題的綁定。下面的例子中MessageA被注冊到兩個主題。

注意! 如果你運行下面代碼,沒有加上ForTopic屬性標簽,它將有一個"#"路由Key,這個Key將會選擇任何訂閱這個消息類型的訂閱者。假定在默認端口上且admin插件已被安裝,簡單訪問http://localhost:15672/#/queues,如果需要請解除綁定的路由。

public class MyConsumer : IConsume<MessageA>, 
                          IConsume<MessageB>,
                          IConsumeAsync<MessageC>
{
    [ForTopic("Topic.Foo")]
    [ForTopic("Topic.Bar")]
    public void Consume(MessageA message) {...}

    public void Consume(MessageB message) {...}

    public Task Consume(MessageC message) {...}
}

//通過主題發布
var bus = RabbitHutch.CreateBus("host=localhost");

//被選中
var msg1 = new MessageA("topic.foo");
bus.Publish(msg1, "Topic.Foo");

//被選中
var msg2 = new MessageA("topic.bar");
bus.Publish(msg1, "Topic.Bar");

//不會被選中
var msg3 = new MessageA("no pick up);                
bus.Publish(msg3);

指定一個特定的SubscriptionId

AutoSubscriber 默認情況下為每一組Message/Consumer生成一個唯一的SubscriptionId。這意味着當相同的消費者啟動多個實例后,這些實例將會從相同隊列中循環消費消息。

如果你希望subscriptionId是固定的,你能通過帶有AutoSubscriberConsumerAttribute屬性標簽的Consume方法實現。為什么你會讓SubscriptionId是固定的,可以閱讀這里

假如,消費MessageB消息的Consume方法需要固定的SubscriptionId。僅僅需要使用AutoSubscriberConsumerAttribute屬性標簽,為其SubscriptionId設置一個值。

[AutoSubscriberConsumer(SubscriptionId = "MyExplicitId")]
public void Consume(MessageB message) { }

控制SubscriptionId的生成

你當然也可以控制實際的SubscriptionId的生成,只要用AutoSubscriber.GenerateSubscriptionId : Func<ConsumerInfo, string>替換就可以實現。

var subscriber = new AutoSubscriber(bus)
{
    CreateConsumer = t => objectResolver.Resolve(t),
    GenerateSubscriptionId = c => AppDomain.CurrentDomain.FriendlyName
        + c.ConcreteType.Name
};
subscriber.Subscribe(Assembly.GetExecutingAssembly());

注意! 這只是實現的一個樣例。請確保你已經讀過並理解了SubscriptionId值的重要價值。

控制消費者配置設置

當使用autosubsriber去訂閱隊列消息,你能夠設置ISubscriptionConfiguration值,例如AutoDelete,Priority 等。

var subscriber = new AutoSubscriber(bus)
{    
    ConfigureSubscriptionConfiguration = c => c.WithAutoDelete()
                                               .WithPriority(10)
};
subscriber.Subscribe(Assembly.GetExecutingAssembly());

或者,你可以用在consume方法上用屬性標簽的方式。

public class MyConsumer : IConsume<MessageA>
{
    [SubscriptionConfiguration(CancelOnHaFailover = true, PrefetchCount = 10)]
    public void Consume(MessageA message) {...}
}

讓AutoSubscriber使用容器

AutoSubscriber 有一個屬性:MessageDispatcher,這個屬性允許插入你自己消息調度代碼。這樣允許你從IoC容器中或者其他第三方任務調度中,解析你自己的消費者。
讓我們寫一個自定義的IAutoSubscriberMessageDispatcher實現,從 Windsor IoC 容器中去解析消費者。

public class WindsorMessageDispatcher : IAutoSubscriberMessageDispatcher
{
    private readonly IWindsorContainer container;

    public WindsorMessageDispatcher(IWindsorContainer container)
    {
        this.container = container;
    }

    public void Dispatch<TMessage, TConsumer>(TMessage message) where TMessage : class where TConsumer : IConsume<TMessage>
    {
        var consumer = container.Resolve<TConsumer>();
        try
        {
            consumer.Consume(message);
        }
        finally
        {
            container.Release(consumer);
        }
    }

    public Task DispatchAsync<TMessage, TConsumer>(TMessage message) where TMessage: class where TConsumer: IConsumeAsync<TMessage>
    {
        var consumer = _container.Resolve<TConsumer>();           
        return consumer.Consume(message).ContinueWith(t=>_container.Release(consumer));            
    }
}

現在,我們需要注冊消費者到我們的IoC容器中。

var container = new WindsorContainer();
container.Register(
    Component.For<MyConsumer>().ImplementedBy<MyConsumer>()
    );

下一步,讓AutoSubscriber 用我們自定義的IMessageDispatcher:

var bus = RabbitHutch.CreateBus("host=localhost");

var autoSubscriber = new AutoSubscriber(bus, "My_subscription_id_prefix")
{
    MessageDispatcher = new WindsorMessageDispatcher(container)
};
autoSubscriber.Subscribe(GetType().Assembly);
autoSubscriber.SubscribeAsync(GetType().Assembly);

現在每一次消息到來是,一個新的消費者實例,將會從我們自定義的容器中拿到。

英文地址:https://github.com/EasyNetQ/EasyNetQ/wiki/Auto-Subscriber
本文地址:http://www.cnblogs.com/HuangLiang/p/EasyNetQ_AutoSubscriber.html
我的微信訂閱號:open_dotNET


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM