linux rabbitmq 遠程登錄


./rabbitmqctl add_user admin1 admin1

./rabbitmqctl set_user_tags admin1 administrator

./rabbitmqctl set_permissions -p "/" admin1 ".*" ".*" ".*" 

 測試用例:

producer:

package com.rq.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer01 {
    //隊列名稱
    private static final String QUEUE = "helloworld";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.156.207");
            factory.setPort(5672);
            factory.setUsername("admin1");
            factory.setPassword("admin1");
            factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器
//創建與RabbitMQ服務的TCP連接
            connection = factory.newConnection();
//創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
/**
 * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
 * param1:隊列名稱
 * param2:是否持久化
 * param3:隊列是否獨占此連接
 * param4:隊列不再使用時是否自動刪除此隊列
 * param5:隊列參數
 */
            channel.queueDeclare(QUEUE, true, false, false, null);
            String message = "helloworld小明"+System.currentTimeMillis();
/**
 * 消息發布方法
 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
 * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列
 * param3:消息包含的屬性
 * param4:消息體
 */
/**
 * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯
 示綁定或解除綁定
 * 默認的交換機,routingKey等於隊列名稱
 */
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println("Send Message is:'" + message + "'");
        }
        catch(Exception ex)
        {
            ex.printStackTrace();
        }
        finally
        {
            if(channel != null)
            {
                channel.close();
            }
            if(connection != null)
            {
                connection.close();
            }
        }
    }
}

consumer:

package com.rq.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer01 {

    private static final String QUEUE = "helloworld";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
//設置MabbitMQ所在服務器的ip和端口
        factory.setHost("192.168.156.207");
        factory.setPort(5672);
        factory.setUsername("admin1");
        factory.setPassword("admin1");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
//聲明隊列
        channel.queueDeclare(QUEUE, true, false, false, null);
//定義消費方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
 * 消費者接收消息調用此方法
 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志
(收到消息失敗后是否需要重新發送)
 * @param properties
 * @param body
 * @throws IOException
 */
@Override
public void handleDelivery(String consumerTag,
                           Envelope envelope,
                           AMQP.BasicProperties properties,
                           byte[] body)
        throws IOException {
//交換機
    String exchange = envelope.getExchange();
//路由key
    String routingKey = envelope.getRoutingKey();
//消息id
    long deliveryTag = envelope.getDeliveryTag();
//消息內容
    String msg = new String(body,"utf-8");
    System.out.println("receive message.." + msg);
}
        };
/**
 * 監聽隊列String queue, boolean autoAck,Consumer callback
 * 參數明細
 * 1、隊列名稱
 * 2、是否自動回復,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置
 為false則需要手動回復
 * 3、消費消息的方法,消費者接收到消息后調用此方法
 */
        channel.basicConsume(QUEUE, true, consumer);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM