RabbitMq連接工具類, 提供消息發送與拉取功能


一、引入pom文件依賴

<!-- 集成rabbitmq -->
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.4.3</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>http-client</artifactId>
  <version>2.1.0.RELEASE</version>
  <scope>compile</scope>
  <optional>true</optional>
</dependency>

 

二、RabbitMqUtils.java

import com.rabbitmq.client.*;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Description RabbitMq連接工具類,提供消息發送與拉取功能
 */
public class RabbitMqUtils {

    private static Logger logger = LoggerFactory.getLogger(RabbitMqUtils.class);

    private ConnectionFactory factory;
    private Connection connection ;
    private Channel channel ;


    /**
     * 初始化RabbitMq連接工具類
     * @param host          主機
     * @param port          端口
     * @param userName      用戶名
     * @param password      密碼
     * @param virtualHost   虛擬主機
     * @throws IOException
     * @throws TimeoutException
     */
    public RabbitMqUtils(String host, int port, String  userName, String password, String virtualHost) throws IOException, TimeoutException {
        this.factory = this.initConnectionFactory(host, port, userName, password, virtualHost);
        // 創建與RabbitMQ服務器的TCP連接
        this.connection = connection == null? this.factory.newConnection() : this.connection;
        this.channel = this.channel == null? this.connection.createChannel() : this.channel;
    }

    /**
     * 初始化rabbitMq服務配置
     * @param host          主機
     * @param port          端口
     * @param userName      用戶名
     * @param password      密碼
     * @param virtualHost   虛擬主機
     * @return
     */
    private ConnectionFactory initConnectionFactory(String host, int port, String  userName, String password, String virtualHost){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(userName);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }

    /**
     * 綁定隊列
     * @param exchangeName      交換機名
     * @param queneName         隊列名
     * @param routingKey        路由KEY
     * @param type              消息模式:FANOUT|TOPIC|DIRECT
     * @param durable           是否持久化
     * @param autoDelete        是否自動刪除隊列
     * @throws IOException
     */
    private void queueBind(String exchangeName, String queneName, String routingKey,BuiltinExchangeType type, boolean durable, boolean autoDelete) throws IOException{
        // 聲明交換機類型:交換機,類型,持久化
        this.channel.exchangeDeclare(exchangeName, type, durable);
        if (queneName != null) {
            if (type != BuiltinExchangeType.DIRECT) {
                // 聲明默認的隊列:隊列,持久化,聲明獨占隊列(僅限於此連接),自動刪除隊列,隊列的其他屬性
                this.channel.queueDeclare(queneName, durable, false, autoDelete, null);
            }
            // 將隊列與交換機綁定
            this.channel.queueBind(queneName, exchangeName, routingKey);
        }
    }

    /**
     * 發送消息
     * @param exchangeName      交換機名
     * @param queneName         隊列名
     * @param routingKey        路由KEY
     * @param type              消息模式:FANOUT|TOPIC|DIRECT
     * @param msg               消息
     * @return
     */
    public boolean sendMq(String exchangeName, String queneName, String routingKey, BuiltinExchangeType type,  String msg) {
        try {
            this.queueBind(exchangeName, queneName, routingKey, type, true, true);
            this.channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        } catch (Exception e) {
            logger.error("error", e);
            return false;
        }
        return true;
    }

    /**
     * 拉取隊列消息
     * @param exchangeName      交換機名
     * @param queneName         隊列名
     * @param routingKey        路由KEY
     * @param type              消息模式:FANOUT|TOPIC|DIRECT
     * @param headerInterface   回調實現
     * @throws IOException
     */
    public void pullMq(String exchangeName, String queneName, String routingKey, BuiltinExchangeType type, HeaderInterface headerInterface) throws IOException{
        if (queneName == null){
            queneName = (type == BuiltinExchangeType.DIRECT) ? this.channel.queueDeclare().getQueue(): queneName;
        }
        String newsQueueName = queneName;
        this.queueBind(exchangeName, newsQueueName, routingKey, type, true, true);
        // 創建接收客戶端,當有消息,則直接回調handleDelivery方法
        Consumer consumer = new DefaultConsumer(this.channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                logger.info("exchang:{}, routingKey:{}, queueName:{}, message:{}", envelope.getExchange(), envelope.getRoutingKey(),newsQueueName, message);
                headerInterface.execute(consumerTag, body);
            }
        };
        // channel綁定隊列、消費者,autoAck為true表示一旦收到消息則自動回復確認消息
        this.channel.basicConsume(newsQueueName, true, consumer);
    }

    /**
     * 關閉連接通道
     * @throws IOException
     * @throws TimeoutException
     */
    public void close() throws IOException, TimeoutException {
        if (this.channel != null) {
            this.channel.close();
            this.channel = null;
        }
        if (connection != null) {
            this.connection.close();
            this.connection = null;
        }
        this.factory = null;
    }

    /**
     * 函數式回調接口
     */
    @FunctionalInterface
    interface HeaderInterface{
        void execute(String consumerTag, byte[] body) throws IOException ;
    }

    /**
     * 測試入口
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        String [] words = new String[]{"props","student","build","name","execute"};
        RabbitMqUtils rabbitMqUtils2 = new RabbitMqUtils("192.168.1.3", 5672, "admin", "admin", "datastream");
        int i =0;

        //FANOUT模式不需要routingKey,因此routingKey傳空字符串(不要設置成null)
        while (i < words.length) {
            rabbitMqUtils2.sendMq("amq.fanout", "test1", "", BuiltinExchangeType.FANOUT,words[i] + "," + RandomUtils.nextInt(1,100));
            i++;
        }
        System.out.println("發送結束");

        System.out.println("接收fanout消息");
        rabbitMqUtils2.pullMq("amq.fanout", "test1", "", BuiltinExchangeType.FANOUT, (record, body) -> {
            String message = new String(body, "UTF-8");
            System.out.println(message);
        });

//        while (i < words.length) {
//            rabbitMqUtils2.sendMq("amq.topic", "test-topic", "topic-key", BuiltinExchangeType.TOPIC, words[i] + "," + RandomUtils.nextInt(1,100));
//            i++;
//        }
//        System.out.println("發送topic結束");
//
//        System.out.println("接收topic消息");
//        rabbitMqUtils2.pullMq("amq.topic", "test-topic", "topic-key", BuiltinExchangeType.TOPIC, (record, body) -> {
//            String message = new String(body, "UTF-8");
//            System.out.println(message);
//        });

        //生產者和消費者,分開兩個測試類執行,否則會導制隊列綁定失敗
//        while (i < words.length) {
//            rabbitMqUtils2.sendMq("amq.direct", null,  "direct-key", BuiltinExchangeType.DIRECT, words[i] + "," + RandomUtils.nextInt(1,100));
//            i++;
//        }
//        System.out.println("發送direct結束");

//        System.out.println("接收direct消息");
//        rabbitMqUtils2.pullMq("amq.direct", null,  "direct-key", BuiltinExchangeType.DIRECT, (record, body) -> {
//            String message = new String(body, "UTF-8");
//            System.out.println(message);
//        });

        rabbitMqUtils2.close();
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM