1:RabbitMQ是個啥?(專業術語參考自網絡)
RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。
RabbitMQ服務器是用Erlang語言編寫的,Erlang是專門為高並發而生的語言,而集群和故障轉移是構建在開放電信平台(OTP)框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。
2:使用RabbitMQ有啥好處?
RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
RabbitMQ的可靠性是非常好的,數據能夠保證百分之百的不丟失。可以使用鏡像隊列,它的穩定性非常好。
所以說在我們互聯網的金融行業,對數據的穩定性和可靠性要求都非常高的情況下,我們都會選擇RabbitMQ。當然它沒有Kafka性能好,但是要比ActiveMQ性能好很多,也可以自己做一些性能的優化。
RabbitMQ可以構建異地雙活架構,每一個節點的存儲方式可以采用磁盤或者內存。
3:RabbitMq的安裝以及環境搭建等:
網絡上有很多關於怎么搭建配置RabbitMQ服務環境的詳細文章,也比較簡單,這里不再說明。本人是在Docker上pull RabbitMQ鏡像來安裝的!
3.1:運行容器的命令如下:
docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management
4:RabbitMq的使用場景主要有哪些,啥時候用或者不用?
4.1什么時候使用MQ?
1)數據驅動的任務依賴
2)上游不關心多下游執行結果
3)異步返回執行時間長
4.2什么時候不使用MQ?
需要實時關注執行結果 (eg:同步調用)
5:具體C#怎么使用RabbitMq?下面直接上code和測試截圖了(Demo環境是.NetCore3.1控制台+Docker上的RabbitMQ容器來進行的)
6:Simple模式,就是簡單的隊列模式,效果差不多是一進一出,測試截圖:
Code:
// ===== Simple mode: producer console app (the calling side) =====
using System;

namespace RabbitMqPublishDemo
{
    using MyRabbitMqService;
    using System.Runtime.CompilerServices;

    class Program
    {
        /// <summary>
        /// Publishes 500 messages to the "smapleMsg" queue, then waits for Enter.
        /// </summary>
        static void Main(string[] args)
        {
            // Plain queue, producer side.
            Console.WriteLine("====RabbitMqPublishDemo====");
            for (int i = 0; i < 500; i++)
            {
                ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
            }
            Console.WriteLine("生成完畢!");
            Console.ReadLine();
        }
    }
}

// ----- Excerpt from ZrfRabbitMqHelper: simple producer logic -----

/// <summary>
/// Declares a non-durable queue and publishes one UTF-8 encoded message to it
/// through the default exchange (routing key = queue name).
/// </summary>
/// <param name="queueName">Name of the queue to declare and publish to.</param>
/// <param name="msg">Message text; encoded as UTF-8.</param>
public static void PublishSampleMsg(string queueName, string msg)
{
    // A connection and a channel are opened and disposed for every call.
    using (IConnection conn = connectionFactory.CreateConnection())
    {
        using (IModel channel = conn.CreateModel())
        {
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
            var msgBody = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
        }
    }
}


// ===== Simple mode: consumer console app =====
using System;

namespace RabbitMqConsumerDemo
{
    using MyRabbitMqService;
    using System.Runtime.InteropServices;

    class Program
    {
        /// <summary>
        /// Subscribes to the "smapleMsg" queue and prints every message received.
        /// </summary>
        static void Main(string[] args)
        {
            Console.WriteLine("====RabbitMqConsumerDemo====");
            ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: text =>
            {
                Console.WriteLine($"訂閱到消息:{DateTime.Now}:{text}");
            });
            Console.ReadLine();
        }
    }
}

#region Simple consumer back-end logic (excerpt from ZrfRabbitMqHelper)
/// <summary>
/// Simple consumer: declares the queue and subscribes to it.
/// </summary>
/// <param name="queueName">Queue name.</param>
/// <param name="isBasicNack">
/// When true, messages are acknowledged manually: a message is acked after it was
/// handled, and nacked with requeue when the handler throws. When false the
/// subscription uses automatic acknowledgement.
/// </param>
/// <param name="handleMsgStr">
/// Optional callback receiving the message text; when null the message is written
/// to the console. Extend it yourself if the message must be stored in a database.
/// </param>
public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)
{
    Console.WriteLine("ConsumeSampleMsg Waiting for messages....");

    // Connection and channel are intentionally left open: the subscription
    // lives on them after this method returns.
    IConnection conn = connectionFactory.CreateConnection();
    IModel channel = conn.CreateModel();
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (sender, ea) =>
    {
        string msg = Encoding.UTF8.GetString(ea.Body.ToArray());
        try
        {
            if (handleMsgStr != null)
            {
                handleMsgStr.Invoke(msg);
            }
            else
            {
                Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
            }
        }
        catch (Exception ex) when (isBasicNack)
        {
            // Put the failed message back on the queue, as the parameter promises.
            // Caveat: a handler that always fails will see the message again and again.
            Console.Error.WriteLine($"ConsumeSampleMsg handler failed, message requeued: {ex.Message}");
            channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
            return;
        }

        if (isBasicNack)
        {
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        }
    };

    // Manual acknowledgement is only switched on when the caller asked for requeue-on-failure.
    channel.BasicConsume(queue: queueName, autoAck: !isBasicNack, consumer: consumer);
}
#endregion
7:Work模式
// With the code below you can publish repeatedly; three consumers all start consuming automatically.

// ===== Work mode: producer console app =====
using System;

namespace RabbitMqPublishDemo
{
    using MyRabbitMqService;
    using System.Runtime.CompilerServices;

    class Program
    {
        /// <summary>
        /// Publishes 500 messages to the "workqueue" queue, then waits for Enter.
        /// </summary>
        static void Main(string[] args)
        {
            for (int i = 0; i < 500; i++)
            {
                ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :發布消息成功{i}");
            }
            Console.WriteLine("工作隊列模式 生成完畢......!");
            Console.ReadLine();
        }
    }
}

// ----- Excerpt from ZrfRabbitMqHelper: work-queue producer logic -----

/// <summary>
/// Declares a durable queue and publishes one persistent, UTF-8 encoded message
/// to it through the default exchange.
/// </summary>
/// <param name="queueName">Name of the queue to declare and publish to.</param>
/// <param name="msg">Message text; encoded as UTF-8.</param>
public static void PublishWorkQueueModel(string queueName, string msg)
{
    using (var connection = connectionFactory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var body = Encoding.UTF8.GetBytes(msg);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
        Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
    }
}

// ===== Work mode: consumer console app =====
using System;

namespace RabbitMqConsumerDemo
{
    using MyRabbitMqService;
    using System.Runtime.InteropServices;

    class Program
    {
        /// <summary>
        /// Subscribes to the "workqueue" queue and prints every message received.
        /// </summary>
        static void Main(string[] args)
        {
            Console.WriteLine("====Work模式開啟了====");
            ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
            {
                Console.WriteLine($"work模式獲取到消息{msg}");
            });
            Console.ReadLine();
        }
    }
}

// ----- Excerpt from ZrfRabbitMqHelper: work-queue consumer logic -----

/// <summary>
/// Work-queue consumer: declares the durable queue, limits the channel to one
/// unacknowledged message at a time and acknowledges each message manually.
/// </summary>
/// <param name="queueName">Name of the queue to consume from.</param>
/// <param name="sleepHmao">Not used by the current implementation; kept for existing callers.</param>
/// <param name="handserMsg">
/// Optional callback receiving every non-empty message text. When null the
/// message is acknowledged without further processing.
/// </param>
public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
{
    // Connection and channel stay open: the subscription lives on them.
    var connection = connectionFactory.CreateConnection();
    var channel = connection.CreateModel();

    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    var consumer = new EventingBasicConsumer(channel);
    Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");

    consumer.Received += (sender, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
        try
        {
            if (handserMsg != null && !string.IsNullOrEmpty(message))
            {
                handserMsg.Invoke(message);
            }
        }
        catch (Exception ex)
        {
            // Without an ack or nack the delivery stays outstanding, and with
            // prefetchCount 1 this consumer would receive nothing further.
            // Hand the message back so this or another worker can retry it.
            // Caveat: a message that always fails will keep being redelivered.
            Console.Error.WriteLine($"ConsumeWorkQueueModel handler failed, message requeued: {ex.Message}");
            channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
            return;
        }

        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    };
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
8:Fanout
Code:
// The same message is consumed by every subscriber.

// ===== Fanout: publisher console app =====
using System;

namespace RabbitMqPublishDemo
{
    using MyRabbitMqService;
    using System.Runtime.CompilerServices;

    class Program
    {
        /// <summary>
        /// Publishes 500 messages to the "exchangemodel" exchange, then waits for Enter.
        /// </summary>
        static void Main(string[] args)
        {
            #region Publish/subscribe mode, now with an exchange
            int sequence = 0;
            while (sequence < 500)
            {
                ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"發布的消息是:{sequence}");
                sequence++;
            }
            Console.WriteLine("發布ok!");
            #endregion
            Console.ReadLine();
        }
    }
}

// ----- Excerpt from ZrfRabbitMqHelper: publisher logic, using ExchangeType.Fanout -----

/// <summary>
/// Declares a fanout exchange and publishes one UTF-8 encoded message to it
/// with an empty routing key.
/// </summary>
/// <param name="exchangeName">Name of the fanout exchange.</param>
/// <param name="message">Message text; encoded as UTF-8.</param>
public static void PublishExchangeModel(string exchangeName, string message)
{
    using (var connection = connectionFactory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
            byte[] payload = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: payload);
            Console.WriteLine($" Sent {message}");
        }
    }
}


// ===== Fanout: subscriber console app =====
using System;

namespace RabbitMqConsumerDemo
{
    using MyRabbitMqService;
    using System.Runtime.InteropServices;

    class Program
    {
        /// <summary>
        /// Subscribes to the "exchangemodel" exchange and prints every message received.
        /// </summary>
        static void Main(string[] args)
        {
            #region Publish/subscribe mode (exchange)
            Action<string> printMessage = msg => Console.WriteLine($"訂閱到消息:{msg}");
            ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", printMessage);
            #endregion
            Console.ReadLine();
        }
    }
}

// ----- Excerpt from ZrfRabbitMqHelper: subscriber logic -----

/// <summary>
/// Declares the fanout exchange, binds a server-named queue to it and consumes
/// from that queue with automatic acknowledgement.
/// </summary>
/// <param name="exchangeName">Name of the fanout exchange to bind to.</param>
/// <param name="handlerMsg">
/// Optional callback receiving every non-empty message text; when null the
/// message is written to the console instead.
/// </param>
public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
{
    // Connection and channel stay open: the subscription lives on them.
    IConnection subscriberConnection = connectionFactory.CreateConnection();
    IModel subscriberChannel = subscriberConnection.CreateModel();

    // Fanout: the exchange forks every message out to all bound queues.
    subscriberChannel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);

    string boundQueue = subscriberChannel.QueueDeclare().QueueName;
    subscriberChannel.QueueBind(queue: boundQueue, exchange: exchangeName, routingKey: "");

    Console.WriteLine(" Waiting for msg....");

    var subscriber = new EventingBasicConsumer(subscriberChannel);
    subscriber.Received += (model, delivery) =>
    {
        string text = Encoding.UTF8.GetString(delivery.Body.ToArray());

        // No callback supplied: fall back to console output.
        if (handlerMsg == null)
        {
            Console.WriteLine($"訂閱到消息:{text}");
            return;
        }

        // Empty messages are never passed to the callback.
        if (string.IsNullOrEmpty(text))
        {
            return;
        }

        handlerMsg(text);
    };
    subscriberChannel.BasicConsume(queue: boundQueue, autoAck: true, consumer: subscriber);
}
9:Direct
Code:
// ===== Direct: publisher console app =====
using System;

namespace RabbitMqPublishDemo
{
    using MyRabbitMqService;
    using System.Runtime.CompilerServices;

    class Program
    {
        /// <summary>
        /// Publishes 20 messages with the routing key taken from the first
        /// command-line argument, e.g. <c>abc.exe --name=qq</c>.
        /// </summary>
        static void Main(string[] args)
        {
            #region Publish/subscribe, exchange routing mode (Direct)
            string routerKeyValue = ParseRouterKey(args);
            if (string.IsNullOrEmpty(routerKeyValue))
            {
                // A missing or malformed argument used to crash with an index exception.
                Console.WriteLine("沒有獲取到routerKey !");
                Console.ReadLine();
                return;
            }

            Console.WriteLine("開始發布中。。。");
            for (int i = 0; i < 20; i++)
            {
                string msg = $"小明有{i}只寶劍";
                ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg, routerKey: routerKeyValue);

                // Fixed-key alternatives: omit routerKey to use the helper's default,
                // or pass a literal such as routerKey: "onlylog".
            }
            Console.WriteLine("這次發布完畢。。。");
            #endregion
            Console.ReadLine();
        }

        /// <summary>
        /// Returns the text after the first '=' of the first argument, or null
        /// when there is no argument or it contains no '='.
        /// </summary>
        private static string ParseRouterKey(string[] args)
        {
            if (args == null || args.Length == 0)
            {
                return null;
            }

            string[] parts = args[0].Split('=');
            return parts.Length > 1 ? parts[1] : null;
        }
    }
}

// ----- Excerpt from ZrfRabbitMqHelper: publisher logic, routing mode (Direct) -----

/// <summary>
/// Declares a direct exchange and publishes one UTF-8 encoded message to it
/// with the given routing key.
/// </summary>
/// <param name="message">Message text; encoded as UTF-8.</param>
/// <param name="exchangeName">Name of the direct exchange.</param>
/// <param name="routerKey">Routing key attached to the message.</param>
public static void ExchangeRoutersByDirectModelPublish(string message, string exchangeName = "qqai", string routerKey = "insertToStudent")
{
    using (IConnection connection = connectionFactory.CreateConnection())
    {
        using (IModel channelmodel = connection.CreateModel())
        {
            channelmodel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
            byte[] bymsg = Encoding.UTF8.GetBytes(message);
            channelmodel.BasicPublish(exchange: exchangeName, routingKey: routerKey, body: bymsg);
        }
    }
}

// ===== Direct: subscriber console app =====
using System;

namespace RabbitMqConsumerDemo
{
    using MyRabbitMqService;
    using System.Runtime.InteropServices;

    class Program
    {
        /// <summary>
        /// Consumes messages whose routing key equals the value given in the first
        /// command-line argument (<c>--name=value</c>). Without arguments nothing
        /// is subscribed.
        /// </summary>
        static void Main(string[] args)
        {
            Console.WriteLine("開始消費中。。!");
            if (args.Length > 0)
            {
                string routerKeyValue = ParseRouterKey(args);
                Console.WriteLine($"routerKey=>{routerKeyValue}");
                if (!string.IsNullOrEmpty(routerKeyValue))
                {
                    ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(routerKey: routerKeyValue, handler: msg =>
                    {
                        Console.WriteLine($"拿到消息:{msg}");
                    });
                }
                else
                {
                    Console.WriteLine("沒有獲取到routerKey !");
                }
            }
            Console.ReadLine();
        }

        /// <summary>
        /// Returns the text after the first '=' of the first argument, or null
        /// when there is no argument or it contains no '='.
        /// </summary>
        private static string ParseRouterKey(string[] args)
        {
            if (args == null || args.Length == 0)
            {
                return null;
            }

            string[] parts = args[0].Split('=');
            return parts.Length > 1 ? parts[1] : null;
        }
    }
}

// ----- Excerpt from ZrfRabbitMqHelper: subscriber logic, routing mode (Direct) -----

/// <summary>
/// Declares the direct exchange, binds a server-named queue to it with the given
/// routing key and consumes from that queue with automatic acknowledgement.
/// </summary>
/// <param name="exchangeName">Name of the direct exchange to bind to.</param>
/// <param name="routerKey">Routing key used for the queue binding.</param>
/// <param name="handler">
/// Optional callback receiving the message text; when null the routing key and
/// the message are written to the console.
/// </param>
public static void ExchangeRoutersByDirectModelConsumer(string exchangeName = "qqai", string routerKey = "insertToStudent", Action<string> handler = null)
{
    // Connection and channel stay open: the subscription lives on them.
    var connection = connectionFactory.CreateConnection();
    var channel = connection.CreateModel();
    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
    var queueName = channel.QueueDeclare().QueueName;
    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routerKey);

    Console.WriteLine("wating for message...!");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (sender, e) =>
    {
        var bytedata = e.Body.ToArray();
        var getRoutekey = e.RoutingKey;
        string msg = Encoding.UTF8.GetString(bytedata);
        if (handler != null)
        {
            handler.Invoke(msg);
        }
        else
        {
            Console.WriteLine($"路由{getRoutekey},訂閱到消息{msg}!");
        }
    };
    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
最后貼出測試幫助類:
using System;

namespace MyRabbitMqService
{
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Data;
    using System.Text;
    using System.Threading;

    /// <summary>
    /// Static helper bundling the publish and consume routines used by the demos:
    /// simple queue, work queue, fanout exchange and direct exchange.
    /// </summary>
    public class ZrfRabbitMqHelper
    {
        // NOTE(review): broker address and credentials are hard-coded for the demo;
        // load them from configuration or environment variables in real code.
        // NOTE(review): Endpoint is assigned last in the initializer and presumably
        // takes precedence over HostName - confirm against the client library.
        private static readonly IConnectionFactory connectionFactory = new ConnectionFactory()
        {
            // VirtualHost = "/",
            HostName = "110,112,110,112",
            UserName = "zrffengge",
            Password = "123qqlove@",
            Endpoint = new AmqpTcpEndpoint(new Uri("amqp://147.87.107.23:5670")) // this form works as well
        };

        #region Simple producer / consumer
        /// <summary>
        /// Simple consumer: declares the queue and subscribes to it.
        /// </summary>
        /// <param name="queueName">Queue name.</param>
        /// <param name="isBasicNack">
        /// When true, messages are acknowledged manually: a message is acked after it
        /// was handled, and nacked with requeue when the handler throws. When false
        /// the subscription uses automatic acknowledgement.
        /// </param>
        /// <param name="handleMsgStr">
        /// Optional callback receiving the message text; when null the message is
        /// written to the console. Extend it yourself if the message must be stored
        /// in a database.
        /// </param>
        public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)
        {
            Console.WriteLine("ConsumeSampleMsg Waiting for messages....");

            // Connection and channel are intentionally left open: the subscription
            // lives on them after this method returns.
            IConnection conn = connectionFactory.CreateConnection();
            IModel channel = conn.CreateModel();
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, ea) =>
            {
                string msg = Encoding.UTF8.GetString(ea.Body.ToArray());
                try
                {
                    if (handleMsgStr != null)
                    {
                        handleMsgStr.Invoke(msg);
                    }
                    else
                    {
                        Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
                    }
                }
                catch (Exception ex) when (isBasicNack)
                {
                    // Put the failed message back on the queue, as the parameter promises.
                    // Caveat: a handler that always fails will see the message again and again.
                    Console.Error.WriteLine($"ConsumeSampleMsg handler failed, message requeued: {ex.Message}");
                    channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                    return;
                }

                if (isBasicNack)
                {
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
            };

            // Manual acknowledgement is only switched on when the caller asked for requeue-on-failure.
            channel.BasicConsume(queue: queueName, autoAck: !isBasicNack, consumer: consumer);
        }

        /// <summary>
        /// Simple producer: declares a non-durable queue and publishes one UTF-8
        /// encoded message to it through the default exchange.
        /// </summary>
        /// <param name="queueName">Name of the queue to declare and publish to.</param>
        /// <param name="msg">Message text; encoded as UTF-8.</param>
        public static void PublishSampleMsg(string queueName, string msg)
        {
            // A connection and a channel are opened and disposed for every call.
            using (IConnection conn = connectionFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var msgBody = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
                }
            }
        }
        #endregion

        #region WorkQueue mode
        /// <summary>
        /// Work-queue consumer: declares the durable queue, limits the channel to one
        /// unacknowledged message at a time and acknowledges each message manually.
        /// </summary>
        /// <param name="queueName">Name of the queue to consume from.</param>
        /// <param name="sleepHmao">Not used by the current implementation; kept for existing callers.</param>
        /// <param name="handserMsg">
        /// Optional callback receiving every non-empty message text. When null the
        /// message is acknowledged without further processing.
        /// </param>
        public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
        {
            // Connection and channel stay open: the subscription lives on them.
            var connection = connectionFactory.CreateConnection();
            var channel = connection.CreateModel();

            channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(channel);
            Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");

            consumer.Received += (sender, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                try
                {
                    if (handserMsg != null && !string.IsNullOrEmpty(message))
                    {
                        handserMsg.Invoke(message);
                    }
                }
                catch (Exception ex)
                {
                    // Without an ack or nack the delivery stays outstanding, and with
                    // prefetchCount 1 this consumer would receive nothing further.
                    // Hand the message back so this or another worker can retry it.
                    // Caveat: a message that always fails will keep being redelivered.
                    Console.Error.WriteLine($"ConsumeWorkQueueModel handler failed, message requeued: {ex.Message}");
                    channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                    return;
                }

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        }

        /// <summary>
        /// Work-queue producer: declares a durable queue and publishes one persistent,
        /// UTF-8 encoded message to it through the default exchange.
        /// </summary>
        /// <param name="queueName">Name of the queue to declare and publish to.</param>
        /// <param name="msg">Message text; encoded as UTF-8.</param>
        public static void PublishWorkQueueModel(string queueName, string msg)
        {
            using (var connection = connectionFactory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

                var body = Encoding.UTF8.GetBytes(msg);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
                Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
            }
        }
        #endregion

        #region Publish/Subscribe mode - exchanges come into play
        /// <summary>
        /// Declares a fanout exchange and publishes one UTF-8 encoded message to it
        /// with an empty routing key.
        /// </summary>
        /// <param name="exchangeName">Name of the fanout exchange.</param>
        /// <param name="message">Message text; encoded as UTF-8.</param>
        public static void PublishExchangeModel(string exchangeName, string message)
        {
            using (var connection = connectionFactory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
                Console.WriteLine($" Sent {message}");
            }
        }

        /// <summary>
        /// Declares the fanout exchange, binds a server-named queue to it and consumes
        /// from that queue with automatic acknowledgement.
        /// </summary>
        /// <param name="exchangeName">Name of the fanout exchange to bind to.</param>
        /// <param name="handlerMsg">
        /// Optional callback receiving every non-empty message text; when null the
        /// message is written to the console instead.
        /// </param>
        public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
        {
            // Connection and channel stay open: the subscription lives on them.
            var connection = connectionFactory.CreateConnection();
            var channel = connection.CreateModel();

            // Fanout: the exchange forks every message out to all bound queues.
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: exchangeName,
                              routingKey: "");

            Console.WriteLine(" Waiting for msg....");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                if (handlerMsg != null)
                {
                    // Empty messages are never passed to the callback.
                    if (!string.IsNullOrEmpty(message))
                    {
                        handlerMsg.Invoke(message);
                    }
                }
                else
                {
                    Console.WriteLine($"訂閱到消息:{message}");
                }
            };
            channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
        }
        #endregion

        #region Publish/Subscribe routing mode (Direct)
        /// <summary>
        /// Declares a direct exchange and publishes one UTF-8 encoded message to it
        /// with the given routing key.
        /// </summary>
        /// <param name="message">Message text; encoded as UTF-8.</param>
        /// <param name="exchangeName">Name of the direct exchange.</param>
        /// <param name="routerKey">Routing key attached to the message.</param>
        public static void ExchangeRoutersByDirectModelPublish(string message, string exchangeName = "qqai", string routerKey = "insertToStudent")
        {
            using (IConnection connection = connectionFactory.CreateConnection())
            {
                using (IModel channelmodel = connection.CreateModel())
                {
                    channelmodel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
                    byte[] bymsg = Encoding.UTF8.GetBytes(message);
                    channelmodel.BasicPublish(exchange: exchangeName, routingKey: routerKey, body: bymsg);
                }
            }
        }

        /// <summary>
        /// Declares the direct exchange, binds a server-named queue to it with the
        /// given routing key and consumes from that queue with automatic
        /// acknowledgement.
        /// </summary>
        /// <param name="exchangeName">Name of the direct exchange to bind to.</param>
        /// <param name="routerKey">Routing key used for the queue binding.</param>
        /// <param name="handler">
        /// Optional callback receiving the message text; when null the routing key
        /// and the message are written to the console.
        /// </param>
        public static void ExchangeRoutersByDirectModelConsumer(string exchangeName = "qqai", string routerKey = "insertToStudent", Action<string> handler = null)
        {
            // Connection and channel stay open: the subscription lives on them.
            var connection = connectionFactory.CreateConnection();
            var channel = connection.CreateModel();
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routerKey);

            Console.WriteLine("wating for message...!");
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var bytedata = e.Body.ToArray();
                var getRoutekey = e.RoutingKey;
                string msg = Encoding.UTF8.GetString(bytedata);
                if (handler != null)
                {
                    handler.Invoke(msg);
                }
                else
                {
                    Console.WriteLine($"路由{getRoutekey},訂閱到消息{msg}!");
                }
            };
            channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
        }
        #endregion
    }
}