RabbitMQ安裝與使用


一、下載RabbitMQ

http://www.rabbitmq.com/install-windows.html

 

二、下載OTP

http://www.erlang.org/downloads

 

三、安裝OTP、RabbitMQ

 

注意:需要先安裝OTP,再安裝MQ。必須使用默認配置安裝(c盤安裝)

四、配置RabbitMQ

 

配置

  裝好之后還是有一個配置文件需要設置一下的,位置是在%HOMEPATH%\AppData\Roaming\RabbitMQ,你會看到下面已經有一個rabbitmq.config.example文件,不過還需要新建一個rabbitmq.config文件。配置內容可參考

[
{rabbit,
[
{loopback_users, [<<"guest">>]},
{tcp_listeners, [{"127.0.0.1", 5672}]}

]}
].

loopback_users:設置只能在與RabbitMq服務同一台機器上訪問服務的用戶。

tcp_listeners:設置RabbitMQ監聽的IP地址與端口。只監聽局域網內網iP、修改默認端口,防止被入侵攻擊。

 

 

安裝完成后,Erlang和RabbitMQ環境變量是沒有配置的,需要自己手動去配置,如下圖: 

變量名分別為:ERLANG_HOME 、RABBITMQ_SERVER;同時把變量為配置在path環境變量中即可。 
然后我們啟動RabbitMQ服務,如下圖: 

我們看上圖 completed with 0 plugins. 意思是插件為0;我們服務啟動了但是訪問不了http://localhost:15672/就是缺少插件。也就是界面管理工具所需的插件,找到RabbitMQ的安裝目錄執行第一種方式,如下命令:

 

第二種方式,如下命令:

 

其中的 plugin-name 包括下面的名字,下面每一個輸入執行一次即可:

 

建議使用第一種方式執行,執行完命令如下圖說明已經成功的安裝插件:

找到bat的目錄

執行相關命令

 

1.添加用戶密碼 rabbitmqctl add_user wenli wenli

2.設置wenli為管理員rabbitmqctl set_user_tags wenli administrator

3.配置權限rabbitmqctl set_permissions -p / wenli ".*" ".*" ".*"

4.啟動RabbitMQ的web管理rabbitmq-plugins enable rabbitmq_management

五、配置隊列

1.使用上面創建的用戶登錄http://127.0.0.1:15672,進入控制台

2.創建虛機
 
3.創建exchange

 

4.創建隊列
 

 

5.綁定隊列。點擊exchange,綁定上一步創建的隊列。

 

注意:Exchange的Type為Direct及Durability為Durable,Queue的RoutingKey可保持空。
 
五、使用C#操作隊列
1、安裝RabbitMQ Client
Install-Package RabbitMQ.Client -Version 3.6.9
2、調用代碼
public class RabbitMqMessageService
    {
        public event EventHandler<string> MessageReceived;
        public event EventHandler<string> ConnectionShutdown; 
 
        private string _hostName = "localhost";
        private string _userName = "132";
        private string _password = "13";
        private string _exchangeName = "exchange";
        private string _queueName = "queue";
        private string _routingKey = "";
        private bool _persistent = false;
        private object sync_root = new object();
        private IConnection _connection;
        private bool _disposed;
 
        public bool IsConnected
        {
            get
            {
                return _connection != null && _connection.IsOpen && !_disposed;
            }
        }
 
        public RabbitMqMessageService(string hostName, string userName, string password, string exchangeName, string queueName, bool persistent)
        {
            _hostName = hostName;
            _userName = userName;
            _password = password;
            _exchangeName = exchangeName;
            _queueName = queueName;
            _persistent = persistent;
        }
 
        public void Connect()
        {
            if (IsConnected) return;
 
            lock (sync_root)
            {
                var connectionFactory = new ConnectionFactory();
                connectionFactory.HostName = _hostName;
                connectionFactory.UserName = _userName;
                connectionFactory.Password = _password;
 
                _connection = connectionFactory.CreateConnection();
                _connection.ConnectionShutdown += (sender, e) => 
                {
                    var handler = ConnectionShutdown;
                    if (handler != null)
                    {
                        handler(sender, e.ReplyText);
                    }
                };
            }
        }
 
        public void Dispose()
        {
            if (_disposed || !IsConnected) return;
 
            _disposed = true;
 
            _connection.Close();
            _connection.Dispose();
        }
 
        public void Send(string message)
        {
            if (!IsConnected) throw new Exception("RabbitMq未連接");
            var model = _connection.CreateModel();
            Send(message, model);
        }
 
        public void Receive()
        {
            if (!IsConnected) throw new Exception("RabbitMq未連接");
            lock (sync_root)
            {
                var model = _connection.CreateModel();
                Receive(model);
            }
        }
 
        private void Send(string message , IModel model)
        {
            if (!IsConnected) throw new Exception("RabbitMq未連接");
            var props = model.CreateBasicProperties();
            SetBasicPropertes(props);
            byte[] messageBytes = Encoding.UTF8.GetBytes(message);
            model.BasicPublish(_exchangeName, _queueName, props, messageBytes);
        }
 
        private void Receive(IModel channel)
        {
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ch, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
 
                var handler = MessageReceived;
                if (handler != null)
                {
                    handler(ch, message);
                }
 
                channel.BasicAck(ea.DeliveryTag, false);
            };
            channel.BasicConsume(_queueName, false, consumer);
        }
 
        private void SetBasicPropertes(IBasicProperties props)
        {
            props.Persistent = _persistent;
        }
 
        //private void SetQueue(IModel model)
        //{
        //    model.ExchangeDeclare(_exchangeName, ExchangeType.Direct);
        //    model.QueueDeclare(_queueName, true, false, false, null);
        //    model.QueueBind(_queueName, _exchangeName, _routingKey, null);
        //}
    }
 

 

加入隊列
var hostName = GlobalConfig.Settings["RabbitMqHost"];
var userName = GlobalConfig.Settings["RabbitMqUsername"];
var password = GlobalConfig.Settings["RabbitMqPassword"];
                
var rbtMqService = new RabbitMqMessageService(hostName, userName,password, exchangeName, queueName, persistent);
rbtMqService.Connect();
rbtMqService.Send("sljfklskdlfjlsjdfk");
 
消費隊列
 var hostName = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqHost");
 var userName = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqUsername");
 var password = ConfigurationHelper.GetAppSettingOrDefault("RabbitMqPassword");
 var exchangeName = "newexchange";
 var queueName = "newqueue";
 var persistent = false;
 
 var mqService = new RabbitMqMessageService(hostName, userName, password, exchangeName, queueName, persistent);
  mqService.Connect();
 mqService.ConnectionShutdown += (s, e) =>
            {
                Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "newqueue隊列連接已關閉:{0} \n--------------", e);
            };
            mqService.MessageReceived += (s, msg) =>
            {
                Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "消息內容:{0} \n--------------", msg);
                if (!string.IsNullOrEmpty(msg))
                    // 業務邏輯哦
            };
            mqService.Receive();

 

 
參考文獻:
https://dotnetcodr.com/messaging/
https://www.cnblogs.com/shanyou/p/4067250.html
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM