出處:https://www.cnblogs.com/hanfan/p/9842301.html
網上很多人已經總結的很好了,比如今天看到的這個。https://www.cnblogs.com/LipeiNet/p/9877189.html
我就不總結了,貼點代碼。
RabbitMQConnect.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
using
System;
using
System.IO;
using
System.Net.Sockets;
using
Polly;
using
Polly.Retry;
using
RabbitMQ.Client;
using
RabbitMQ.Client.Events;
using
RabbitMQ.Client.Exceptions;
namespace
Common.Tool.RabbitMQ
{
public
class
RabbitMQConnect
{
static
string
host =
"127.0.0.1"
;
static
string
UserName =
"H"
;
static
string
password =
"H"
;
public
readonly
static
IConnectionFactory _connectionFactory;
IConnection _connection;
object
sync_root =
new
object
();
bool
_disposed;
static
RabbitMQConnect()
{
//if (host == "localhost")
//{
// _connectionFactory = new ConnectionFactory() { HostName = host };
//}
//else
{
_connectionFactory =
new
ConnectionFactory() { HostName = host, UserName = UserName, Password = password };
}
}
public
bool
IsConnected =>
this
._connection !=
null
&&
this
._connection.IsOpen &&
this
._disposed;
public
IModel CreateModel()
{
if
(!
this
.IsConnected)
{
this
.TryConnect();
}
return
this
._connection.CreateModel();
}
public
bool
TryConnect()
{
lock
(
this
.sync_root)
{
RetryPolicy policy = RetryPolicy.Handle<SocketException>()
//如果我們想指定處理多個異常類型通過OR即可
.Or<BrokerUnreachableException>()
//ConnectionFactory.CreateConnection期間無法打開連接時拋出異常
.WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
});
// 重試次數,提供等待特定重試嘗試的持續時間的函數,每次重試時調用的操作。
policy.Execute(() =>
{
this
._connection = _connectionFactory.CreateConnection();
});
if
(
this
.IsConnected)
{
//當連接被破壞時引發。如果在添加事件處理程序時連接已經被銷毀對於此事件,事件處理程序將立即被觸發。
this
._connection.ConnectionShutdown +=
this
.OnConnectionShutdown;
//在連接調用的回調中發生異常時發出信號。當ConnectionShutdown處理程序拋出異常時,此事件將發出信號。如果將來有更多的事件出現在RabbitMQ.Client.IConnection上,那么這個事件當這些事件處理程序中的一個拋出異常時,它們將被標記。
this
._connection.CallbackException +=
this
.OnCallbackException;
this
._connection.ConnectionBlocked +=
this
.OnConnectionBlocked;
//LogHelperNLog.Info($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
return
true
;
}
else
{
// LogHelperNLog.Info("FATAL ERROR: RabbitMQ connections could not be created and opened");
return
false
;
}
}
}
void
OnConnectionShutdown(
object
sender, ShutdownEventArgs reason)
{
if
(
this
._disposed)
return
;
//RabbitMQ連接正在關閉。 嘗試重新連接...
//LogHelperNLog.Info("A RabbitMQ connection is on shutdown. Trying to re-connect...");
this
.TryConnect();
}
/// <summary>
///
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void
OnCallbackException(
object
sender, CallbackExceptionEventArgs e)
{
if
(
this
._disposed)
return
;
// LogHelperNLog.Info("A RabbitMQ connection throw exception. Trying to re-connect...");
this
.TryConnect();
}
private
void
OnConnectionBlocked(
object
sender, ConnectionBlockedEventArgs e)
{
if
(
this
._disposed)
return
;
// LogHelperNLog.Info("A RabbitMQ connection is shutdown. Trying to re-connect...");
this
.TryConnect();
}
public
void
Dispose()
{
if
(
this
._disposed)
return
;
this
._disposed =
true
;
try
{
this
._connection.Dispose();
}
catch
(IOException ex)
{
//_logger.LogCritical(ex.ToString());
// LogHelperNLog.Error(ex);
}
}
}
}
|
RabbitMQSend.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
using
Newtonsoft.Json;
using
Newtonsoft.Json.Converters;
using
System.Text;
namespace
Common.Tool.RabbitMQ
{
public
class
RabbitMQSend
{
/// <summary>
/// Newtonsoft.Json利用IsoDateTimeConverter處理日期類型
/// </summary>
static
IsoDateTimeConverter dtConverter =
new
IsoDateTimeConverter { DateTimeFormat =
"yyyy-MM-dd HH:mm:ss"
};
static
RabbitMQConnect connection=
null
;
static
RabbitMQSend()
{
connection =
new
RabbitMQConnect();
}
/// <summary>
/// 添加信息到隊列
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="item">信息</param>
/// <param name="queueName">隊列名</param>
public
static
void
PushMsgToMq<T>(T item,
string
queueName)
{
string
msg = JsonConvert.SerializeObject(item, dtConverter);
using
(global::RabbitMQ.Client.IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable:
true
,
exclusive:
false
,
autoDelete:
false
,
arguments:
null
);
//Construct a completely empty content header for use with the Basic content class.
//構造一個完全空的內容標頭,以便與Basic內容類一起使用。
global::RabbitMQ.Client.IBasicProperties properties = channel.CreateBasicProperties();
properties.Persistent =
true
;
byte
[] body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange:
""
,
routingKey: queueName,
basicProperties: properties,
body: body);
}
}
}
}
|
RabbitMQReceive.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
using
Newtonsoft.Json;
using
RabbitMQ.Client;
using
RabbitMQ.Client.Events;
using
System;
using
System.Text;
namespace
Common.Tool.RabbitMQ
{
public
class
RabbitMQReceive : IDisposable
{
IConnection connection =
null
;
IModel channel =
null
;
public
void
BindReceiveMqMsg<T>(Func<T,
bool
> func, Action<
string
> log,
string
queueName)
{
this
.connection = RabbitMQConnect._connectionFactory.CreateConnection();
//創建與指定端點的連接。
this
.channel =
this
.connection.CreateModel();
//創建並返回新的頻道,會話和模型。
this
.channel.QueueDeclare(queue: queueName,
//隊列名稱
durable:
true
,
//是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫
exclusive:
false
,
//是否排外的,有兩個作用,一:當連接關閉時connection.close()該隊列是否會自動刪除;二:該隊列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個隊列,沒有任何問題,如果是排外的,會對當前隊列加鎖,其他通道channel是不能訪問的,如果強制訪問會報異常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等於true的話用於一個隊列只能有一個消費者來消費的場景
autoDelete:
false
,
//是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除,可以通過RabbitMQ Management,查看某個隊列的消費者數量,當consumers = 0時隊列就會自動刪除
arguments:
null
);
//隊列中的消息什么時候會自動被刪除?
this
.channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global:
false
);
//(Spec方法)配置Basic內容類的QoS參數。
//第一個參數是可接收消息的大小的 0不受限制
//第二個參數是處理消息最大的數量 1 那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息,消息只會在隊列中阻塞
//第三個參數則設置了是不是針對整個Connection的,因為一個Connection可以有多個Channel,如果是false則說明只是針對於這個Channel的。
EventingBasicConsumer consumer =
new
EventingBasicConsumer(
this
.channel);
//構造函數,它將Model屬性設置為給定值。
consumer.Received += (model, bdea) =>
{
byte
[] body = bdea.Body;
string
message = Encoding.UTF8.GetString(body);
log?.Invoke(message);
T item = JsonConvert.DeserializeObject<T>(message);
bool
result = func(item);
if
(result)
{
//(Spec方法)確認一個或多個已傳送的消息。
this
.channel.BasicAck(deliveryTag: bdea.DeliveryTag, multiple:
false
);
}
};
this
.channel.BasicConsume(queue: queueName, noAck:
false
, consumer: consumer);
//The consumer is started with noAck = false(i.e.BasicAck is required), an empty consumer tag (i.e. the server creates and returns a fresh consumer tag), noLocal=false and exclusive=false.
}
public
void
Dispose()
{
if
(
this
.channel !=
null
)
{
this
.channel.Close();
}
if
(
this
.connection !=
null
)
{
this
.connection.Close();
}
}
}
}
|