RabbitMq入門詳解


     因為項目中需要用到RabbitMq,所有花時間研究了下,雖然博客園已經有前輩寫了關於RabbitMq的文章。但還是有必要研究下!

 

什么是RabbitMq?

   百度解釋:MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。其中較為成熟的MQ產品有IBM WEBSPHERE MQ等等。https://baike.baidu.com/item/rabbitmq/9372144?fr=aladdin

 

安裝RabbitMq

RabbitMq 官網:http://www.rabbitmq.com/

ErLang:http://www.erlang.org/download.html

RabbitMQ:http://www.rabbitmq.com/download.html

 

安裝后,開啟Rabbit MQ管理后台

打開RabbitMQ Server的開始菜單安裝目錄

選擇RabbitMQ Command Prompt 命令行並打開,輸入

rabbitmq-plugins enable rabbitmq_management

 

 

 

然后重啟服務:

 

 

 

然后在瀏覽器輸入:http://localhost:15672

默認用戶名和密碼:guest

 

Overview:概述,

Connection:連接

Channels:消息通道,在客戶端的每個連接里,可建立多個channel.

Exchanges:消息交換機,它指定消息按什么規則,路由到哪個隊列

Queues:隊列,消息的載體,每個消息都會被投到一個或多個隊列。

Admin:管理

 

幾個概念說明:
Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以有多個vhost,用作不同用戶的權限分離。
Producer:消息生產者,就是投遞消息的程序.
Consumer:消息消費者,就是接受消息的程序.

 

 

現在開始編寫發送消息代碼:

ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.0.39";
            //factory.Port = 5672;
            factory.UserName = "zhangsan";
            factory.Password = "123";
            factory.AutomaticRecoveryEnabled = true;//自動連接
            //factory.VirtualHost = "test2";
            //factory.RequestedHeartbeat = 60; //心跳超時時間

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel()) //創建一個通道
                {
                    //在MQ上定義一個持久化隊列,該隊列不存在時,才會創建,聲明隊列是一次性的
                    //channel.QueueDeclare("myMq", true, false, false, null);
                  var qu =  channel.QueueDeclare(queue: "myMq", //隊列名稱
                                    durable: true, //是否是持久化
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null); //參數

            uint count = qu.MessageCount; //獲取消息隊列數量
                   string name = qu.QueueName; //消息隊列名稱
          //設置消息持久化 
          IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //2 消息持久化 1 非持久化

        //properties.SetPersistent(true); //跟上面同理

        var post = new { name = "lishi", age = 201 };
      
string json = JsonConvert.SerializeObject(post); byte[] bytes = Encoding.UTF8.GetBytes(json); //發布消息
      channel.BasicPublish("", "myMq", properties, bytes); Console.WriteLine("消息發送成功" + json); } }


訂閱消息者,或者加接收,消費者

ConnectionFactory factory = new ConnectionFactory
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "guest",
                Password = "guest"
            };

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel model = conn.CreateModel())
                {
                    //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建
                   var qu =  model.QueueDeclare("myMq", true, false, false, null);
  uint count = qu.MessageCount; //獲取消息隊列數量
                   string name = qu.QueueName; //消息隊列名稱
//輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 model.BasicQos(0, 1, false); Console.WriteLine("Listering....."); //在隊列上定義一個消費者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(model); //消費隊列,並設置應答模式為程序主動應答,意思是等我回復你。你再刪除 // //noAck設置false,告訴broker,發送消息之后,消息暫時不要刪除,等消費者處理完成再說 model.BasicConsume("myMq", false, consumer); while (true) { //阻塞函數,獲取隊列中的消息 BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); byte[] bytes = ea.Body; string str = Encoding.UTF8.GetString(bytes); var json = JsonConvert.DeserializeObject<dynamic>(str); Console.WriteLine("我叫" + json.name + "我今年" + json.age); //回復確認,告訴服務器,我處理完了。你可以刪除了 model.BasicAck(ea.DeliveryTag, false); } } }

 

在運行程序之前。我們看看。隊列是沒有的

 

 

 

當運行發送消息客戶端

 

此時,隊列已經存在,消息已經存在隊列里面了

 

 

 

此時還沒有消費者連接來消費

 

我們可以先手動去看看消息。單擊隊列名稱。到隊列詳細界面,可以獲取信息。

 

 

 

 

當然。你手動消費了。自然就沒有了

 

現在運行消息客戶端,2個

 

 

管理界面顯示:

 

通道界面:

 

然后發送消息:

 

額。奇怪。怎么只有一個消費者受到消息呢?這里默認是Direct類型

這是由Exchange 類型 類型決定的

 

1.Direct exchange
Direct exchange 完全根據 key 進行投遞,只有 key 與綁定時的 routing key 完全一致的消息才會收到消息,參考官網提供的圖 4 更直觀地了解 Direct exchange。

 

 

2.Fanount exchange
Fanount 完全不關心 key,直接采取廣播的方式進行消息投遞,與該交換機綁定的所有隊列都會收到消息

 

 

.Topic exchange
Topic exchange 會根據 key 進行模式匹配然后進行投遞,與設置的 routing key 匹配上的隊列才能收到消息。

4.Headers exchange
Header exchange 使用消息頭代替 routing key 作為關鍵字進行路由,不過在實際應用過程中這種類型的 exchange 使用較少。


用fanout方式發布消息:
ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.0.39";
            //factory.Port = 5672;
            factory.UserName = "zhangsan";
            factory.Password = "123";

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel()) //創建一個通道
                {
                    /*
                     * 交換機類型
                      fanout(扇出):傳播它收到的所有消息去它知道所有的隊列中,會給所有在線的消費者發送所有信息,可以忽略:routingKey值。即隊列名稱
                     *direct(直接):把消息發送到指定的交換機下的指定的隊列中
                     */
                    //channel.ExchangeDelete("message",true);
                    //不能改變已經存在交換機的類型 比如: channel.ExchangeDeclare("message", "direct")
                    channel.ExchangeDeclare("message", "fanout"); //type要小寫
                    //在MQ上定義一個持久化隊列,該隊列不存在時,才會創建,聲明隊列是一次性的,這里不需要申明隊列了。因為定義了交換機
                    //channel.QueueDeclare(queue: "myMq", //隊列名稱
                    //                durable: true, //是否是持久化
                    //                exclusive: false, //是否排除其他,即存在則不會在創建
                    //                autoDelete: false, //是否自動刪除
                    //                arguments: null); //參數
                    //設置消息持久化
                    //IBasicProperties properties = channel.CreateBasicProperties();
                    //properties.DeliveryMode = 2; //2 消息持久化 1 非持久化
                    //properties.SetPersistent(true); //跟上面同理 
                    var post = new
                    {
                        name = "交換機",
                        age = 20
                    };

                    string json = JsonConvert.SerializeObject(post);
                    byte[] bytes = Encoding.UTF8.GetBytes(json);


                    //發布消息
                    channel.BasicPublish("message", "", null, bytes);
                    Console.WriteLine("消息發送成功" + json);
                }
            }

 

 

接收代碼:

 ConnectionFactory factory = new ConnectionFactory
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "guest",
                Password = "guest",
                //VirtualHost="test2"
            };

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {

                    var name = channel.QueueDeclare().QueueName;

                    //如果該交換機不存在。則會報錯
                    channel.QueueBind(name, "message", "");  //把交換機和隊列綁定,沒有指定隊列名稱,所以是所有隊列

                    //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建
                    //channel.QueueDeclare(name, true, false, false, null);
                    //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息
                    channel.BasicQos(0, 1, false);
                    Console.WriteLine("Listering.....");

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume(name, false, consumer);

                    consumer.Received += (m, ea) =>
                    {
                        var body = ea.Body;
                        string str = Encoding.UTF8.GetString(body);
                        var json = JsonConvert.DeserializeObject<dynamic>(str);
                        Console.WriteLine("我叫" + json.name + "我今年" + json.age);
                        //處理完成,告訴Broker可以服務端可以刪除消息,分配新的消息過來
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };

                    Console.Read();
                }

            }

運行結果

 
        

 

 

接收消息一端是需要一直運行的接收消息的,不建議用

while(true){} 一個死循環建議用一個多線程

Task.Factory.StartNew(Method);

 不過用了多線程,那么,就不能用using語句

比如:

 

   ConnectionFactory factory = new ConnectionFactory
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "guest",
                Password = "guest",
                //VirtualHost="test2"
            };

            IConnection conn = factory.CreateConnection();

            IModel channel = conn.CreateModel();

 

 

VirtualHost虛擬機

接收消息和發送消息,必須在同一個虛擬機。才能收到消息。默認虛擬機是   "/"  可以通過factory.VirtualHost = "test2"; 設置 

當然。以上代碼是可以封裝的。比如:

 

public class RabbitMqConsumerFactory
    {
       public static IConnection CreateConnection()
       {
           var factory = new ConnectionFactory()
           {
               HostName = ConfigurationManager.AppSettings["MqHostName"],
               UserName = ConfigurationManager.AppSettings["MqUserName"],
               Password = ConfigurationManager.AppSettings["MqPassword"],
               Port = int.Parse(ConfigurationManager.AppSettings["MqPort"].ToString()),
               RequestedHeartbeat = 60, //心跳超時時間
               AutomaticRecoveryEnabled = true //自動重連
           };

           return factory.CreateConnection(); //創建連接對象
       }
    }

調用: using (IConnection conn = RabbitMqConsumerFactory.CreateConnection()){}

 

這僅僅是在控制台應用程序中使用,那么你 有沒有想過在Web中使用,

借助於 RabbitMQ 的 Web STOMP 插件,實現瀏覽器與服務端的全雙工通信。

從本質上說,RabbitMQ 的 Web STOMP 插件也是利用 WebSocket 對 STOMP 協議進行了一次橋接,從而實現瀏覽器與服務端的雙向通信。官網有個列子

例子:http://www.rabbitmq.com/web-stomp.html

例子下載:https://github.com/rabbitmq/rabbitmq-web-stomp-examples

 

我在官網的列子上研究了下

 

先啟用STOMP插件  rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp

重啟服務:service rabbitmq-server restart

開通后的插件,可以在web頁面中overview中查看

 

下載官網的列子后,里面有個stomp.js

 

以下是主要js代碼

接收消息

 <script>
        //畫圖 start
        var draw;
        send = draw = function () { };

        var lines = [];

        var canvas = document.getElementById('cnvs');

        if (canvas.getContext) {
            var ctx = canvas.getContext('2d');

            var img = new Image();
            img.onload = function () {
                ctx.drawImage(img, 230, 160);
            };
            //bunny
            img.src = 'bunny.png';

            draw = function (p) {
                ctx.beginPath();
                ctx.moveTo(p.x1, p.y1);
                ctx.lineTo(p.x2, p.y2);
                ctx.stroke();
                ctx.drawImage(img, 230, 160);
            };

            var do_resize = function () {
                canvas.width = window.innerWidth;
                canvas.height = window.innerHeight;

                ctx.font = "bold 20px sans-serif";
                ctx.fillStyle = "#444";
                ctx.fillText("Draw wings on the bunny!", 260, 100);
                ctx.font = "normal 16px sans-serif";
                ctx.fillStyle = "#888";
                ctx.fillText("(For more fun open a second browser)", 255, 130);

                ctx.drawImage(img, 230, 160);

                ctx.strokeStyle = "#fa0";
                ctx.lineWidth = "10";
                ctx.lineCap = "round";

                $.map(lines, function (p) {
                    draw(p);
                });
            };

            $(window).resize(do_resize);
            $(do_resize);


            var pos = $('#cnvs').position();
            var prev = null;
            $('#cnvs').mousedown(function (evt) {
                evt.preventDefault();
                evt.stopPropagation();
                $('#cnvs').bind('mousemove', function (e) {
                    var curr = { x: e.pageX - pos.left, y: e.pageY - pos.top };
                    if (!prev) {
                        prev = curr;
                        return;
                    }
                    if (Math.sqrt(Math.pow(prev.x - curr.x, 2) +
                                  Math.pow(prev.y - curr.y, 2)) > 8) {
                        var p = { x1: prev.x, y1: prev.y, x2: curr.x, y2: curr.y }
                        lines.push(p);
                        draw(p);
                        send(JSON.stringify(p));
                        prev = curr;
                    }
                });
            });
            $('html').mouseup(function () {
                prev = null;
                $('#cnvs').unbind('mousemove');
            });
        }
        else {
            document.write("Sorry - this demo requires a browser with canvas tag support.");
        }

        //畫圖 end

        var has_had_focus = false;
        var pipe = function (el_name, send) {
            var div = $(el_name + ' div');
            var inp = $(el_name + ' input');
            var form = $(el_name + ' form');

            var print = function (m, p) {
                p = (p === undefined) ? '' : JSON.stringify(p);
                div.append($("<code>").text(m + ' ' + p));
                div.scrollTop(div.scrollTop() + 10000);
            };

            if (send) {
                form.submit(function () {
                    send(inp.val());
                    inp.val('');
                    return false;
                });
            }
            return print;
        };

        // Stomp.js boilerplate
        var client = Stomp.client('ws://' + window.location.hostname + ':15674/ws');
        client.debug = pipe('#second'); //顯示log

        var print_first = pipe('#first', function (data) {
            //發送消息 myMq
            client.send('test1', { "content-type": "text/plain" }, data);
            //client.send('message', { "content-type": "text/plain" }, data);
            //client.send('/topic/test', { "content-type": "text/plain" }, data);
        });

        //建立連接后,訂閱消息
        var on_connect = function (x) {

            //獲取消息(訂閱)testTomp
            /*
            訂閱改隊列的消息,如果沒有該隊列。會創建一個/topic/
            */
            id = client.subscribe("/topic/test1", function (d) {
                //d.body 是接收到的消息
                print_first(d.body); //輸出到界面

                var p = JSON.parse(d.body);
                lines.push(p);
                draw(p, true);
            });
        };
        var on_error = function () {
            console.log('error');
        };

        //  "/"代表 VirtualHost
        client.connect('zhangsan', '123', on_connect, on_error, '/');
        client.onreceive = function (m) {
            alert(m.body);
            //$('#first div').append($("<code>").text(m.body));
        }
        $('#first input').focus(function () {
            if (!has_had_focus) {
                has_had_focus = true;
                $(this).val("");
            }
        });
    </script>

 

發送消息:

        var ws = new WebSocket('ws://' + window.location.hostname + ':15674/ws');
        //建立連接
        var client = Stomp.over(ws);

        //client.heartbeat.outgoing = 0;
        //client.heartbeat.incoming = 0;
        
        //日志
        client.debug = function (e) {
            $('#second div').append($("<code>").text(e));
        };

        //定義連接成功回調函數
        var on_connect = function (x) {

            //自己訂閱自己
            //id = client.subscribe("/topic/testTopic", function (m) {
                // reply by sending the reversed text to the temp queue defined in the "reply-to" header
                //var reversedText = m.body.split("").reverse().join("");
                //client.send(m.headers['reply-to'], { "content-type": "text/plain" }, reversedText);
            //});
        };
        var on_error = function () {
            console.log('error');
        };//guest
        client.connect('zhangsan', '123', on_connect, on_error, '/');
        
        $(function () {
            $("#btnSend").click(function () {
                var text = $("#post").val();
                if (text == "") { alert("請輸入要發送的數據"); return; }
                sendData(text);
            });
        });
        //myMq  testTomp
        function sendData(text) {
            /*當沒有隊列名稱的時候,會創建一個隊列。不會重復創建
            格式:/topic/test1 指定交換機類型是 topic
            */
            client.send('test1', { "content-type": "text/plain" }, text);
            $('#post').val("");
        }
        $('#first input').focus(function () {
            if (!has_had_focus) {
                has_had_focus = true;
                $(this).val("");
            }
        });

 

 

 

 

這里用的:client.send('test1', { "content-type": "text/plain" }, text); 

Exchanges是direct ,所有只會有一個消費者

 

 

如果想改成廣播的方式。只要修改: client.send('/topic/test1', { "content-type": "text/plain" }, text);

看看效果:

 

 

 

 

接收有效的類型格式是:/temp-queue, /exchange, /topic, /queue, /amq/queue, /reply-queue/.

在消費端,因為也訂閱了。


        var print_first = pipe('#first', function (data) {
            //發送消息 myMq
            client.send('/topic/test1', { "content-type": "text/plain" }, data);
        });

所有可以實現消費端互相推送消息

 

 

 

 

你會發現,我們發送的"content-type": "text/plain" 是文本類,但很多時候,我們發送的時候是json

比如:

 var p = { name: "張三", addres: "深圳", age: 20 };
            var json = JSON.stringify(p);
            client.send('/topic/test1', { "content-type": "application/json" }, json);
            //client.send('/topic/test1', {}, json); //傳{} 也行

 

接收端:

 

 

 接收端獲取值可以直接反序列化獲取值

  var p = JSON.parse(d.body);
   print_first("我的名字是:"+p.name);

 

 官網的例子中還有一個功能,就是發送到畫畫,把數據傳遞給消費端。然后在消費端重現

 

 

 

最后。關於RabbitMq的應用場景。這里說得比較好:

 

 

public class RabbitMqProducerFactory
    {
        public static IConnection CreateConnection()
        {
            var factory = new ConnectionFactory()
            {
                VirtualHost = ConfigurationManager.AppSettings["MqVirtualHost"],
                HostName = ConfigurationManager.AppSettings["MqHostName"],
                UserName = ConfigurationManager.AppSettings["MqUserName"],
                Password = ConfigurationManager.AppSettings["MqPassword"],
                Port = int.Parse(ConfigurationManager.AppSettings["MqPort"].ToString()),
                RequestedHeartbeat = 60, //心跳超時時間
                AutomaticRecoveryEnabled = true //自動重連
            };

            return factory.CreateConnection(); //創建連接對象
        }
    }
-------------------------------------------------------------------------------
public class RabbitMqSendMessage
    {
        static string QueueName = ConfigurationManager.AppSettings["MqQueueName"];
        public static void SendMessage(string message)
        {
            using (IConnection conn = RabbitMqProducerFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建
                    channel.QueueDeclare(queue: QueueName,
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

                    var body = Encoding.UTF8.GetBytes(message);

                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;//消息持久化

                    channel.BasicPublish(exchange: "",
                                routingKey: QueueName,
                                basicProperties: properties,
                                body: body);
                }
            }
        }
    }
-------------------------------------------------------------------------------

 

 

 

http://blog.csdn.net/whoamiyang/article/details/54954780

https://blog.csdn.net/gb4215287/article/details/79457445

 

 http://blog.csdn.net/zyz511919766/article/details/41946521

 

 參考資料:

http://www.cnblogs.com/PatrickLiu/tag/RabbitMQ/

https://www.jianshu.com/p/4a8336bc3428

https://www.jianshu.com/p/494dc4ca6fb9

https://www.cnblogs.com/ericli-ericli/p/5902270.html
https://www.cnblogs.com/yangh965/p/5862347.html
http://blog.csdn.net/u011642663/article/details/54691788
http://www.cnblogs.com/PatrickLiu/p/6807019.html#commentform
http://www.cnblogs.com/Andon_liu/p/5401961.html
https://www.cnblogs.com/ericli-ericli/p/5917018.html
http://www.erlang.org/downloads
http://blog.csdn.net/wangqingpei557/article/details/47864761
http://blog.csdn.net/wangqingpei557/article/details/47864761

 http://www.80iter.com/blog/1437455520862503

http://blog.csdn.net/u012631731/article/category/7240883

Demo下載

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM