因為項目中需要用到RabbitMq,所有花時間研究了下,雖然博客園已經有前輩寫了關於RabbitMq的文章。但還是有必要研究下!
什么是RabbitMq?
百度解釋:MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。其中較為成熟的MQ產品有IBM WEBSPHERE MQ等等。https://baike.baidu.com/item/rabbitmq/9372144?fr=aladdin
安裝RabbitMq
RabbitMq 官網:http://www.rabbitmq.com/
ErLang:http://www.erlang.org/download.html
RabbitMQ:http://www.rabbitmq.com/download.html
安裝后,開啟Rabbit MQ管理后台
打開RabbitMQ Server的開始菜單安裝目錄
選擇RabbitMQ Command Prompt 命令行並打開,輸入
rabbitmq-plugins enable rabbitmq_management
然后重啟服務:
然后在瀏覽器輸入:http://localhost:15672
默認用戶名和密碼:guest
Overview:概述,
Connection:連接
Channels:消息通道,在客戶端的每個連接里,可建立多個channel.
Exchanges:消息交換機,它指定消息按什么規則,路由到哪個隊列
Queues:隊列,消息的載體,每個消息都會被投到一個或多個隊列。
Admin:管理
幾個概念說明:
Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以有多個vhost,用作不同用戶的權限分離。
Producer:消息生產者,就是投遞消息的程序.
Consumer:消息消費者,就是接受消息的程序.
現在開始編寫發送消息代碼:
ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.0.39"; //factory.Port = 5672; factory.UserName = "zhangsan"; factory.Password = "123"; factory.AutomaticRecoveryEnabled = true;//自動連接 //factory.VirtualHost = "test2"; //factory.RequestedHeartbeat = 60; //心跳超時時間 using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) //創建一個通道 { //在MQ上定義一個持久化隊列,該隊列不存在時,才會創建,聲明隊列是一次性的 //channel.QueueDeclare("myMq", true, false, false, null); var qu = channel.QueueDeclare(queue: "myMq", //隊列名稱 durable: true, //是否是持久化 exclusive: false, autoDelete: false, arguments: null); //參數
uint count = qu.MessageCount; //獲取消息隊列數量
string name = qu.QueueName; //消息隊列名稱
//設置消息持久化
IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //2 消息持久化 1 非持久化
//properties.SetPersistent(true); //跟上面同理
var post = new { name = "lishi", age = 201 };
string json = JsonConvert.SerializeObject(post); byte[] bytes = Encoding.UTF8.GetBytes(json); //發布消息
channel.BasicPublish("", "myMq", properties, bytes); Console.WriteLine("消息發送成功" + json); } }
訂閱消息者,或者加接收,消費者
ConnectionFactory factory = new ConnectionFactory { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" }; using (IConnection conn = factory.CreateConnection()) { using (IModel model = conn.CreateModel()) { //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建 var qu = model.QueueDeclare("myMq", true, false, false, null);
uint count = qu.MessageCount; //獲取消息隊列數量
string name = qu.QueueName; //消息隊列名稱 //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 model.BasicQos(0, 1, false); Console.WriteLine("Listering....."); //在隊列上定義一個消費者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(model); //消費隊列,並設置應答模式為程序主動應答,意思是等我回復你。你再刪除 // //noAck設置false,告訴broker,發送消息之后,消息暫時不要刪除,等消費者處理完成再說 model.BasicConsume("myMq", false, consumer); while (true) { //阻塞函數,獲取隊列中的消息 BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); byte[] bytes = ea.Body; string str = Encoding.UTF8.GetString(bytes); var json = JsonConvert.DeserializeObject<dynamic>(str); Console.WriteLine("我叫" + json.name + "我今年" + json.age); //回復確認,告訴服務器,我處理完了。你可以刪除了 model.BasicAck(ea.DeliveryTag, false); } } }
在運行程序之前。我們看看。隊列是沒有的
當運行發送消息客戶端
此時,隊列已經存在,消息已經存在隊列里面了
此時還沒有消費者連接來消費
我們可以先手動去看看消息。單擊隊列名稱。到隊列詳細界面,可以獲取信息。
當然。你手動消費了。自然就沒有了
現在運行消息客戶端,2個
管理界面顯示:
通道界面:
然后發送消息:
額。奇怪。怎么只有一個消費者受到消息呢?這里默認是Direct類型
這是由Exchange 類型 類型決定的
1.Direct exchange
Direct exchange 完全根據 key 進行投遞,只有 key 與綁定時的 routing key 完全一致的消息才會收到消息,參考官網提供的圖 4 更直觀地了解 Direct exchange。
2.Fanount exchange
Fanount 完全不關心 key,直接采取廣播的方式進行消息投遞,與該交換機綁定的所有隊列都會收到消息
.Topic exchange
Topic exchange 會根據 key 進行模式匹配然后進行投遞,與設置的 routing key 匹配上的隊列才能收到消息。
4.Headers exchange
Header exchange 使用消息頭代替 routing key 作為關鍵字進行路由,不過在實際應用過程中這種類型的 exchange 使用較少。
用fanout方式發布消息:
ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.0.39"; //factory.Port = 5672; factory.UserName = "zhangsan"; factory.Password = "123"; using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) //創建一個通道 { /* * 交換機類型 fanout(扇出):傳播它收到的所有消息去它知道所有的隊列中,會給所有在線的消費者發送所有信息,可以忽略:routingKey值。即隊列名稱 *direct(直接):把消息發送到指定的交換機下的指定的隊列中 */ //channel.ExchangeDelete("message",true); //不能改變已經存在交換機的類型 比如: channel.ExchangeDeclare("message", "direct") channel.ExchangeDeclare("message", "fanout"); //type要小寫 //在MQ上定義一個持久化隊列,該隊列不存在時,才會創建,聲明隊列是一次性的,這里不需要申明隊列了。因為定義了交換機 //channel.QueueDeclare(queue: "myMq", //隊列名稱 // durable: true, //是否是持久化 // exclusive: false, //是否排除其他,即存在則不會在創建 // autoDelete: false, //是否自動刪除 // arguments: null); //參數 //設置消息持久化 //IBasicProperties properties = channel.CreateBasicProperties(); //properties.DeliveryMode = 2; //2 消息持久化 1 非持久化 //properties.SetPersistent(true); //跟上面同理 var post = new { name = "交換機", age = 20 }; string json = JsonConvert.SerializeObject(post); byte[] bytes = Encoding.UTF8.GetBytes(json); //發布消息 channel.BasicPublish("message", "", null, bytes); Console.WriteLine("消息發送成功" + json); } }
接收代碼:
ConnectionFactory factory = new ConnectionFactory { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest", //VirtualHost="test2" }; using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { var name = channel.QueueDeclare().QueueName; //如果該交換機不存在。則會報錯 channel.QueueBind(name, "message", ""); //把交換機和隊列綁定,沒有指定隊列名稱,所以是所有隊列 //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建 //channel.QueueDeclare(name, true, false, false, null); //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 channel.BasicQos(0, 1, false); Console.WriteLine("Listering....."); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(name, false, consumer); consumer.Received += (m, ea) => { var body = ea.Body; string str = Encoding.UTF8.GetString(body); var json = JsonConvert.DeserializeObject<dynamic>(str); Console.WriteLine("我叫" + json.name + "我今年" + json.age); //處理完成,告訴Broker可以服務端可以刪除消息,分配新的消息過來 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; Console.Read(); } }
運行結果
接收消息一端是需要一直運行的接收消息的,不建議用
while(true){} 一個死循環建議用一個多線程
Task.Factory.StartNew(Method);
不過用了多線程,那么,就不能用using語句
比如:
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
//VirtualHost="test2"
};
IConnection conn = factory.CreateConnection();
IModel channel = conn.CreateModel();
VirtualHost虛擬機
接收消息和發送消息,必須在同一個虛擬機。才能收到消息。默認虛擬機是 "/" 可以通過factory.VirtualHost = "test2"; 設置
當然。以上代碼是可以封裝的。比如:
public class RabbitMqConsumerFactory { public static IConnection CreateConnection() { var factory = new ConnectionFactory() { HostName = ConfigurationManager.AppSettings["MqHostName"], UserName = ConfigurationManager.AppSettings["MqUserName"], Password = ConfigurationManager.AppSettings["MqPassword"], Port = int.Parse(ConfigurationManager.AppSettings["MqPort"].ToString()), RequestedHeartbeat = 60, //心跳超時時間 AutomaticRecoveryEnabled = true //自動重連 }; return factory.CreateConnection(); //創建連接對象 } }
調用: using (IConnection conn = RabbitMqConsumerFactory.CreateConnection()){}
這僅僅是在控制台應用程序中使用,那么你 有沒有想過在Web中使用,
借助於 RabbitMQ 的 Web STOMP 插件,實現瀏覽器與服務端的全雙工通信。
從本質上說,RabbitMQ 的 Web STOMP 插件也是利用 WebSocket 對 STOMP 協議進行了一次橋接,從而實現瀏覽器與服務端的雙向通信。官網有個列子
例子:http://www.rabbitmq.com/web-stomp.html
例子下載:https://github.com/rabbitmq/rabbitmq-web-stomp-examples
我在官網的列子上研究了下
先啟用STOMP插件 rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp
重啟服務:service rabbitmq-server restart
開通后的插件,可以在web頁面中overview中查看
下載官網的列子后,里面有個stomp.js
以下是主要js代碼
接收消息
<script> //畫圖 start var draw; send = draw = function () { }; var lines = []; var canvas = document.getElementById('cnvs'); if (canvas.getContext) { var ctx = canvas.getContext('2d'); var img = new Image(); img.onload = function () { ctx.drawImage(img, 230, 160); }; //bunny img.src = 'bunny.png'; draw = function (p) { ctx.beginPath(); ctx.moveTo(p.x1, p.y1); ctx.lineTo(p.x2, p.y2); ctx.stroke(); ctx.drawImage(img, 230, 160); }; var do_resize = function () { canvas.width = window.innerWidth; canvas.height = window.innerHeight; ctx.font = "bold 20px sans-serif"; ctx.fillStyle = "#444"; ctx.fillText("Draw wings on the bunny!", 260, 100); ctx.font = "normal 16px sans-serif"; ctx.fillStyle = "#888"; ctx.fillText("(For more fun open a second browser)", 255, 130); ctx.drawImage(img, 230, 160); ctx.strokeStyle = "#fa0"; ctx.lineWidth = "10"; ctx.lineCap = "round"; $.map(lines, function (p) { draw(p); }); }; $(window).resize(do_resize); $(do_resize); var pos = $('#cnvs').position(); var prev = null; $('#cnvs').mousedown(function (evt) { evt.preventDefault(); evt.stopPropagation(); $('#cnvs').bind('mousemove', function (e) { var curr = { x: e.pageX - pos.left, y: e.pageY - pos.top }; if (!prev) { prev = curr; return; } if (Math.sqrt(Math.pow(prev.x - curr.x, 2) + Math.pow(prev.y - curr.y, 2)) > 8) { var p = { x1: prev.x, y1: prev.y, x2: curr.x, y2: curr.y } lines.push(p); draw(p); send(JSON.stringify(p)); prev = curr; } }); }); $('html').mouseup(function () { prev = null; $('#cnvs').unbind('mousemove'); }); } else { document.write("Sorry - this demo requires a browser with canvas tag support."); } //畫圖 end var has_had_focus = false; var pipe = function (el_name, send) { var div = $(el_name + ' div'); var inp = $(el_name + ' input'); var form = $(el_name + ' form'); var print = function (m, p) { p = (p === undefined) ? '' : JSON.stringify(p); div.append($("<code>").text(m + ' ' + p)); div.scrollTop(div.scrollTop() + 10000); }; if (send) { form.submit(function () { send(inp.val()); inp.val(''); return false; }); } return print; }; // Stomp.js boilerplate var client = Stomp.client('ws://' + window.location.hostname + ':15674/ws'); client.debug = pipe('#second'); //顯示log var print_first = pipe('#first', function (data) { //發送消息 myMq client.send('test1', { "content-type": "text/plain" }, data); //client.send('message', { "content-type": "text/plain" }, data); //client.send('/topic/test', { "content-type": "text/plain" }, data); }); //建立連接后,訂閱消息 var on_connect = function (x) { //獲取消息(訂閱)testTomp /* 訂閱改隊列的消息,如果沒有該隊列。會創建一個/topic/ */ id = client.subscribe("/topic/test1", function (d) { //d.body 是接收到的消息 print_first(d.body); //輸出到界面 var p = JSON.parse(d.body); lines.push(p); draw(p, true); }); }; var on_error = function () { console.log('error'); }; // "/"代表 VirtualHost client.connect('zhangsan', '123', on_connect, on_error, '/'); client.onreceive = function (m) { alert(m.body); //$('#first div').append($("<code>").text(m.body)); } $('#first input').focus(function () { if (!has_had_focus) { has_had_focus = true; $(this).val(""); } }); </script>
發送消息:
var ws = new WebSocket('ws://' + window.location.hostname + ':15674/ws'); //建立連接 var client = Stomp.over(ws); //client.heartbeat.outgoing = 0; //client.heartbeat.incoming = 0; //日志 client.debug = function (e) { $('#second div').append($("<code>").text(e)); }; //定義連接成功回調函數 var on_connect = function (x) { //自己訂閱自己 //id = client.subscribe("/topic/testTopic", function (m) { // reply by sending the reversed text to the temp queue defined in the "reply-to" header //var reversedText = m.body.split("").reverse().join(""); //client.send(m.headers['reply-to'], { "content-type": "text/plain" }, reversedText); //}); }; var on_error = function () { console.log('error'); };//guest client.connect('zhangsan', '123', on_connect, on_error, '/'); $(function () { $("#btnSend").click(function () { var text = $("#post").val(); if (text == "") { alert("請輸入要發送的數據"); return; } sendData(text); }); }); //myMq testTomp function sendData(text) { /*當沒有隊列名稱的時候,會創建一個隊列。不會重復創建 格式:/topic/test1 指定交換機類型是 topic */ client.send('test1', { "content-type": "text/plain" }, text); $('#post').val(""); } $('#first input').focus(function () { if (!has_had_focus) { has_had_focus = true; $(this).val(""); } });
這里用的:client.send('test1', { "content-type": "text/plain" }, text);
Exchanges是direct ,所有只會有一個消費者
如果想改成廣播的方式。只要修改: client.send('/topic/test1', { "content-type": "text/plain" }, text);
看看效果:
接收有效的類型格式是:/temp-queue, /exchange, /topic, /queue, /amq/queue, /reply-queue/.
在消費端,因為也訂閱了。
var print_first = pipe('#first', function (data) {
//發送消息 myMq
client.send('/topic/test1', { "content-type": "text/plain" }, data);
});
所有可以實現消費端互相推送消息
你會發現,我們發送的"content-type": "text/plain" 是文本類,但很多時候,我們發送的時候是json
比如:
var p = { name: "張三", addres: "深圳", age: 20 };
var json = JSON.stringify(p);
client.send('/topic/test1', { "content-type": "application/json" }, json);
//client.send('/topic/test1', {}, json); //傳{} 也行
接收端:
接收端獲取值可以直接反序列化獲取值
var p = JSON.parse(d.body);
print_first("我的名字是:"+p.name);
官網的例子中還有一個功能,就是發送到畫畫,把數據傳遞給消費端。然后在消費端重現
最后。關於RabbitMq的應用場景。這里說得比較好:
public class RabbitMqProducerFactory { public static IConnection CreateConnection() { var factory = new ConnectionFactory() { VirtualHost = ConfigurationManager.AppSettings["MqVirtualHost"], HostName = ConfigurationManager.AppSettings["MqHostName"], UserName = ConfigurationManager.AppSettings["MqUserName"], Password = ConfigurationManager.AppSettings["MqPassword"], Port = int.Parse(ConfigurationManager.AppSettings["MqPort"].ToString()), RequestedHeartbeat = 60, //心跳超時時間 AutomaticRecoveryEnabled = true //自動重連 }; return factory.CreateConnection(); //創建連接對象 } } ------------------------------------------------------------------------------- public class RabbitMqSendMessage { static string QueueName = ConfigurationManager.AppSettings["MqQueueName"]; public static void SendMessage(string message) { using (IConnection conn = RabbitMqProducerFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建 channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//消息持久化 channel.BasicPublish(exchange: "", routingKey: QueueName, basicProperties: properties, body: body); } } } } -------------------------------------------------------------------------------
http://blog.csdn.net/whoamiyang/article/details/54954780
https://blog.csdn.net/gb4215287/article/details/79457445
http://blog.csdn.net/zyz511919766/article/details/41946521
參考資料:
http://www.cnblogs.com/PatrickLiu/tag/RabbitMQ/
https://www.jianshu.com/p/4a8336bc3428
https://www.jianshu.com/p/494dc4ca6fb9
https://www.cnblogs.com/ericli-ericli/p/5902270.html
https://www.cnblogs.com/yangh965/p/5862347.html
http://blog.csdn.net/u011642663/article/details/54691788
http://www.cnblogs.com/PatrickLiu/p/6807019.html#commentform
http://www.cnblogs.com/Andon_liu/p/5401961.html
https://www.cnblogs.com/ericli-ericli/p/5917018.html
http://www.erlang.org/downloads
http://blog.csdn.net/wangqingpei557/article/details/47864761
http://blog.csdn.net/wangqingpei557/article/details/47864761
http://www.80iter.com/blog/1437455520862503
http://blog.csdn.net/u012631731/article/category/7240883