./rabbitmqctl add_user admin1 admin1
./rabbitmqctl set_user_tags admin1 administrator
./rabbitmqctl set_permissions -p "/" admin1 ".*" ".*" ".*"
測試用例:
producer:
package com.rq.test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer01 { //隊列名稱 private static final String QUEUE = "helloworld"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.156.207"); factory.setPort(5672); factory.setUsername("admin1"); factory.setPassword("admin1"); factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器 //創建與RabbitMQ服務的TCP連接 connection = factory.newConnection(); //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務 channel = connection.createChannel(); /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數 */ channel.queueDeclare(QUEUE, true, false, false, null); String message = "helloworld小明"+System.currentTimeMillis(); /** * 消息發布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 */ /** * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯 示綁定或解除綁定 * 默認的交換機,routingKey等於隊列名稱 */ channel.basicPublish("", QUEUE, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } catch(Exception ex) { ex.printStackTrace(); } finally { if(channel != null) { channel.close(); } if(connection != null) { connection.close(); } } } }
consumer:
package com.rq.test; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer01 { private static final String QUEUE = "helloworld"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); //設置MabbitMQ所在服務器的ip和端口 factory.setHost("192.168.156.207"); factory.setPort(5672); factory.setUsername("admin1"); factory.setPassword("admin1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE, true, false, false, null); //定義消費方法 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發送) * @param properties * @param body * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息內容 String msg = new String(body,"utf-8"); System.out.println("receive message.." + msg); } }; /** * 監聽隊列String queue, boolean autoAck,Consumer callback * 參數明細 * 1、隊列名稱 * 2、是否自動回復,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置 為false則需要手動回復 * 3、消費消息的方法,消費者接收到消息后調用此方法 */ channel.basicConsume(QUEUE, true, consumer); } }