RabbitMQ學習總結


關於RabbitMQ是什么以及它的概念,不了解的可以先查看一下下面推薦的幾篇博客

https://blog.csdn.net/whoamiyang/article/details/54954780

https://www.cnblogs.com/frankyou/p/5283539.html

https://blog.csdn.net/mx472756841/article/details/50815895

官網介紹:http://www.rabbitmq.com/getstarted.html

本文github源碼:http://www.cnblogs.com/bluesummer/p/8992225.html

因為之前不了解交換機及AMQP協議,上來就研究RabbitMQ,很多概念都有點蒙圈,所以建議大家在學習RabbitMQ之前先對一些概念有基本的了解

安裝與配置:

服務相關命令

  • rabbitmq-plugins enable rabbitmq_management //開啟管理插件
  • rabbitmq-service.bat start //開啟服務
  • rabbitmq-service.bat stop //關閉服務
  • rabbitmqctl list_queues //查看任務

注意在執行命令rabbitmqctl list_queues時若報錯unable to perform an operation on node。。。。,可將C:\Users\用戶名\.erlang.cookie.erlang.cookie文件拷貝到C:\Windows\System32\config\systemprofile\.erlang.cookie中替換,然后重啟服務

至此RabbitMQ服務我們已經安裝好了

后台管理

開啟管理插件后我們重啟rabbitmq服務,打開http://localhost:15672/后台管理界面,
用戶名和密碼均為guest

guest賬戶在最新版本只能通過localhost登陸了,如果想要通過ip來登陸需要設置一下配置文件:

找到/rabbitmq_server-x.x.x/ebin下面的rabbit.app文件文件: 找到:loopback_users將里面的<<”guest”>>刪除。

刪除后的內容為:{loopback_users, []},然后重啟服務

關於用戶密碼管理的操作我們都可以在管理頁面中設置

默認端口:

  1. client端通信口5672
  2. 管理口15672
  3. server間內部通信口25672
  4. erlang發現口:4369

想要修改默認端口可修改 安裝目錄下 etc/rabbitmq.config文件,有個默認的example,改一改就可以了

發送消息

我們先構建一個應用程序,建議創建一個winform或wpf程序,控制台在這里並不太好用。
項目中引用nuget包:RabbitMQ.Client

接下來我們編寫一個發送消息和接收消息的代碼:

  public void SendMsg(string message)
    {
        //這里的端口及用戶名都是默認的,可以直接設置一個hostname=“localhost”其他的不用配置
        var factory = new ConnectionFactory() { HostName = "192.168.1.15",Port=5672,UserName= "guest",Password= "guest" };
        //創建一個連接,連接到服務器:
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                //創建一個名稱為hello的消息隊列
                //durable:隊列持久化,為了防止RabbitMQ在退出或者crash等異常情況下數據不會丟失,可以設置durable為true
                //exclusive:排他隊列,只對首次聲明它的連接(Connection)可見,不允許其他連接訪問,在連接斷開的時候自動刪除,無論是否設置了持久化
                //autoDelete:自動刪除,如果該隊列已經沒有消費者時,該隊列會被自動刪除。這種隊列適用於臨時隊列。
                channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: true, arguments: null);
                //channel.BasicConsume("hello", autoAck: true);
               
                var props = channel.CreateBasicProperties();
                //消息持久化,若啟用durable則該屬性啟用
                props.Persistent = true;
                //封裝消息主體
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: props, body: body);
                Console.WriteLine(" 發送消息{0}", message);
            }
        }
    }
	
	 public class Consumer : IDisposable
    {
        public static int _number;
        private static ConnectionFactory factory;
        private static IConnection connection;
        static Receive()
        {
            factory = new ConnectionFactory() { HostName = "localhost" };
        }
        public Receive()
        {
            _number++;
        }
        public void ReceiveMsg(Action<string> callback)
        {
            if(connection==null||!connection.IsOpen)
                connection = factory.CreateConnection();
            IModel _channel = connection.CreateModel();
            _channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: true, arguments: null);
            _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
            // 創建事件驅動的消費者
            var consumer = new EventingBasicConsumer(_channel); 
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                callback($"number:{_number}.message:{message}");
                //模擬消息處理需要兩秒
                Thread.Sleep(2000);
                //顯示發送ack確認接收並處理完成消息,只有在前面進行啟用顯示發送ack機制后才奏效。
                _channel.BasicAck(ea.DeliveryTag, false);
            };
            //指定消費隊列,autoAct是否自動確認
            string result = _channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

            //設置后當所有的channel都關閉了連接會自動關閉
            //connection.AutoClose = true;
        }
        public void Dispose()
        {
            if (connection != null && connection.IsOpen)
                connection.Dispose();
        }
    }

上面一個很簡單的消息隊列的發送者和消費者,解釋下基本的流程:

Publisher中調用send函數先創建一個連接到服務器,然后用該連接創建了一個channel,接着用該channel聲明了一個hello的隊列,最后向默認的交換機發送了一條消息。(exchange: "") 空字符串即為默認的交換機 ,消息的路由為hello ,默認的交換機是direct類型,根據路由名稱完全匹配隊列的名稱。所有的隊列都會綁定到默認的交換機上,路由名稱就是隊列的名稱。所以默認的交換機將消息發送到名聲為hello 的隊列。緊接着Consumer中調用ReceiveMsg 函數從hello 隊列獲取消息,獲取到消息后調用act函數通知broker該消息已經被成功地消費,broker將這條消息刪除,如下圖

網上有部分示例是使用QueueingBasicConsumer來創建消費者的,我發現在新版本中已經過時了,原因是它容易造成內存溢出性能降低等一系列的問題,簡單說一下QueueingBasicConsumer的處理流程,它接收到消息之后會把消息塞到一個Queue隊列中,然后用戶來循環這個隊列處理消息,但是如果你一個消息處理的很慢,而消息又發送過來的很快很大,就會造成隊列里面存的消息越來越多,最終造成內存溢出。所以現在推薦使用EventingBasicConsumer或者繼承DefaultBasicConsumer來創建消費者,事件驅動就不會有這個問題了

上面的代碼需要注意以下幾點:

  1. 想要通過guest賬戶指定ip連接需要修改loopback_users配置
  2. 我們調用QueueDeclare函數聲明一個隊列,如果設置了隊列持久化,即使重啟服務隊列仍然在。如果不是持久化,即使消息全都被消費了,只要服務沒有重啟,隊列仍然存在。RabbitMQ不允許你使用不同的參數重新定義一個已經存在的隊列,所以要么刪除隊列要么重新命名一個隊列,刪除隊列可以通過管理界面來刪除或者調用QueueDelete函數。
  3. 隊列如果存在聲明一次就夠了,如果多次聲明了一樣的隊列將不會有任何異常,但是如果消費者綁定了一個不存在的隊列是會發生異常的:_channel.BasicConsume,所以習慣是在Woker中將需要監聽的隊列先聲明一遍
  4. 排他隊列:大概意思就是通過連接connectionA聲明一個排他隊列之后,以后也只能通過連接connectionA來訪問該隊列,其他連接一旦訪問就會報隊列被鎖定的錯誤,這個實在想不到應用場景
  5. 隊列持久化代表的是重啟服務后隊列仍然在,想要隊列里的消息仍然存在需要同時設置消息持久化,但是如果只設置消息持久化不設置隊列持久話也沒有意義。但這也並不一定能保證消息一定不會丟失。首先必須要有消息確認機制來保證消息一定被正確消費了。最主要的問題是消息寫入到磁盤需要一定的時間,如果服務接收到消息沒有來得及寫入磁盤就掛掉了,那么這個消息就丟失了,對於這一點可以查詢一下RabbitMQ集群相關的文章
  6. 默認發送的消息都需要消費者確認,可以通過設置autoAct為true來自動確認消息,也可以調用BasicAck函數確認,總之如果消息需要確認,一定要在消息處理完成之后進行確認。如果當前消費者未確認的消息達到了perfetchCount的數量時,該消費者便無法再接收新的消息。 當消費者連接關閉之后未被確認的消息很快就會被退回。
  7. 可以通過BasicNack()函數將消息重新塞回隊列,如果消息未確認消費者斷開鏈接,消息也會退回。需要注意的是:如果不是因為程序異常而僅僅是因為業務邏輯上的錯誤,則不應該手動退回消息,否則退回的消息永遠也無法被消費掉
  8. 我上面定義的消費者原本是想要多次實例化Receive來模擬多個消費者的,然而事實證明並不好用,想要模擬多個消費者還是需要打開多個程序
  9. EventingBasicConsumer的監聽會創建一個前台線程一直在運行,所以在winform中如果關閉程序需要dispose掉connection占用的線程

輪詢調度

輪詢調度就是同時運行多個消費者,當任務數量很多的時候RabbitMQ會將消息分發給不同的消費者(Worker)來減輕壓力,想要讓RabbitMQ公平的分發任務,需要在worker中用以下代碼來設置一個worker的最大未確認消息數量

channel.BasicQos(0, 1, false);

BasicQos方法接收三個參數:

prefetchSize:消費者接收消息的長度,如果長度在小於等於設定值,則接收,如果設置0,則不限消息長度

prefetchCount:消費者可同時緩存的最大消息數量,假設數值設為2,那么隊列會向該woker推送兩條消息,直到該Woker處理了該消息(處理指的是Act或者nack),隊列才會再次向該woker推送新的消息。

上面的勢力中,參數prefetchCount=1就代表此Worker同時只會處理一條消息,如果當前的消息沒有處理完畢(沒有act),rabbitmq就會把剩下的任務發送給其他的worker,如果所有的worker都很忙,消息久會在隊列中排隊等待

綁定

上面的一個示例中我們用的是默認的交換機發送消息,我們可以通過給exchange賦值來使用指定的交換機,通過QueueBind將交換機與隊列進行綁定

_channel.QueueBind("log1", "logs", "info");

聲明一個交換機的代碼如下

_channel.ExchangeDeclare("logs", ExchangeType.Direct, false, false);

我們將隊列log1綁定到了交換機:logs上,路由為info,交換機的類型為Direct,Direct代表的是路由完全匹配,現在我們向logs交換機發送一條消息,路由為info,隊列log1就會接收到消息了

channel.BasicPublish(exchange: "logs", routingKey: "info", basicProperties: props, body: body);

隊列和交換機的關系是多對多的,交換機的類型常用的有三個:Direct,Fanout,Topic,Headers

Direct:要求路由鍵完全匹配

Fanout:忽略路由鍵,給所有綁定到交換機上的隊列都發送消息

Topic:模糊匹配,通過字母配合符號“*”和“#”來設置路由鍵

Headers:Headers類型用的比較少,它也忽略路由鍵,而是匹配交換機的headers,headers為鍵值對的hashtable,對publisher和consumer兩邊設置的header進行匹配,需要指定匹配的方式是 all還是any,具體代碼可看github

下面展示了一個使用direct類型交換機的相關代碼

public class LogDirectPub
{
    public void SendMsg(string message)
    {
        var factory = new ConnectionFactory() { HostName = "192.168.1.15", Port = 5672, UserName = "guest", Password = "guest" };
        //創建一個連接,連接到服務器:
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                var props = channel.CreateBasicProperties();
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "logs", routingKey: "info", basicProperties: props, body: body);
                channel.BasicPublish(exchange: "logs", routingKey: "error", basicProperties: props, body: body);
                Console.WriteLine("發送消息{0}", message);
            }
        }
    }
}


public class LogDirectConsumer : IDisposable
{
    private static ConnectionFactory factory;
    private static IConnection connection;
    static LogDirectConsumer()
    {
        factory = new ConnectionFactory() { HostName = "localhost" };
    }
    public void ReceiveMsg(Action<string> callback)
    {
        if (connection == null || !connection.IsOpen)
            connection = factory.CreateConnection();
        IModel _channel = connection.CreateModel();
        _channel.ExchangeDeclare("logs", ExchangeType.Direct, false, false);
        _channel.QueueDeclare(queue: "log1", durable: false, exclusive: false, autoDelete: false, arguments: null);
        _channel.QueueBind("log1", "logs", "info");
        _channel.QueueBind("log1", "logs", "error");
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            callback($"log1Write.message:{ea.RoutingKey}:{message}");
            //模擬消息處理需要兩秒
            Thread.Sleep(2000);
            _channel.BasicAck(ea.DeliveryTag, false);
        };
        string result = _channel.BasicConsume(queue: "log1", autoAck: false, consumer: consumer);
    }
    public void Dispose()
    {
        if (connection != null && connection.IsOpen)
            connection.Dispose();
    }
}

RabbitMQ Management HTTP API

RabbitMQ有一套自己的http/api,地址為http://192.168.1.15:15672/api,可以查詢你想查的所有信息配置,通過這些api,我們可以自己實現RabbitMQ的監控管理,英文看的頭痛,這里有一篇中文的翻譯文檔:http://www.blogjava.net/qbna350816/archive/2016/08/13/431575.html

這是一個獲取所有隊列的簡單示例:

	string username = "guest";
    string password = "guest";
    string queuesUrl = "http://localhost:15672/api/queues";
    /// <summary>
    /// 查詢所有隊列
    /// </summary>
    /// <returns></returns>
    public string GetAllQuenes()
    {
        string jsonContent = GetApiResult(queuesUrl).Result;
        List<QueueModel> queues = JsonConvert.DeserializeObject<List<QueueModel>>(jsonContent);
        return JsonConvert.SerializeObject(queues);
    }

    private async Task<string> GetApiResult(string Url)
    {
        var client = new HttpClient();
        var passByte = Encoding.UTF8.GetBytes(string.Format("{0}:{1}", username, password));
        client.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", Convert.ToBase64String(passByte));
        using (HttpResponseMessage response = await client.GetAsync(Url).ConfigureAwait(false))
        {
            string result = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
            return result;
        }
    }

自定義Consumer

之前說過用QueueingBasicConsumer會有性能問題,但是eventconsumer無法阻塞線程,對於某些需要阻塞線程的功能用起來不太方便,這時我們就可以自定義一個Consumer繼承DefaultBasicConsumer,只需要實現其中的HandleBasicDeliver函數就可以了,下面是我定義的一個consumer,用來實現后面的Rpc客戶端

public class QueueingConsumer : DefaultBasicConsumer
{
    private IModel _channel;
    private BasicDeliverEventArgs args = new BasicDeliverEventArgs();

    private AutoResetEvent argResetEvent = new AutoResetEvent(false);
    public QueueingConsumer(IModel channel)
    {
        _channel = channel;
    }
    public override void HandleBasicDeliver(string consumerTag,
       ulong deliveryTag,
       bool redelivered,
       string exchange,
       string routingKey,
       IBasicProperties properties,
       byte[] body)
    {
        args = new BasicDeliverEventArgs
        {
            ConsumerTag = consumerTag,
            DeliveryTag = deliveryTag,
            Redelivered = redelivered,
            Exchange = exchange,
            RoutingKey = routingKey,
            BasicProperties = properties,
            Body = body
        };
        argResetEvent.Set();
    }

    public void GetResult(Action<BasicDeliverEventArgs> callback)
    {
        argResetEvent.WaitOne();
        callback(args);
    }

}

RPC實現

Rpc是什么不用多說了,反正我也就知道他是遠程過程調用嘛。用RabbitMQ來實現Rpc,官網有一篇簡單的示例,但個人感覺RabbitMQ並不太適合做Rpc。不過用這個示例作為對RabbitMQ的一個學習成果實踐還是蠻不錯的,下面請看代碼:

public class RpcPub
{
    public async Task<string> SendMsg(string message)
    {
        ConnectionFactory factory = RabbitMQHelper.ConFactory;
        //創建一個連接,連接到服務器:
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                //定義一個臨時的隊列,用來接收返回的消息
                string replyQueueName = channel.QueueDeclare().QueueName;
                var consumer = new QueueingConsumer(channel);
                //監聽該臨時隊列,自動act消息
                channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer);


                string corrId = Guid.NewGuid().ToString();
                var props = channel.CreateBasicProperties();
                //定義ReplyTo讓服務端知道返回消息給哪個路由
                props.ReplyTo = replyQueueName;
                //定義CorrelationId作為消息的唯一關聯ID
                props.CorrelationId = corrId;

                var messageBytes = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes);
                Task<string> result = new Task<string>(() =>
                {
                    while (true)
                    {
                        string replystr = string.Empty;
                        consumer.GetResult((args) =>
                        {
                            if (args.BasicProperties.CorrelationId == corrId)
                            {
                                replystr = Encoding.UTF8.GetString(args.Body);
                            }
                        });
                        if (replystr != string.Empty)
                            return replystr;
                    }
                });
                result.Start();
                return await result;
            }
        }
    }
}


public class RpcConsumer : IDisposable
{

    private ConnectionFactory factory = RabbitMQHelper.ConFactory;
    private IConnection connection;


    public void ReceiveMsg(Action<string> callback)
    {
        if (connection == null || !connection.IsOpen)
            connection = factory.CreateConnection();
        IModel channel = connection.CreateModel();

        channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
        //channel.BasicQos(0, 1, false);
        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, arg) =>
        {
            var props = arg.BasicProperties;
            var replyProps = channel.CreateBasicProperties();
            replyProps.CorrelationId = props.CorrelationId;
            callback($"接收到消息:{Encoding.UTF8.GetString(arg.Body)}");
            var responseBytes = Encoding.UTF8.GetBytes($"成功接收你的消息:{ Encoding.UTF8.GetString(arg.Body)}");
            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
            channel.BasicAck(deliveryTag: arg.DeliveryTag, multiple: false);
        };
        channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

    }
    public void Dispose()
    {
        if (connection != null && connection.IsOpen)
            connection.Dispose();
    }

}

基本流程:

  1. 當客戶端發送消息之前,創建一個匿名的回調隊列channel.QueueDeclare(),並監聽該隊列。
  2. 客戶端獲取匿名隊列的名稱,在請求中設置2個屬性:replyTo=回調隊列名稱;CorrelationId=請求關聯的唯一id
  3. 客戶端發送請求到rpc_queue隊列中。
  4. RPC服務器端監聽rpc_queue隊列中的請求,當請求到來時,服務器端會處理消息,返回結果發送到replyTo指定的隊列,在請求中設置1個屬性:CorrelationId=請求過來的CorrelationId
  5. 客戶端監聽的隊列收到消息,檢查correlationId是否與之前生成的匹配,匹配成功返回結果。
  6. 對於為什么要驗證correlationId這一項,有兩個原因,1.消息可能並不是rpc服務器發送的 2.rpc服務如果在某個階段突然掛掉,可能會發送一個不包含correlationId的消息

publish confirm

在消費端可通過消息確認機制來保證隊列的正常消費 ,在服務端可通過數據持久化到磁盤保證數據的不丟失 ,發送端同樣可以使用publish confrim機制來保證數據的正確發送

confirm有普通模式WaitForConfirms和批量模式WaitForConfirmsOrDie

具體流程為:標記該消息需要confirm,發送消息,等待confirm結果。一個保證消息可靠性的相關代碼體現為:

var props = channel.CreateBasicProperties();
props.Persistent = true;
var body = Encoding.UTF8.GetBytes("hi");
channel.ConfirmSelect();
channel.BasicPublish(exchange: "ali", routingKey: "ali.point", mandatory: true, basicProperties: props, body: body);
//獲取rabbitmq服務返回的消息
channel.BasicReturn += Channel_BasicReturn;
try
{
    bool pubAct = channel.WaitForConfirms();
    if (!pubAct)
        Console.WriteLine("消息發送失敗");
}
catch (Exception)
{
    Console.WriteLine("消息發送失敗");
}

用了confirm機制之后,發送一條消息會遇到以下幾種情況:

1.消息成功發送到交換機,成功匹配到隊列,pubAct=true
2.消息成功發送到交換機,沒有隊列綁定該路由,pubAct=true。 此時如果設置mandatory=true,則會觸發BasicReturn事件,通知路由未匹配到任何隊列,如果mandatory=false,消息直接拋棄
3.交換機名稱未定義,或消息發送失敗,拋出異常
4.消息成功發送到交換機之后,尚未持久化到磁盤,pubAct=false (尚未驗證,消息確認失敗的情況不太容易模擬,所以這條結論不一定准確)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM