一、下载RabbitMQ
http://www.rabbitmq.com/install-windows.html
二、下载OTP
http://www.erlang.org/downloads
三、安装OTP、RabbitMQ
注意:需要先安装OTP,再安装MQ。必须使用默认配置安装(c盘安装)
四、配置RabbitMQ
配置
装好之后还是有一个配置文件需要设置一下的,位置是在%HOMEPATH%\AppData\Roaming\RabbitMQ,你会看到下面已经有一个rabbitmq.config.example文件,不过还需要新建一个rabbitmq.config文件。配置内容可参考
[
{rabbit,
[
{loopback_users, [<<"guest">>]},
{tcp_listeners, [{"127.0.0.1", 5672}]}
]}
].
loopback_users:设置只能在与RabbitMq服务同一台机器上访问服务的用户。
tcp_listeners:设置RabbitMQ监听的IP地址与端口。只监听局域网内网iP、修改默认端口,防止被入侵攻击。
安装完成后,Erlang和RabbitMQ环境变量是没有配置的,需要自己手动去配置,如下图:
变量名分别为:ERLANG_HOME 、RABBITMQ_SERVER;同时把变量为配置在path环境变量中即可。
然后我们启动RabbitMQ服务,如下图:
我们看上图 completed with 0 plugins. 意思是插件为0;我们服务启动了但是访问不了http://localhost:15672/就是缺少插件。也就是界面管理工具所需的插件,找到RabbitMQ的安装目录执行第一种方式,如下命令:
第二种方式,如下命令:
其中的 plugin-name 包括下面的名字,下面每一个输入执行一次即可:
建议使用第一种方式执行,执行完命令如下图说明已经成功的安装插件:
找到bat的目录
执行相关命令
1.添加用户密码 rabbitmqctl add_user wenli wenli
2.设置wenli为管理员rabbitmqctl set_user_tags wenli administrator
3.配置权限rabbitmqctl set_permissions -p / wenli ".*" ".*" ".*"
4.启动RabbitMQ的web管理rabbitmq-plugins enable rabbitmq_management
五、配置队列
1.使用上面创建的用户登录http://127.0.0.1:15672,进入控制台








2、调用代码
public class RabbitMqMessageService { public event EventHandler<string> MessageReceived; public event EventHandler<string> ConnectionShutdown; private string _hostName = "localhost"; private string _userName = "132"; private string _password = "13"; private string _exchangeName = "exchange"; private string _queueName = "queue"; private string _routingKey = ""; private bool _persistent = false; private object sync_root = new object(); private IConnection _connection; private bool _disposed; public bool IsConnected { get { return _connection != null && _connection.IsOpen && !_disposed; } } public RabbitMqMessageService(string hostName, string userName, string password, string exchangeName, string queueName, bool persistent) { _hostName = hostName; _userName = userName; _password = password; _exchangeName = exchangeName; _queueName = queueName; _persistent = persistent; } public void Connect() { if (IsConnected) return; lock (sync_root) { var connectionFactory = new ConnectionFactory(); connectionFactory.HostName = _hostName; connectionFactory.UserName = _userName; connectionFactory.Password = _password; _connection = connectionFactory.CreateConnection(); _connection.ConnectionShutdown += (sender, e) => { var handler = ConnectionShutdown; if (handler != null) { handler(sender, e.ReplyText); } }; } } public void Dispose() { if (_disposed || !IsConnected) return; _disposed = true; _connection.Close(); _connection.Dispose(); } public void Send(string message) { if (!IsConnected) throw new Exception("RabbitMq未连接"); var model = _connection.CreateModel(); Send(message, model); } public void Receive() { if (!IsConnected) throw new Exception("RabbitMq未连接"); lock (sync_root) { var model = _connection.CreateModel(); Receive(model); } } private void Send(string message , IModel model) { if (!IsConnected) throw new Exception("RabbitMq未连接"); var props = model.CreateBasicProperties(); SetBasicPropertes(props); byte[] messageBytes = Encoding.UTF8.GetBytes(message); model.BasicPublish(_exchangeName, _queueName, props, messageBytes); } private void Receive(IModel channel) { var consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var handler = MessageReceived; if (handler != null) { handler(ch, message); } channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(_queueName, false, consumer); } private void SetBasicPropertes(IBasicProperties props) { props.Persistent = _persistent; } //private void SetQueue(IModel model) //{ // model.ExchangeDeclare(_exchangeName, ExchangeType.Direct); // model.QueueDeclare(_queueName, true, false, false, null); // model.QueueBind(_queueName, _exchangeName, _routingKey, null); //} }
var hostName = GlobalConfig.Settings["RabbitMqHost"]; var userName = GlobalConfig.Settings["RabbitMqUsername"]; var password = GlobalConfig.Settings["RabbitMqPassword"]; var rbtMqService = new RabbitMqMessageService(hostName, userName,password, exchangeName, queueName, persistent); rbtMqService.Connect(); rbtMqService.Send("sljfklskdlfjlsjdfk");
var hostName = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqHost"); var userName = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqUsername"); var password = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqPassword"); var exchangeName = "newexchange"; var queueName = "newqueue"; var persistent = false; var mqService = new RabbitMqMessageService(hostName, userName, password, exchangeName, queueName, persistent); mqService.Connect(); mqService.ConnectionShutdown += (s, e) => { Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "newqueue队列连接已关闭:{0} \n--------------", e); }; mqService.MessageReceived += (s, msg) => { Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "消息内容:{0} \n--------------", msg); if (!string.IsNullOrEmpty(msg)) // 业务逻辑哦 }; mqService.Receive();