一、下載RabbitMQ
http://www.rabbitmq.com/install-windows.html
二、下載OTP
http://www.erlang.org/downloads
三、安裝OTP、RabbitMQ
注意:需要先安裝OTP,再安裝MQ。必須使用默認配置安裝(c盤安裝)
四、配置RabbitMQ
配置
裝好之后還是有一個配置文件需要設置一下的,位置是在%HOMEPATH%\AppData\Roaming\RabbitMQ,你會看到下面已經有一個rabbitmq.config.example文件,不過還需要新建一個rabbitmq.config文件。配置內容可參考
[
{rabbit,
[
{loopback_users, [<<"guest">>]},
{tcp_listeners, [{"127.0.0.1", 5672}]}
]}
].
loopback_users:設置只能在與RabbitMq服務同一台機器上訪問服務的用戶。
tcp_listeners:設置RabbitMQ監聽的IP地址與端口。只監聽局域網內網iP、修改默認端口,防止被入侵攻擊。
安裝完成后,Erlang和RabbitMQ環境變量是沒有配置的,需要自己手動去配置,如下圖:
變量名分別為:ERLANG_HOME 、RABBITMQ_SERVER;同時把變量為配置在path環境變量中即可。
然后我們啟動RabbitMQ服務,如下圖:
我們看上圖 completed with 0 plugins. 意思是插件為0;我們服務啟動了但是訪問不了http://localhost:15672/就是缺少插件。也就是界面管理工具所需的插件,找到RabbitMQ的安裝目錄執行第一種方式,如下命令:
第二種方式,如下命令:
其中的 plugin-name 包括下面的名字,下面每一個輸入執行一次即可:
建議使用第一種方式執行,執行完命令如下圖說明已經成功的安裝插件:
找到bat的目錄
執行相關命令
1.添加用戶密碼 rabbitmqctl add_user wenli wenli
2.設置wenli為管理員rabbitmqctl set_user_tags wenli administrator
3.配置權限rabbitmqctl set_permissions -p / wenli ".*" ".*" ".*"
4.啟動RabbitMQ的web管理rabbitmq-plugins enable rabbitmq_management
五、配置隊列
1.使用上面創建的用戶登錄http://127.0.0.1:15672,進入控制台








2、調用代碼
public class RabbitMqMessageService { public event EventHandler<string> MessageReceived; public event EventHandler<string> ConnectionShutdown; private string _hostName = "localhost"; private string _userName = "132"; private string _password = "13"; private string _exchangeName = "exchange"; private string _queueName = "queue"; private string _routingKey = ""; private bool _persistent = false; private object sync_root = new object(); private IConnection _connection; private bool _disposed; public bool IsConnected { get { return _connection != null && _connection.IsOpen && !_disposed; } } public RabbitMqMessageService(string hostName, string userName, string password, string exchangeName, string queueName, bool persistent) { _hostName = hostName; _userName = userName; _password = password; _exchangeName = exchangeName; _queueName = queueName; _persistent = persistent; } public void Connect() { if (IsConnected) return; lock (sync_root) { var connectionFactory = new ConnectionFactory(); connectionFactory.HostName = _hostName; connectionFactory.UserName = _userName; connectionFactory.Password = _password; _connection = connectionFactory.CreateConnection(); _connection.ConnectionShutdown += (sender, e) => { var handler = ConnectionShutdown; if (handler != null) { handler(sender, e.ReplyText); } }; } } public void Dispose() { if (_disposed || !IsConnected) return; _disposed = true; _connection.Close(); _connection.Dispose(); } public void Send(string message) { if (!IsConnected) throw new Exception("RabbitMq未連接"); var model = _connection.CreateModel(); Send(message, model); } public void Receive() { if (!IsConnected) throw new Exception("RabbitMq未連接"); lock (sync_root) { var model = _connection.CreateModel(); Receive(model); } } private void Send(string message , IModel model) { if (!IsConnected) throw new Exception("RabbitMq未連接"); var props = model.CreateBasicProperties(); SetBasicPropertes(props); byte[] messageBytes = Encoding.UTF8.GetBytes(message); model.BasicPublish(_exchangeName, _queueName, props, messageBytes); } private void Receive(IModel channel) { var consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var handler = MessageReceived; if (handler != null) { handler(ch, message); } channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(_queueName, false, consumer); } private void SetBasicPropertes(IBasicProperties props) { props.Persistent = _persistent; } //private void SetQueue(IModel model) //{ // model.ExchangeDeclare(_exchangeName, ExchangeType.Direct); // model.QueueDeclare(_queueName, true, false, false, null); // model.QueueBind(_queueName, _exchangeName, _routingKey, null); //} }
var hostName = GlobalConfig.Settings["RabbitMqHost"]; var userName = GlobalConfig.Settings["RabbitMqUsername"]; var password = GlobalConfig.Settings["RabbitMqPassword"]; var rbtMqService = new RabbitMqMessageService(hostName, userName,password, exchangeName, queueName, persistent); rbtMqService.Connect(); rbtMqService.Send("sljfklskdlfjlsjdfk");
var hostName = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqHost"); var userName = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqUsername"); var password = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqPassword"); var exchangeName = "newexchange"; var queueName = "newqueue"; var persistent = false; var mqService = new RabbitMqMessageService(hostName, userName, password, exchangeName, queueName, persistent); mqService.Connect(); mqService.ConnectionShutdown += (s, e) => { Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "newqueue隊列連接已關閉:{0} \n--------------", e); }; mqService.MessageReceived += (s, msg) => { Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "消息內容:{0} \n--------------", msg); if (!string.IsNullOrEmpty(msg)) // 業務邏輯哦 }; mqService.Receive();