C#隊列學習筆記:RabbitMQ使用多線程提高消費吞吐率


    一、引言

    使用工作隊列的一個好處就是它能夠並行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了,擴展很簡單。本例使用多線程來創建多信道並綁定隊列,達到多workers的目的。

    二、示例

    2.1、環境准備

    在NuGet上安裝RabbitMQ.Client。

    2.2、工廠類

    添加一個工廠類RabbitMQFactory:

    /// <summary>
    /// 多路復用技術(Multiplexing)目的:為了避免創建多個TCP而造成系統資源的浪費和超載,從而有效地利用TCP連接。
    /// </summary>
    public static class RabbitMQFactory
    {
        private static IConnection sharedConnection;
        private static int ChannelCount { get; set; }
        private static readonly object _locker = new object();

        public static IConnection SharedConnection
        {
            get
            {
                if (ChannelCount >= 1000)
                {
                    if (sharedConnection != null && sharedConnection.IsOpen)
                    {
                        sharedConnection.Close();
                    }
                    sharedConnection = null;
                    ChannelCount = 0;
                }
                if (sharedConnection == null)
                {
                    lock (_locker)
                    {
                        if (sharedConnection == null)
                        {
                            sharedConnection = GetConnection();
                            ChannelCount++;
                        }
                    }
                }
                return sharedConnection;
            }
        }

        private static IConnection GetConnection()
        {
            var factory = new ConnectionFactory
            {
                HostName = "192.168.2.242",
                UserName = "hello",
                Password = "world",
                Port = AmqpTcpEndpoint.UseDefaultPort,//5672
                VirtualHost = ConnectionFactory.DefaultVHost,//使用默認值:"/"
                Protocol = Protocols.DefaultProtocol,
                AutomaticRecoveryEnabled = true
            };
            return factory.CreateConnection();
        }
    }
View Code

    2.3、主窗體

    代碼如下:

    public partial class RabbitMQMultithreading : Form
    {
        public delegate void ListViewDelegate<T>(T obj);

        public RabbitMQMultithreading()
        {
            InitializeComponent();
        }

        /// <summary>
        /// ShowMessage重載
        /// </summary>
        /// <param name="msg"></param>
        private void ShowMessage(string msg)
        {
            if (InvokeRequired)
            {
                BeginInvoke(new ListViewDelegate<string>(ShowMessage), msg);
            }
            else
            {
                ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), msg });
                lvwMsg.Items.Insert(0, item);
            }
        }

        /// <summary>
        /// ShowMessage重載
        /// </summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        private void ShowMessage(string format, params object[] args)
        {
            if (InvokeRequired)
            {
                BeginInvoke(new MethodInvoker(delegate ()
                {
                    ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) });
                    lvwMsg.Items.Insert(0, item);
                }));
            }
            else
            {
                ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) });
                lvwMsg.Items.Insert(0, item);
            }
        }

        /// <summary>
        /// 生產者
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnSend_Click(object sender, EventArgs e)
        {
            int messageCount = 100;
            var factory = new ConnectionFactory
            {
                HostName = "192.168.2.242",
                UserName = "hello",
                Password = "world",
                Port = AmqpTcpEndpoint.UseDefaultPort,//5672
                VirtualHost = ConnectionFactory.DefaultVHost,//使用默認值:"/"
                Protocol = Protocols.DefaultProtocol,
                AutomaticRecoveryEnabled = true
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    string message = "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);
                    for (int i = 1; i <= messageCount; i++)
                    {
                        channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
                        ShowMessage($"Send {message}");
                    }
                }
            }
        }

        /// <summary>
        /// 消費者
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnReceive_Click(object sender, EventArgs e)
        {
            Random random = new Random();
            int rallyNumber = random.Next(1, 1000);
            int channelCount = 0;

            await Task.Run(() =>
            {
                try
                {
                    int asyncCount = 10;
                    List<Task<bool>> tasks = new List<Task<bool>>();
                    var connection = RabbitMQFactory.SharedConnection;
                    for (int i = 1; i <= asyncCount; i++)
                    {
                        tasks.Add(Task.Factory.StartNew(() => MessageWorkItemCallback(connection, rallyNumber)));
                    }
                    Task.WaitAll(tasks.ToArray());

                    string syncResultMsg = $"集結號 {rallyNumber} 已吹起號角--" +
                        $"本次開啟信道成功數:{tasks.Count(s => s.Result == true)}," +
                        $"本次開啟信道失敗數:{tasks.Count() - tasks.Count(s => s.Result == true)}" +
                        $"累計開啟信道成功數:{channelCount + tasks.Count(s => s.Result == true)}";
                    ShowMessage(syncResultMsg);
                }
                catch (Exception ex)
                {
                    ShowMessage($"集結號 {rallyNumber} 消費異常:{ex.Message}");
                }
            });
        }

        /// <summary>
        /// 異步方法
        /// </summary>
        /// <param name="state"></param>
        /// <param name="rallyNumber"></param>
        /// <returns></returns>
        private bool MessageWorkItemCallback(object state, int rallyNumber)
        {
            bool syncResult = false;
            IModel channel = null;
            try
            {
                IConnection connection = state as IConnection;
                //不能使用using (channel = connection.CreateModel())來創建信道,讓RabbitMQ自動回收channel。
                channel = connection.CreateModel();
                channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
                    Thread.Sleep(1000);
                    ShowMessage($"集結號 {rallyNumber} Received {message}");
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
                syncResult = true;
            }
            catch (Exception ex)
            {
                syncResult = false;
                ShowMessage(ex.Message);
            }
            return syncResult;
        }
    }
View Code

    2.4、運行結果

    多點幾次消費者即可增加信道,提升消費能力。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM