RabbitMQ安装与使用


一、下载RabbitMQ

http://www.rabbitmq.com/install-windows.html

 

二、下载OTP

http://www.erlang.org/downloads

 

三、安装OTP、RabbitMQ

 

注意:需要先安装OTP,再安装MQ。必须使用默认配置安装(c盘安装)

四、配置RabbitMQ

 

配置

  装好之后还是有一个配置文件需要设置一下的,位置是在%HOMEPATH%\AppData\Roaming\RabbitMQ,你会看到下面已经有一个rabbitmq.config.example文件,不过还需要新建一个rabbitmq.config文件。配置内容可参考

[
{rabbit,
[
{loopback_users, [<<"guest">>]},
{tcp_listeners, [{"127.0.0.1", 5672}]}

]}
].

loopback_users:设置只能在与RabbitMq服务同一台机器上访问服务的用户。

tcp_listeners:设置RabbitMQ监听的IP地址与端口。只监听局域网内网iP、修改默认端口,防止被入侵攻击。

 

 

安装完成后,Erlang和RabbitMQ环境变量是没有配置的,需要自己手动去配置,如下图: 

变量名分别为:ERLANG_HOME 、RABBITMQ_SERVER;同时把变量为配置在path环境变量中即可。 
然后我们启动RabbitMQ服务,如下图: 

我们看上图 completed with 0 plugins. 意思是插件为0;我们服务启动了但是访问不了http://localhost:15672/就是缺少插件。也就是界面管理工具所需的插件,找到RabbitMQ的安装目录执行第一种方式,如下命令:

 

第二种方式,如下命令:

 

其中的 plugin-name 包括下面的名字,下面每一个输入执行一次即可:

 

建议使用第一种方式执行,执行完命令如下图说明已经成功的安装插件:

找到bat的目录

执行相关命令

 

1.添加用户密码 rabbitmqctl add_user wenli wenli

2.设置wenli为管理员rabbitmqctl set_user_tags wenli administrator

3.配置权限rabbitmqctl set_permissions -p / wenli ".*" ".*" ".*"

4.启动RabbitMQ的web管理rabbitmq-plugins enable rabbitmq_management

五、配置队列

1.使用上面创建的用户登录http://127.0.0.1:15672,进入控制台

2.创建虚机
 
3.创建exchange

 

4.创建队列
 

 

5.绑定队列。点击exchange,绑定上一步创建的队列。

 

注意:Exchange的Type为Direct及Durability为Durable,Queue的RoutingKey可保持空。
 
五、使用C#操作队列
1、安装RabbitMQ Client
Install-Package RabbitMQ.Client -Version 3.6.9
2、调用代码
public class RabbitMqMessageService
    {
        public event EventHandler<string> MessageReceived;
        public event EventHandler<string> ConnectionShutdown; 
 
        private string _hostName = "localhost";
        private string _userName = "132";
        private string _password = "13";
        private string _exchangeName = "exchange";
        private string _queueName = "queue";
        private string _routingKey = "";
        private bool _persistent = false;
        private object sync_root = new object();
        private IConnection _connection;
        private bool _disposed;
 
        public bool IsConnected
        {
            get
            {
                return _connection != null && _connection.IsOpen && !_disposed;
            }
        }
 
        public RabbitMqMessageService(string hostName, string userName, string password, string exchangeName, string queueName, bool persistent)
        {
            _hostName = hostName;
            _userName = userName;
            _password = password;
            _exchangeName = exchangeName;
            _queueName = queueName;
            _persistent = persistent;
        }
 
        public void Connect()
        {
            if (IsConnected) return;
 
            lock (sync_root)
            {
                var connectionFactory = new ConnectionFactory();
                connectionFactory.HostName = _hostName;
                connectionFactory.UserName = _userName;
                connectionFactory.Password = _password;
 
                _connection = connectionFactory.CreateConnection();
                _connection.ConnectionShutdown += (sender, e) => 
                {
                    var handler = ConnectionShutdown;
                    if (handler != null)
                    {
                        handler(sender, e.ReplyText);
                    }
                };
            }
        }
 
        public void Dispose()
        {
            if (_disposed || !IsConnected) return;
 
            _disposed = true;
 
            _connection.Close();
            _connection.Dispose();
        }
 
        public void Send(string message)
        {
            if (!IsConnected) throw new Exception("RabbitMq未连接");
            var model = _connection.CreateModel();
            Send(message, model);
        }
 
        public void Receive()
        {
            if (!IsConnected) throw new Exception("RabbitMq未连接");
            lock (sync_root)
            {
                var model = _connection.CreateModel();
                Receive(model);
            }
        }
 
        private void Send(string message , IModel model)
        {
            if (!IsConnected) throw new Exception("RabbitMq未连接");
            var props = model.CreateBasicProperties();
            SetBasicPropertes(props);
            byte[] messageBytes = Encoding.UTF8.GetBytes(message);
            model.BasicPublish(_exchangeName, _queueName, props, messageBytes);
        }
 
        private void Receive(IModel channel)
        {
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ch, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
 
                var handler = MessageReceived;
                if (handler != null)
                {
                    handler(ch, message);
                }
 
                channel.BasicAck(ea.DeliveryTag, false);
            };
            channel.BasicConsume(_queueName, false, consumer);
        }
 
        private void SetBasicPropertes(IBasicProperties props)
        {
            props.Persistent = _persistent;
        }
 
        //private void SetQueue(IModel model)
        //{
        //    model.ExchangeDeclare(_exchangeName, ExchangeType.Direct);
        //    model.QueueDeclare(_queueName, true, false, false, null);
        //    model.QueueBind(_queueName, _exchangeName, _routingKey, null);
        //}
    }
 

 

加入队列
var hostName = GlobalConfig.Settings["RabbitMqHost"];
var userName = GlobalConfig.Settings["RabbitMqUsername"];
var password = GlobalConfig.Settings["RabbitMqPassword"];
                
var rbtMqService = new RabbitMqMessageService(hostName, userName,password, exchangeName, queueName, persistent);
rbtMqService.Connect();
rbtMqService.Send("sljfklskdlfjlsjdfk");
 
消费队列
 var hostName = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqHost");
 var userName = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqUsername");
 var password = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqPassword");
 var exchangeName = "newexchange";
 var queueName = "newqueue";
 var persistent = false;
 
 var mqService = new RabbitMqMessageService(hostName, userName, password, exchangeName, queueName, persistent);
  mqService.Connect();
 mqService.ConnectionShutdown += (s, e) =>
            {
                Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "newqueue队列连接已关闭:{0} \n--------------", e);
            };
            mqService.MessageReceived += (s, msg) =>
            {
                Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "消息内容:{0} \n--------------", msg);
                if (!string.IsNullOrEmpty(msg))
                    // 业务逻辑哦
            };
            mqService.Receive();

 

 
参考文献:
https://dotnetcodr.com/messaging/
https://www.cnblogs.com/shanyou/p/4067250.html
 
 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM