1.RabbitMQHelper.cs

public class RabbitMQHelper { string exchangeName = "demoexchange"; string queueName = "demoqueue"; string exchangeType = ExchangeType.Direct; string routingKey = "demoqueue"; string userName = "test"; string password = "test"; string hostName = "127.0.0.1"; int port = 5672; string virtualHost = "vhost"; public delegate void MQMsgDelegate(string msg); public event MQMsgDelegate MQMsg; public delegate void MQErrorDeletegate(string error); public event MQErrorDeletegate MQError; /// <summary> /// 發布消息隊列 /// </summary> private Queue<string> ProducerQueue = new Queue<string>(); private object obj = new object(); /// <summary> /// 發布消息 /// </summary> /// <param name="msg"></param> public void SendMsg(string msg) { lock (obj) { ProducerQueue.Enqueue(msg); } } /// <summary> /// RabbitMQ /// </summary> /// <param name="exchangeName">消息交換機</param> /// <param name="queueName">消息隊列</param> /// <param name="exchangeType">交換器類型</param> /// <param name="routingKey">路由關鍵字</param> /// <param name="userName">用戶名</param> /// <param name="password">密碼</param> /// <param name="hostName">IP地址</param> /// <param name="port">端口</param> /// <param name="virtualHost">虛擬主機</param> public RabbitMQHelper(string exchangeName, string queueName, string exchangeType, string routingKey, string userName, string password, string hostName, int port, string virtualHost) { this.exchangeName = exchangeName; this.queueName = queueName; this.exchangeType = exchangeType; this.routingKey = routingKey; this.userName = userName; this.password = password; this.hostName = hostName; this.port = port; this.virtualHost = virtualHost; } /// <summary> /// 開始消費 /// </summary> public void Consumer() { try { ConnectionFactory factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.HostName = hostName; factory.Port = port; factory.VirtualHost = virtualHost; //factory.AutomaticRecoveryEnabled = true; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //設置交換器的類型 channel.ExchangeDeclare(exchangeName, exchangeType); //聲明一個隊列,設置隊列是否持久化,排他性,與自動刪除 channel.QueueDeclare(queueName, false, false, false, null); //綁定消息隊列,交換器,routingkey channel.QueueBind(queueName, exchangeName, routingKey, null); //流量控制 channel.BasicQos(0, 2, false); while (true) { //消費數據 var consumer = new EventingBasicConsumer(channel); //false為手動應答,true為自動應答 channel.BasicConsume(queueName, false, consumer); consumer.Received += (ch, ea) => { var body = ea.Body.ToArray(); MQMsg(Encoding.UTF8.GetString(body)); //Console.WriteLine("已接收: {0}", Encoding.UTF8.GetString(body)); //手動應答時使用 channel.BasicAck(ea.DeliveryTag, false); }; string consumerTag = channel.BasicConsume(queueName, false, consumer); channel.BasicCancel(consumerTag); Thread.Sleep(1); } } } } catch (Exception ex) { MQError(ex.Message); Console.WriteLine(ex.Message); } } /// <summary> /// 開始發布 /// </summary> public void Producer() { try { ConnectionFactory factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.HostName = hostName; factory.Port = port; factory.VirtualHost = virtualHost; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //設置交換器的類型 channel.ExchangeDeclare(exchangeName, exchangeType); //聲明一個隊列,設置隊列是否持久化,排他性,與自動刪除 channel.QueueDeclare(queueName, false, false, false, null); //綁定消息隊列,交換器,routingkey channel.QueueBind(queueName, exchangeName, routingKey, null); //消息特點 var properties = channel.CreateBasicProperties(); properties.ContentType = "text/plain"; properties.DeliveryMode = 2; while (true) { System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch(); watch.Start();//開始計時 Console.WriteLine("隊列內數據量:" + (ProducerQueue.Count));//輸出時間 毫秒 lock (obj) { if (ProducerQueue.Count > 0) { while (ProducerQueue.Count > 0) { var sendMsg = ProducerQueue.Dequeue(); //發送消息 byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(sendMsg); channel.BasicPublish(exchangeName, routingKey, properties, messageBodyBytes); //Console.WriteLine("寫入數據:" + sendMsg); //MQMsg(sendMsg +"待寫入:"+ ProducerQueue.Count); Thread.Sleep(1); } } } watch.Stop();//停止計時 Console.WriteLine("耗時:" + (watch.ElapsedMilliseconds));//輸出時間 毫秒 Thread.Sleep(1); } } } } catch (Exception ex) { MQError(ex.Message); Console.WriteLine(ex.Message); } } }
2.Producer.cs

class Program { private static System.Timers.Timer timer; static RabbitMQHelper helper; static void Main(string[] args) { //大概 400*3*7000/s 字節的寫入速度 var len = "10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。".Length; helper = new RabbitMQHelper("demoexchange", "demoqueue", ExchangeType.Direct, "demoqueue", "test", "test", "127.0.0.1", 5672, "vhost"); helper.MQMsg += Helper_MQMsg; helper.MQError += Helper_MQError; timer = new System.Timers.Timer(1); timer.Elapsed += Timer_Elapsed; timer.Start(); helper.Producer(); } private static void Helper_MQMsg(string msg) { Console.WriteLine("已發送: {0}", msg); } private static void Helper_MQError(string error) { Console.WriteLine("錯誤信息: {0}", error); } private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { int i = 7; while (i > 0) { helper.SendMsg("10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。10日,中國人民銀行發布《關於開展大額現金管理試點的通知》。《通知》指出,該試點為期2年,先在河北省開展,再推廣至浙江省、廣東省深圳市。"); i--; } } }
3.Consumer.cs

class Program { static void Main(string[] args) { //Consumer(); RabbitMQHelper helper = new RabbitMQHelper("demoexchange", "demoqueue", ExchangeType.Direct, "demoqueue", "test", "test", "127.0.0.1", 5672, "vhost"); helper.MQMsg += Helper_MQMsg; helper.MQError += Helper_MQError; helper.Consumer(); } private static void Helper_MQError(string error) { Console.WriteLine("錯誤信息: {0}", error); } private static void Helper_MQMsg(string msg) { Console.WriteLine("已接收: {0}", msg); } }