基於EasyNetQ的RabbitMQ封裝類


最近在搗鼓RabbitMQ,為了方便使用,自己基於EasyNetQ封裝了一個類,現在貼出來還望各路大佬神明指點,共同學習。

  1     /// <summary>
  2     /// RabbitMQ客戶端封裝類,基於EasyNetQ,使用時需要從nuget安裝EasyNetQ。
  3     /// <para>
  4     /// <example>
  5     /// 使用方法:
  6     /// <code>
  7     /// using(var mq = new RabbitMqClient('rabbitmq連接字符串'))
  8     /// { ...
  9     /// }
 10     /// </code>
 11     /// </example>
 12     /// </para>
 13     /// </summary>
 14     public class RabbitMqClient : IDisposable
 15     {
 16         private readonly IBus bus;
 17 
 18         /// <summary>
 19         /// 構造函數
 20         /// </summary>
 21         /// <param name="connectionString">rabbitmq連接字符串</param>
 22         public RabbitMqClient(string connectionString)
 23         {
 24             if (string.IsNullOrEmpty(connectionString))
 25                 throw new ArgumentNullException(nameof(connectionString));
 26             bus = RabbitHutch.CreateBus(connectionString);
 27         }
 28         /// <summary>
 29         /// 發布一條消息(廣播)
 30         /// </summary>
 31         /// <param name="message"></param>
 32         public void Publish<TMessage>(TMessage message) where TMessage:class 
 33         {
 34             bus.PublishAsync(message);
 35         }
 36 
 37         /// <summary>
 38         /// 指定Topic,發布一條消息
 39         /// </summary>
 40         /// <param name="message"></param>
 41         /// <param name="topic"></param>
 42         public void PublishWithTopic<TMessage>(TMessage message, string topic) where TMessage : class
 43         {
 44             if(string.IsNullOrEmpty(topic))
 45                 Publish(message);
 46             else
 47                 bus.PublishAsync(message, x=>x.WithTopic(topic));
 48         }
 49 
 50         /// <summary>
 51         /// 發布消息。一次性發布多條
 52         /// </summary>
 53         /// <param name="messages"></param>
 54         public void PublishMany<TMessage>(List<TMessage> messages) where TMessage : class 
 55         {
 56             foreach (var message in messages)
 57             {
 58                 Publish(message);
 59                 Thread.Sleep(50);//必須加上,以防消息阻塞
 60             }
 61         }
 62 
 63         /// <summary>
 64         /// 發布消息。一次性發布多條
 65         /// </summary>
 66         /// <param name="messages"></param>
 67         /// <param name="topic"></param>
 68         public void PublishManyWithTopic<TMessage>(List<TMessage> messages, string topic) where TMessage : class
 69         {
 70             foreach (var message in messages)
 71             {
 72                 PublishWithTopic(message, topic);
 73                 Thread.Sleep(50);//必須加上,以防消息阻塞
 74             }
 75         }
 76 
 77         /// <summary>
 78         /// 給指定隊列發送一條信息
 79         /// </summary>
 80         /// <param name="queue">隊列名稱</param>
 81         /// <param name="message">消息</param>
 82         public void Send<TMessage>(string queue, TMessage message) where TMessage : class
 83         {
 84             bus.Send(queue, message);
 85         }
 86 
 87         /// <summary>
 88         /// 給指定隊列批量發送信息
 89         /// </summary>
 90         /// <param name="queue">隊列名稱</param>
 91         /// <param name="messages">消息</param>
 92         public void SendMany<TMessage>(string queue, IList<TMessage> messages) where TMessage : class
 93         {
 94             foreach (var message in messages)
 95             {
 96                 SendAsync(queue, message);
 97                 Thread.Sleep(50);//必須加上,以防消息阻塞
 98             }
 99         }
100 
101         /// <summary>
102         /// 給指定隊列發送一條信息(異步)
103         /// </summary>
104         /// <param name="queue">隊列名稱</param>
105         /// <param name="message">消息</param>
106         /// <returns></returns>
107         public async void SendAsync<TMessage>(string queue, TMessage message) where TMessage:class 
108         {
109             await bus.SendAsync(queue, message);
110         }
111 
112         /// <summary>
113         /// 從指定隊列接收一天信息,並做相關處理。
114         /// </summary>
115         /// <param name="queue">隊列名稱</param>
116         /// <param name="process">
117         /// 消息處理委托方法
118         /// <para>
119         /// <example>
120         /// 例如:
121         /// <code>
122         /// message=>Task.Factory.StartNew(()=>{
123         ///     Console.WriteLine(message);
124         /// })
125         /// </code>
126         /// </example>
127         /// </para>
128         /// </param>
129         public void Receive<TMessage>(string queue, Func<TMessage, Task> process) where TMessage:class 
130         {
131             bus.Receive(queue, process);
132         }
133 
134         /// <summary>
135         /// 消息訂閱
136         /// </summary>
137         /// <param name="subscriptionId">消息訂閱標識</param>
138         /// <param name="process">
139         /// 消息處理委托方法
140         /// <para>
141         /// <example>
142         /// 例如:
143         /// <code>
144         /// message=>Task.Factory.StartNew(()=>{
145         ///     Console.WriteLine(message);
146         /// })
147         /// </code>
148         /// </example>
149         /// </para>
150         /// </param>
151         public void Subscribe<TMessage>(string subscriptionId, Func<TMessage, Task> process) where TMessage:class 
152         {
153             bus.Subscribe<TMessage>(subscriptionId, message => process(message));
154         }
155 
156         /// <summary>
157         /// 消息訂閱
158         /// </summary>
159         /// <param name="subscriptionId">消息訂閱標識</param>
160         /// <param name="process">
161         /// 消息處理委托方法
162         /// <para>
163         /// <example>
164         /// 例如:
165         /// <code>
166         /// message=>Task.Factory.StartNew(()=>{
167         ///     Console.WriteLine(message);
168         /// })
169         /// </code>
170         /// </example>
171         /// </para>
172         /// </param>
173         /// <param name="topic">topic</param>
174         public void SubscribeWithTopic<TMessage>(string subscriptionId, Func<TMessage, Task> process, string topic) where TMessage:class 
175         {
176             bus.Subscribe<TMessage>(subscriptionId, message => process(message), x=>x.WithTopic(topic));
177         }
178 
179         /// <summary>
180         /// 自動訂閱
181         /// </summary>
182         /// <param name="assemblyName"></param>
183         /// <param name="subscriptionIdPrefix"></param>
184         /// <param name="topic"></param>
185         public void AutoSubscribe(string assemblyName, string subscriptionIdPrefix, string topic)
186         {
187             var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
188             if (!string.IsNullOrEmpty(topic))
189                 subscriber.ConfigureSubscriptionConfiguration = x => x.WithTopic(topic);
190             subscriber.Subscribe(Assembly.Load(assemblyName));
191         }
192 
193         /// <summary>
194         /// 資源釋放
195         /// </summary>
196         public void Dispose()
197         {
198             if (bus != null) bus.Dispose();
199         }
200     }
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM