rabbit的簡單搭建,java使用rabbitmq queue的簡單例子和一些坑


一 整合

由於本人的碼雲太多太亂了,於是決定一個一個的整合到一個springboot項目里面。

附上自己的項目地址https://github.com/247292980/spring-boot

 

以整合功能

spring-boot,FusionChart,thymeleaf,vue,ShardingJdbc,mybatis-generator,微信分享授權,drools,spring-security,spring-jpa,webjars,Aspect,drools-drt

 

這次就來整合下簡單的rabbitmq

 

二 安裝

這玩意最坑的地方在於,百度第一的安裝教程,缺了一部分。

 

1.安裝erlong,搭建環境變量
2.安裝mq
3.這里他有一個mq管理插件的東西,我只想說這玩意很老了,新的mq基本都自己裝了, http://localhost:15672 你能打開就是已經裝成功,所以xxx plugin unchange的其實是沒有錯的。


或許有人用老板的我也貼一下相應代碼,以下所有命令默認cd到mq的sbin目錄下

enable rabbitmq_management


4.查詢用戶

rabbitmqctl.bat list_users

 

5.新增一個用戶

rabbitmqctl.bat add_user username password
例子 rabbitmqctl.bat add_user haha
123456798

6.更改角色

rabbitmqctl.bat set_user_tags username role

例子
rabbitmqctl.bat set_user_tags haha administrator

7.改密碼

rabbitmqctl change_password userName newPassword

例子
rabbitmqctl change_password haha 123456

8.刪除用戶

rabbitmqctl.bat delete_user username

例子
rabbitmqctl.bat delete_user haha

9.設置用戶權限

rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP

例子 這個例子一定要跑一下,百度第一的照着做會報你沒有權限的error,授予用戶username在服務器根目錄轄所有資源的讀寫權限 rabbitmqctl.bat set_permissions -p "/" haha ".*" ".*" ".*"

10.查看(指定hostpath)所有用戶的權限信息

rabbitmqctl  list_permissions  [-p  VHostPath]

11.查看指定用戶的權限信息

rabbitmqctl  list_user_permissions  User

例子
rabbitmqctl  list_user_permissions  haha

12.清除用戶的權限信息

rabbitmqctl  clear_permissions  [-p VHostPath]  User

例子
rabbitmqctl  clear_permissions  [-p VHostPath]  haha

 

13.在寫demo之前,我們還要做一個事情,添加一個隊列,進入http://localhost:15672點擊queues標簽即能看到

 

三 代碼

生產者

/**
 * @AUTHOR lgp
 * @DATE 2018/9/7 17:27
 * @DESCRIPTION rabbitmq的消息生產者
 **/
public class Producer {
    public final static String QUEUE_NAME = "rabbitMQ.lgp";
    public static final Logger log = LoggerFactory.getLogger(Producer.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        //創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置RabbitMQ相關信息
        factory.setHost("localhost");
        factory.setUsername("haha");
        factory.setPassword("123456789");
        factory.setPort(5672);
        //創建一個新的連接
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //創建一個通道
            //  聲明一個隊列
//            public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException
            boolean durable = true;
            boolean exclusive = false;
            boolean auto_delete = false;
//            Map<String, Object> arguments = new HashMap<String, Object>();
//            // 統一設置隊列中的所有消息的過期時間
//            arguments.put("x-message-ttl", 30000);
//            // 設置超過多少毫秒沒有消費者來訪問隊列,就刪除隊列的時間
//            arguments.put("x-expires", 20000);
//            // 設置隊列的最新的N條消息,如果超過N條,前面的消息將從隊列中移除掉
//            arguments.put("x-max-length", 4);
//            // 設置隊列的內容的最大空間,超過該閾值就刪除之前的消息
//            arguments.put("x-max-length-bytes", 1024);
//            // 將刪除的消息推送到指定的交換機,一般x-dead-letter-exchange和x-dead-letter-routing-key需要同時設置
//            arguments.put("x-dead-letter-exchange", "exchange.dead");
//            // 將刪除的消息推送到指定的交換機對應的路由鍵
//            arguments.put("x-dead-letter-routing-key", "routingkey.dead");
//            // 設置消息的優先級,優先級大的優先被消費
//            arguments.put("x-max-priority", 10);
            channel.queueDeclare(QUEUE_NAME, durable, exclusive, auto_delete, null);
            String message = "Hello RabbitMQ";
            //發送消息到隊列中
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            log.info("Producer Send message={}", message);
            //關閉通道和連接
            channel.close();
        }

    }
}

消費者

/**
 * @AUTHOR lgp
 * @DATE 2018/9/7 17:34
 * @DESCRIPTION 消費者
 **/
public class Customer {
    private final static String QUEUE_NAME = "rabbitMQ.lgp";
    public static final Logger log = LoggerFactory.getLogger(Customer.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置RabbitMQ地址
        factory.setHost("localhost");
        factory.setUsername("haha");
        factory.setPassword("123456789");
        factory.setPort(5672);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            //創建一個通道
            //聲明要關注的隊列
            //隊列持久化,在RabbitMQ重啟保證隊列不會丟失
            boolean durable = true;
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("Customer Waiting Received messages");
            //DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,
            // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    log.info("Customer Received message={}", message);
                }
            };
            //自動回復隊列應答 -- RabbitMQ中的消息確認機制
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
}

四 注意點

1.我為了測試queueDeclare的參數代表的是什么意思,起了什么作用(由於多次測試,也出現了下面這些奇奇怪怪的bug),發現exclusive 和auto_delete的報錯都是說auto_delete,但是具體的作用就是不同,所以你懂得...

2.還有一個謎一樣的bug,我能上http://localhost:15672,但是java代碼死活不行,那我看看cmd正不正常,然后

Error: unable to perform an operation on node 'rabbit@haha'. Please see diagnostics information and suggestions below.

Most common reasons for this are:

 * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)
 * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)
 * Target node is not running

In addition to the diagnostics info below:

 * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more
 * Consult server logs on node rabbit@haha

DIAGNOSTICS
===========

attempted to contact: [rabbit@haha]

rabbit@haha:
  * connected to epmd (port 4369) on haha
  * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic
  * TCP connection succeeded but Erlang distribution failed

  * Hostname mismatch: node "rabbit@xxxx" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@xxxx"


Current node details:
 * node name: rabbitmqcli30@haha
 * effective user's home directory: C:\Users\xxxx
 * Erlang cookie hash: 0rVZh7WgaAP8H8hmZPiGLA==

就是說,node沒創建?我都用haha登上你的web管理系統啊???

這報錯莫名其妙的出現了我電腦的名字,,,,

看情況就是node rabbit@xxxx這玩意是mq底層取參數,取到了電腦的名字去了?然后兩個mabbit都啟動erlang,erlang搶奪端口的error??

 

解決方法 刪除所有mq線程,重啟mq

3.同樣是一個謎一樣的bug,啟動rabbit成功,但是java還是不同跑

Error: unable to perform an operation on node 'rabbit@haha'. Please see diagnostics information and suggestions below.

Most common reasons for this are:

 * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)
 * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)
 * Target node is not running

In addition to the diagnostics info below:

 * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more
 * Consult server logs on node rabbit@haha

DIAGNOSTICS
===========

attempted to contact: [rabbit@haha]

rabbit@haha:
  * connected to epmd (port 4369) on haha
  * epmd reports: node 'rabbit' not running at all
                  no other nodes on haha
  * suggestion: start the node

Current node details:
 * node name: rabbitmqcli26@haha
 * effective user's home directory: C:\Users\xxxx
 * Erlang cookie hash: 0rVZh7WgaAP8H8hmZPiGLA==

node沒有創建,那么不是重啟rabbitmq這個層次的問題啊...

 

解決方法

在windows的服務里面重啟rabbitmq的服務,出現這問題的原因可能是我開了兩個cmd操作rabbitmq,搞到erlang線程死鎖了,,,,

4.代碼里我只寫了mq的簡單應用,比較核心的如分發,訂閱等我都沒有寫,因為用過mq的都知道,這些東西其實都是一樣的簡單的(像我測試queueDeclare那樣一個一個改一個一個看具體作用,好蠢是不是...),主要是設計思路.所以我就不寫其他的例子,只寫了消息隊列中的簡單隊列。

5.這個是上面兩個的低配版。error node with name "rabbit" already running on “xxxxxxx”

一看就知道重啟mq就行了

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM