RabbitMQ .NET Client 實戰實驗


  由於公司業務需求,最近想上RabbitMQ,之前我研究了一段時間微軟的MSMQ。開源隊列有很多,各有優劣。就先拿RabbitMQ練練手吧。本篇着重代碼部分,至於怎么安裝,怎么配置不在贅述。而且代碼是在RabbitMQ.NET Client 類庫基礎上實現。

  假設閱讀本文的人已經安裝好RabbitMQ並且做了相應的用戶配置。而且項目中已經從nuget安裝了rabbitmq.client.dll.我們開始做一個簡單的隊列發送和接收消息。

  1. 將需要配置的東西放在配置文件里,例如主機地址,端口,用戶名,密碼等。
  2. 實現消息發送端:Product
  3. 實現消息接收端:Customer
  4. Demo測試

 

  將以下內容作為可配置部分放在配置文件中

<appSettings>
    <!--RabbitMQ-->
    <add key="RabbitMQ_HostUri" value="amqp://192.168.1.119:5672/"/>
    <add key="RabbitMQ_HostName" value="192.168.1.119"/>
    <add key="RabbitMQ_UserName" value="test_user"/>
    <add key="RabbitMQ_Password" value="123456"/>
    <add key="RabbitMQ_VirtualHost" value="ms_mq"/>
  </appSettings>

  由於只是對RabbitMQ.Client.dll中的又一次封裝,所以代碼不過多解釋,其中要注意的就是某些配置問題,例如是否持久化,消息處理模式是怎么樣的等等。

  首先我們創建一個連接工廠:

 public ConnectionFactory CreateFactory()
        {
            if (_factory == null) {

                const ushort heartbeat = 0;
                //主機地址
                Uri uri = new Uri(RabbitMQConfig.HostUri);

                _factory = new ConnectionFactory();
                //_factory.HostName = RabbitMQConfig.HostName;
                //用戶名
                _factory.UserName = RabbitMQConfig.UserName;
                //密碼
                _factory.Password = RabbitMQConfig.PassWord;
                //虛擬主機名
                _factory.VirtualHost = RabbitMQConfig.VirtualHost;
                //連接終端
                _factory.Endpoint = new AmqpTcpEndpoint(uri);

                _factory.RequestedHeartbeat = heartbeat;
                //自動重連
                _factory.AutomaticRecoveryEnabled = true;
            }
            return _factory;
        }

  一個簡單的消息發布:(對代碼研究不夠透徹,只能一切從簡~~)

 public void Publish(string message, string queueName=null)
        {
            if (queueName == null) {
                queueName = _queueName;
            }

            var factory = RabbitMQFactory.Instance.CreateFactory();
            using (var connection = factory.CreateConnection())
            {
                using (var model = connection.CreateModel())
                {
                    //消息持久化,防止丟失
                    model.QueueDeclare(queueName, RabbitMQConfig.IsDurable, false, false, null);
                    var properties = model.CreateBasicProperties();
                    properties.Persistent = RabbitMQConfig.IsDurable;
                    properties.DeliveryMode = 2;

                    //消息轉換為二進制
                    var msgBody = Encoding.UTF8.GetBytes(message);
                    //消息發出到隊列
                    model.BasicPublish("", queueName, properties, msgBody);
                }
            }
        }

  消息接收:

 public void Consume() {
            var factory = RabbitMQFactory.Instance.CreateFactory();

            var connection = factory.CreateConnection();

            connection.ConnectionShutdown += Connection_ConnectionShutdown;

            ListenChannel = connection.CreateModel();


            bool autoDeleteMessage = false;
            var queue = ListenChannel.QueueDeclare(_queueName, RabbitMQConfig.IsDurable, false, false, null);

            //公平分發,不要同一時間給一個工作者發送多於一個消息
            ListenChannel.BasicQos(0, 1, false);
            //創建事件驅動的消費者類型,不要用下邊的死循環來消費消息
            var consumer = new EventingBasicConsumer(ListenChannel);
            consumer.Received += Consumer_Received;
            //消費消息
            ListenChannel.BasicConsume(_queueName, autoDeleteMessage, consumer);
        }

  我在Customer中定義了一個 ReceiveMessageCallback Func回調,這里就是當客戶端從隊列接收到消息之后,怎么處理由客戶端來決定

  public Func<string, bool> ReceiveMessageCallback { get; set; }

  處理消息:

 private void Consumer_Received(object sender, BasicDeliverEventArgs args) {
            try {
                var body = args.Body;
                var message = Encoding.UTF8.GetString(body);
                //將消息業務處理交給外部業務
                bool result = ReceiveMessageCallback(message);
                if (result) {
                    if (ListenChannel != null && !ListenChannel.IsClosed) {
                        ListenChannel.BasicAck(args.DeliveryTag, false);
                    }
                }
                else {

                }

            }
            catch (Exception ex) {
                throw ex;
            }
        }

  基本代碼已經完成,我們寫一個測試,消息發送端:

    static void Main(string[] args)
        {
            var testQueueName = "test";
            IMessageProduct product = new MessageProduct(testQueueName);
            for (int i = 0; i < 10000; i++)
            {
                Console.WriteLine("正在發送第" + i + "條消息...");
                product.Publish("消息體" + i);
            }

            Console.Read();
        }

  消息接收端:(開多個口接收)

    static void Main(string[] args)
        {

            Parallel.For(0, RabbitMQConfig.ThreadCount, i =>
            {
                IMessageCustomer customer = new MessageCustomer("test");
         //開始監聽 customer.StartListening(); customer.ReceiveMessageCallback
= message => {
            //客戶端處理消息(打印) Console.WriteLine(
"接收到消息:" + message); return true; }; }); Console.Read(); }

 

  打開發送消息端:

  打開消息接收端:

  到此為止,RabbitMQ隊列的簡單測試就完成了,沒有介紹什么新知識,基本就是套DLL中的方法,不過也有很多不合理的地方,如果真正應用到項目中,還需要多加測試和修改。

  DEMO地址:https://github.com/fanpan26/RabbitMQ.NETClient


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM