RabbitMQ c#版實現(轉)


出處:https://www.cnblogs.com/hanfan/p/9842301.html

網上很多人已經總結的很好了,比如今天看到的這個。https://www.cnblogs.com/LipeiNet/p/9877189.html

我就不總結了,貼點代碼。

RabbitMQConnect.cs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
using  System;
using  System.IO;
using  System.Net.Sockets;
using  Polly;
using  Polly.Retry;
using  RabbitMQ.Client;
using  RabbitMQ.Client.Events;
using  RabbitMQ.Client.Exceptions;
 
namespace  Common.Tool.RabbitMQ
{
     public  class  RabbitMQConnect
     {
         
         static  string  host =  "127.0.0.1" ;
         static  string  UserName =  "H" ;
         static  string  password =  "H" ;
 
         public  readonly  static  IConnectionFactory _connectionFactory;
         IConnection _connection;
         object  sync_root =  new  object ();
         bool  _disposed;
         static  RabbitMQConnect()
         {
             //if (host == "localhost")
             //{
             //    _connectionFactory = new ConnectionFactory() { HostName = host };
             //}
             //else
             {
                 _connectionFactory =  new  ConnectionFactory() { HostName = host, UserName = UserName, Password = password };
             }
         }
         public  bool  IsConnected =>  this ._connection !=  null  &&  this ._connection.IsOpen &&  this ._disposed;
         public  IModel CreateModel()
         {
             if  (! this .IsConnected)
             {
                 this .TryConnect();
             }
             return  this ._connection.CreateModel();
         }
         public  bool  TryConnect()
         {
             lock  ( this .sync_root)
             {
                 RetryPolicy policy = RetryPolicy.Handle<SocketException>() //如果我們想指定處理多個異常類型通過OR即可
                     .Or<BrokerUnreachableException>() //ConnectionFactory.CreateConnection期間無法打開連接時拋出異常
                     .WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                         {
 
                         }); // 重試次數,提供等待特定重試嘗試的持續時間的函數,每次重試時調用的操作。
                 policy.Execute(() =>
                 {
                     this ._connection = _connectionFactory.CreateConnection();
 
                 });
 
                 if  ( this .IsConnected)
                 {
                     //當連接被破壞時引發。如果在添加事件處理程序時連接已經被銷毀對於此事件,事件處理程序將立即被觸發。
                     this ._connection.ConnectionShutdown +=  this .OnConnectionShutdown;
                     //在連接調用的回調中發生異常時發出信號。當ConnectionShutdown處理程序拋出異常時,此事件將發出信號。如果將來有更多的事件出現在RabbitMQ.Client.IConnection上,那么這個事件當這些事件處理程序中的一個拋出異常時,它們將被標記。
                     this ._connection.CallbackException +=  this .OnCallbackException;
                     this ._connection.ConnectionBlocked +=  this .OnConnectionBlocked;
 
                     //LogHelperNLog.Info($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
 
                     return  true ;
                 }
                 else
                 {
                     // LogHelperNLog.Info("FATAL ERROR: RabbitMQ connections could not be created and opened");
 
                     return  false ;
                 }
             }
         }
 
         void  OnConnectionShutdown( object  sender, ShutdownEventArgs reason)
         {
             if  ( this ._disposed)  return ;
             //RabbitMQ連接正在關閉。 嘗試重新連接...
             //LogHelperNLog.Info("A RabbitMQ connection is on shutdown. Trying to re-connect...");
 
             this .TryConnect();
         }
         /// <summary>
         ///   
         /// </summary>
         /// <param name="sender"></param>
         /// <param name="e"></param>
         void  OnCallbackException( object  sender, CallbackExceptionEventArgs e)
         {
             if  ( this ._disposed)  return ;
 
             // LogHelperNLog.Info("A RabbitMQ connection throw exception. Trying to re-connect...");
 
             this .TryConnect();
         }
         private  void  OnConnectionBlocked( object  sender, ConnectionBlockedEventArgs e)
         {
             if  ( this ._disposed)  return ;
 
             //  LogHelperNLog.Info("A RabbitMQ connection is shutdown. Trying to re-connect...");
 
             this .TryConnect();
         }
 
         public  void  Dispose()
         {
             if  ( this ._disposed)  return ;
 
             this ._disposed =  true ;
 
             try
             {
                 this ._connection.Dispose();
             }
             catch  (IOException ex)
             {
                 //_logger.LogCritical(ex.ToString());
                 //  LogHelperNLog.Error(ex);
             }
         }
     }
}

  

RabbitMQSend.cs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
using  Newtonsoft.Json;
using  Newtonsoft.Json.Converters;
using  System.Text;
 
namespace  Common.Tool.RabbitMQ
{
     public   class  RabbitMQSend
     {
         /// <summary>
         /// Newtonsoft.Json利用IsoDateTimeConverter處理日期類型
         /// </summary>
         static  IsoDateTimeConverter dtConverter =  new  IsoDateTimeConverter { DateTimeFormat =  "yyyy-MM-dd HH:mm:ss"  };
         static  RabbitMQConnect connection= null ;
 
         static  RabbitMQSend()
         {
             connection =  new  RabbitMQConnect();
         }
 
         /// <summary>
         /// 添加信息到隊列
         /// </summary>
         /// <typeparam name="T"></typeparam>
         /// <param name="item">信息</param>
         /// <param name="queueName">隊列名</param>
         public  static  void  PushMsgToMq<T>(T item,  string  queueName)
         {
             string  msg = JsonConvert.SerializeObject(item, dtConverter);
             using  (global::RabbitMQ.Client.IModel channel = connection.CreateModel())
             {
                 channel.QueueDeclare(queue: queueName,
                     durable:  true ,
                     exclusive:  false ,
                     autoDelete:  false ,
                     arguments:  null );
                 //Construct a completely empty content header for use with the Basic content class.
                 //構造一個完全空的內容標頭,以便與Basic內容類一起使用。
                 global::RabbitMQ.Client.IBasicProperties properties = channel.CreateBasicProperties();
                 properties.Persistent =  true ;
                 byte [] body = Encoding.UTF8.GetBytes(msg);
                 channel.BasicPublish(exchange:  "" ,
                     routingKey: queueName,
                     basicProperties: properties,
                     body: body);
             }
         }
     }
}

  

RabbitMQReceive.cs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
using  Newtonsoft.Json;
using  RabbitMQ.Client;
using  RabbitMQ.Client.Events;
using  System;
using  System.Text;
 
namespace  Common.Tool.RabbitMQ
{
     public  class  RabbitMQReceive : IDisposable
     {
         IConnection connection =  null ;
         IModel channel =  null ;
 
         public  void  BindReceiveMqMsg<T>(Func<T,  bool > func, Action< string > log,  string  queueName)
         {
             this .connection = RabbitMQConnect._connectionFactory.CreateConnection(); //創建與指定端點的連接。
             this .channel =  this .connection.CreateModel();  //創建並返回新的頻道,會話和模型。
             this .channel.QueueDeclare(queue: queueName, //隊列名稱
                              durable:  true , //是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫
                              exclusive:  false , //是否排外的,有兩個作用,一:當連接關閉時connection.close()該隊列是否會自動刪除;二:該隊列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個隊列,沒有任何問題,如果是排外的,會對當前隊列加鎖,其他通道channel是不能訪問的,如果強制訪問會報異常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等於true的話用於一個隊列只能有一個消費者來消費的場景
                              autoDelete:  false , //是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除,可以通過RabbitMQ Management,查看某個隊列的消費者數量,當consumers = 0時隊列就會自動刪除
                              arguments:  null ); //隊列中的消息什么時候會自動被刪除?
 
             this .channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global:  false ); //(Spec方法)配置Basic內容類的QoS參數。
                                                                                     //第一個參數是可接收消息的大小的  0不受限制
                                                                                     //第二個參數是處理消息最大的數量  1 那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息,消息只會在隊列中阻塞
                                                                                     //第三個參數則設置了是不是針對整個Connection的,因為一個Connection可以有多個Channel,如果是false則說明只是針對於這個Channel的。
             EventingBasicConsumer consumer =  new  EventingBasicConsumer( this .channel); //構造函數,它將Model屬性設置為給定值。
             consumer.Received += (model, bdea) =>
             {
                 byte [] body = bdea.Body;
                 string  message = Encoding.UTF8.GetString(body);
                 log?.Invoke(message);
 
                 T item = JsonConvert.DeserializeObject<T>(message);
                 bool  result = func(item);
                 if  (result)
                 {
                     //(Spec方法)確認一個或多個已傳送的消息。
                     this .channel.BasicAck(deliveryTag: bdea.DeliveryTag, multiple:  false );
                 }
             };
             this .channel.BasicConsume(queue: queueName, noAck:  false , consumer: consumer);  //The consumer is started with noAck = false(i.e.BasicAck is required), an empty consumer tag (i.e. the server creates and returns a fresh consumer tag), noLocal=false and exclusive=false.
         }
         public  void  Dispose()
         {
             if  ( this .channel !=  null )
             {
                 this .channel.Close();
             }
 
             if  ( this .connection !=  null )
             {
                 this .connection.Close();
             }
         }
     }
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM