rabbitmq 消息確認機制: 事務 + confirm


/**
 * rabbitmq 工具類
 *
 * @author yangxj
 * @date 2020-03-25 20:30
 *
 * 基本概念: queue 通過 routeKey 綁定 exchange; producer 發送消息到 exchange, exchange 通過 routeKey 發送與之綁定的 queue
 * exchange 類型: direct, fanout, topic
 */
public class RabbitMQUtils {
    /**
     * 獲取rabbitMq 連接
     *
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setUsername("guest");
        return factory.newConnection();
    }
}

  

  • 事務機制

    /**
     * 1. 通過事務機制( 缺點: 降低了mq 的吞吐量)
     */

    public static void useTx() throws Exception {
        Connection connection = getConnection();

        Channel channel = connection.createChannel();

        // 聲明一個queue
        channel.queueDeclare("test-queue", false, false, false, null);

        // 聲明一個topic 類型的 exchange
        channel.exchangeDeclare("test-topic-exchange", BuiltinExchangeType.TOPIC);

        // queue 通過 routeKey 綁定 exchange
        channel.queueBind("test-queue", "test-topic-exchange", "test.*");

        String message = "hello rabbit mq!";

        channel.txSelect();  // 開啟事務

        try {
            channel.basicPublish("test-topic-exchange", "test.message", null, message.getBytes()); // 消息發送
            channel.txCommit();
            System.out.println("message send success..");
        } catch (Exception e) {
            channel.txRollback(); // 回滾
            System.out.println("message send fail..");
        } finally {
            channel.close();
            connection.close();
        }

    }

  

  • 生產者confirm模式 (同步)

 /**
     * 2. 生產者 confirm模式(串型)
     * 原理: 當信道channel 進入confrim模式, 所有在該信道發布的消息都會被指派一個唯一的id
     * 當消息被投遞到所有匹配的隊列后, broker就會發送一個確認給生產者(包含消息id);
     */
    public void producterConfirmSync() throws Exception {
        Connection connection = getConnection(); // 獲取rabbitMq 連接

        Channel channel = connection.createChannel();

        channel.confirmSelect(); // 開啟confirm模式

        String message = "hello rabbit mq!";
        String message2 = "hello rabbit mq2!";

        channel.basicPublish("", "", MessageProperties.BASIC, message.getBytes());
        channel.basicPublish("", "", MessageProperties.BASIC, message2.getBytes());

        if (channel.waitForConfirms()) { // 支持多條發送后再確認
            System.out.println("message send success..");
        } else {
            System.out.println("message send fail..");
        }
        channel.close();
        connection.close();
    }

  

  • 生產者confirm (異步)

 /**
     * 3. 生產者 confirm模式(異步)
     */
    public void producterConfirmAsync() throws Exception {
        Connection connection = getConnection(); // 獲取rabbitMq 連接

        Channel channel = connection.createChannel();

        channel.confirmSelect(); // 開啟confirm模式

        // ack ids
        SortedSet<Long> ackIds = new TreeSet<>();

        // not ack ids
        SortedSet<Long> nackIds = new TreeSet<>();

        for (int i = 0; i < 100; i++) {
            String message = "hello rabbit mq: " + i;
            channel.basicPublish("", "", null, message.getBytes());
        }


        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 發送成功(確認)
                // multiple 單條確認 or多條確認 (為true, 表示到這個序列號之前的所有消息都已經得到了處理)
                ackIds.add(deliveryTag);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 發送失敗(未確認)
                nackIds.add(deliveryTag);
            }
        });
    }

 

  • 消費者confirm

    /**
     * 消費者 confirm
     */
    public void consumerConfirm() throws Exception {
        Connection connection = getConnection(); // 獲取rabbitMq 連接

        Channel channel = connection.createChannel();

        try {
            channel.basicConsume("test", false, // 關閉消費者自動ack
                    new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body)
                                throws IOException {
                            //發布的每一條消息都會獲得一個唯一的deliveryTag,deliveryTag在channel范圍內是唯一的
                            long deliveryTag = envelope.getDeliveryTag();
                            //第二個參數是批量確認標志。如果值為true,則執行批量確認,此deliveryTag之前收到的消息全部進行確認;
                            // 如果值為false,則只對當前收到的消息進行確認
                            channel.basicAck(deliveryTag, true); // 確認
                        }
                    });
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            channel.close();
            connection.close();
        }
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM