rabbitmq 消息确认机制: 事务 + confirm


/**
 * rabbitmq 工具类
 *
 * @author yangxj
 * @date 2020-03-25 20:30
 *
 * 基本概念: queue 通过 routeKey 绑定 exchange; producer 发送消息到 exchange, exchange 通过 routeKey 发送与之绑定的 queue
 * exchange 类型: direct, fanout, topic
 */
public class RabbitMQUtils {
    /**
     * 获取rabbitMq 连接
     *
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setUsername("guest");
        return factory.newConnection();
    }
}

  

  • 事务机制

    /**
     * 1. 通过事务机制( 缺点: 降低了mq 的吞吐量)
     */

    public static void useTx() throws Exception {
        Connection connection = getConnection();

        Channel channel = connection.createChannel();

        // 声明一个queue
        channel.queueDeclare("test-queue", false, false, false, null);

        // 声明一个topic 类型的 exchange
        channel.exchangeDeclare("test-topic-exchange", BuiltinExchangeType.TOPIC);

        // queue 通过 routeKey 绑定 exchange
        channel.queueBind("test-queue", "test-topic-exchange", "test.*");

        String message = "hello rabbit mq!";

        channel.txSelect();  // 开启事务

        try {
            channel.basicPublish("test-topic-exchange", "test.message", null, message.getBytes()); // 消息发送
            channel.txCommit();
            System.out.println("message send success..");
        } catch (Exception e) {
            channel.txRollback(); // 回滚
            System.out.println("message send fail..");
        } finally {
            channel.close();
            connection.close();
        }

    }

  

  • 生产者confirm模式 (同步)

 /**
     * 2. 生产者 confirm模式(串型)
     * 原理: 当信道channel 进入confrim模式, 所有在该信道发布的消息都会被指派一个唯一的id
     * 当消息被投递到所有匹配的队列后, broker就会发送一个确认给生产者(包含消息id);
     */
    public void producterConfirmSync() throws Exception {
        Connection connection = getConnection(); // 获取rabbitMq 连接

        Channel channel = connection.createChannel();

        channel.confirmSelect(); // 开启confirm模式

        String message = "hello rabbit mq!";
        String message2 = "hello rabbit mq2!";

        channel.basicPublish("", "", MessageProperties.BASIC, message.getBytes());
        channel.basicPublish("", "", MessageProperties.BASIC, message2.getBytes());

        if (channel.waitForConfirms()) { // 支持多条发送后再确认
            System.out.println("message send success..");
        } else {
            System.out.println("message send fail..");
        }
        channel.close();
        connection.close();
    }

  

  • 生产者confirm (异步)

 /**
     * 3. 生产者 confirm模式(异步)
     */
    public void producterConfirmAsync() throws Exception {
        Connection connection = getConnection(); // 获取rabbitMq 连接

        Channel channel = connection.createChannel();

        channel.confirmSelect(); // 开启confirm模式

        // ack ids
        SortedSet<Long> ackIds = new TreeSet<>();

        // not ack ids
        SortedSet<Long> nackIds = new TreeSet<>();

        for (int i = 0; i < 100; i++) {
            String message = "hello rabbit mq: " + i;
            channel.basicPublish("", "", null, message.getBytes());
        }


        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 发送成功(确认)
                // multiple 单条确认 or多条确认 (为true, 表示到这个序列号之前的所有消息都已经得到了处理)
                ackIds.add(deliveryTag);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 发送失败(未确认)
                nackIds.add(deliveryTag);
            }
        });
    }

 

  • 消费者confirm

    /**
     * 消费者 confirm
     */
    public void consumerConfirm() throws Exception {
        Connection connection = getConnection(); // 获取rabbitMq 连接

        Channel channel = connection.createChannel();

        try {
            channel.basicConsume("test", false, // 关闭消费者自动ack
                    new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body)
                                throws IOException {
                            //发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
                            long deliveryTag = envelope.getDeliveryTag();
                            //第二个参数是批量确认标志。如果值为true,则执行批量确认,此deliveryTag之前收到的消息全部进行确认;
                            // 如果值为false,则只对当前收到的消息进行确认
                            channel.basicAck(deliveryTag, true); // 确认
                        }
                    });
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            channel.close();
            connection.close();
        }
    }


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM