C#中使用RabbitMQ收發隊列消息


一、程序使用 .NET Core,並通過 NuGet 引入 RabbitMQ.Client:

  Install-Package RabbitMQ.Client -Version 4.1.3

二、消息發布端:

  

using RabbitMQ.Client;
using System;
using System.Text;

namespace ClientDemo
{
    /// <summary>
    /// Console publisher: declares a direct exchange and a durable queue, binds
    /// them together, then publishes every line typed on the console as a
    /// persistent UTF-8 message until the user enters "exit" or input ends.
    /// </summary>
    public class Client
    {
        private static readonly string exchangeName = "my-exchange";
        private static readonly string queueName = "my-queue";

        /// <summary>
        /// Entry point. Connects to the broker at amqp://guest:guest@localhost:5672/,
        /// sets up the exchange/queue/binding and runs the interactive publish loop.
        /// </summary>
        public static void Main()
        {
            Console.InputEncoding = Encoding.Unicode;
            Console.OutputEncoding = Encoding.Unicode;

            ConnectionFactory factory = new ConnectionFactory();
            factory.Uri = new Uri("amqp://guest:guest@localhost:5672/");

            // 'using' guarantees the channel and connection are released even if
            // a declare or publish call throws part-way through.
            using (var conn = factory.CreateConnection())
            using (IModel model = conn.CreateModel())
            {
                //model.ExchangeDelete(exchangeName);
                // NOTE(review): the exchange is declared non-durable while the queue is
                // durable and messages are persistent; the exchange and the binding will
                // not survive a broker restart. Changing 'durable' here would conflict
                // with an already-declared exchange, so it is left as in the original.
                model.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: false, autoDelete: false, arguments: null);
                model.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                // The queue name doubles as the routing key.
                model.QueueBind(queue: queueName, exchange: exchangeName, routingKey: queueName);

                var props = model.CreateBasicProperties();
                props.Persistent = true; // mark messages as persistent

                while (true)
                {
                    Console.WriteLine("請輸入要發送的消息:");
                    var line = Console.ReadLine();

                    // ReadLine returns null at end of input (redirected stdin, Ctrl+Z/Ctrl+D);
                    // treat that like "exit" instead of passing null to GetBytes.
                    if (line == null || line == "exit")
                    {
                        break;
                    }

                    model.BasicPublish(exchange: exchangeName, routingKey: queueName, basicProperties: props, body: Encoding.UTF8.GetBytes(line));
                }

                // Graceful shutdown on the normal path; Dispose covers the error path.
                model.Close();
                conn.Close();
            }
        }
    }
}

 

 三、消息消費端:

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServerDemo
{
    /// <summary>
    /// Console consumer: polls the "my-queue" queue with BasicGet, prints each
    /// message body decoded as UTF-8, and acknowledges it.
    /// </summary>
    public class Server
    {
        //static string exchangeName = "my-exchange";
        private static readonly string queueName = "my-queue";

        /// <summary>
        /// Entry point. Connects with the ConnectionFactory defaults and polls the
        /// queue forever; the loop only ends when the process is terminated or a
        /// broker call throws.
        /// </summary>
        public static void Main()
        {
            Console.InputEncoding = Encoding.Unicode;
            Console.OutputEncoding = Encoding.Unicode;

            ConnectionFactory factory = new ConnectionFactory();
            //factory.Uri = new Uri("amqp://guest:guest@localhost:5672/");

            // 'using' releases the channel and connection if a broker call throws.
            using (var conn = factory.CreateConnection())
            using (IModel model = conn.CreateModel())
            {
                // The queue is not declared here; presumably the publisher has
                // already created it — verify start-up order.
                //model.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: false, autoDelete: false, arguments: null);
                //model.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                //model.QueueBind(queue: queueName, exchange: exchangeName, routingKey: queueName);

                // The original wrapped this loop in Task.Run + Wait, which only moved
                // the same blocking loop to another thread; run it directly.
                while (true)
                {
                    var result = model.BasicGet(queue: queueName, autoAck: false);
                    if (result == null)
                    {
                        // Queue is empty: back off briefly before polling again.
                        Thread.Sleep(10);
                        continue;
                    }

                    var msg = Encoding.UTF8.GetString(result.Body);
                    Console.WriteLine(msg);

                    // With autoAck: false the broker keeps the message as unacknowledged
                    // until it is acked; without this call every message would be
                    // redelivered once the connection closes.
                    model.BasicAck(result.DeliveryTag, false);
                }
            }
        }
    }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM