RabbitMq安裝與測試教程

Installing on Mac

I. 安裝

1
2
3
4
5
6
7
8
9
brew install rabbitmq

## 進入安裝目錄
cd /usr/local/Cellar/rabbitmq/3.7.5

# 啟動
brew services start rabbitmq
# 當前窗口啟動
rabbitmq-server

啟動控制台之前需要先開啟插件

1
./rabbitmq-plugins enable rabbitmq_management

進入控制台: http://localhost:15672/

用戶名和密碼:guest,guest

II. 配置與測試

1. 添加賬號

首先是得啟動mq

1
2
3
4
5
6
## 添加賬號
./rabbitmqctl add_user admin admin
## 添加訪問權限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設置超級權限
./rabbitmqctl set_user_tags admin administrator

2. 編碼實測

pom引入依賴

1
2
3
4
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>

開始寫代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class RabbitMqTest {

//消息隊列名稱
private final static String QUEUE_NAME = "hello";

@Test
public void send() throws java.io.IOException, TimeoutException {

//創建連接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//創建連接
Connection connection = factory.newConnection();

//創建消息通道
Channel channel = connection.createChannel();

//生成一個消息隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);


for (int i = 0; i < 10; i++) {
String message = "Hello World RabbitMQ count: " + i;

//發布消息,第一個參數表示路由(Exchange名稱),未""則表示使用默認消息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");
}


//關閉消息通道和連接
channel.close();
connection.close();

}


@Test
public void consumer() throws java.io.IOException, java.lang.InterruptedException, TimeoutException {

//創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");

//創建連接
Connection connection = factory.newConnection();

//創建消息信道
Channel channel = connection.createChannel();

//消息隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for message. To exist press CTRL+C");

AtomicInteger count = new AtomicInteger(0);

//消費者用於獲取消息信道綁定的消息隊列中的信息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");

try {
System.out.println(" [x] Received '" + message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);

Thread.sleep(1000 * 60);
}
}

需要注意的一點是:

  • 生產消息: channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 消費消息: channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 生產和消費都聲明channel,要求兩者的配置參數一致,否則無法消費數據

3. 輸出說明

首先執行塞入數據,執行完畢之后,可以到控制台進行查看:

out

可以看到多出了一個Queue,對列名為hello,總共有10條數據


接下來就是消費數據了,執行consumer方法,輸出日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[*] Waiting for message. To exist press CTRL+C
[x] Received 'Hello World RabbitMQ count: 0
[x] Done
[x] Received 'Hello World RabbitMQ count: 1
[x] Done
[x] Received 'Hello World RabbitMQ count: 2
[x] Done
[x] Received 'Hello World RabbitMQ count: 3
[x] Done
[x] Received 'Hello World RabbitMQ count: 4
[x] Done
[x] Received 'Hello World RabbitMQ count: 5
[x] Done
[x] Received 'Hello World RabbitMQ count: 6
[x] Done
[x] Received 'Hello World RabbitMQ count: 7
[x] Done
[x] Received 'Hello World RabbitMQ count: 8
[x] Done
[x] Received 'Hello World RabbitMQ count: 9
[x] Done

回頭去查看queue,發現總得數據量為0了

4. ACK問題

對於ack的問題,如果在消費數據的時候,出現異常,而我不希望數據丟失,這個時候就需要考慮手動ack的機制來保證了

首先需要設置手動ack

1
2
// 設置autoAck為false
channel.basicConsume(QUEUE_NAME, false, consumer);

其次在消費數據完畢之后,主動ack/nack

1
2
3
4
5
if (success) {
channel.basicAck(envelope.getDeliveryTag(), false);
} else {
channel.basicNack(envelope.getDeliveryTag(), false, false);
}

 


 

 

https://www.cnblogs.com/jasonLiu2018/p/12927512.html

RabbitMQ no access to this vhost

連接的失敗報錯:RabbitMQ Exception (403) Reason: "no access to this vhost"

因為沒有配置該用戶的訪問權限,可以通過
rabbitmqctl add_vhost admin
來添加,並賦予權限:
rabbitmqctl set_permissions -p admin 用戶名 "." "." ".*"

代碼在連接的時候,必須制定對應的vhost,否則是沒有訪問權限:
conn, err := amqp.Dial("amqp://sky:password@ip:5672/admin”)