rabbitmq消息可靠性和插件机制


1、消息可靠性

rabbitmq一般通过三个方面保证消息的可靠性:

(1)、发送可靠性:确保消息成功发送到broker端。

 

  rabbitmq支持“最多一次”和“最少一次”。

其中“最少一次”的实现需要考虑以下几个方面的内容:

消息生产者需要开启事务机制或者publisher confirm机制,已保证消息可以可靠的传输到rabbitmq中;消息生产者需要配合使用mandatory参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。

“最多一次”的方式就无须考虑以上内容,生产者随意发送,不过这样很难保证消息或成功发送。

配置消息回调

参考地址:https://www.cnblogs.com/wangzaiplus/p/11213709.html

目前回调存在ConfirmCallback和ReturnCallback两者。他们的区别在于

如果消息没有到exchange,则ConfirmCallback回调,ack=false, 如果消息到达exchange,则ConfirmCallback回调,ack=true exchange到queue成功,则不回调ReturnCallback(必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调)

ConfirmCallback的回调

/** * 消息发送成功的回调 * 需要开启 * # 开启发送确认 * spring.rabbitmq.publisher-confirm-type=CORRELATED **/ @Slf4j public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //correlationData必须在发送的时候设置,否则将会使null,如下: //CorrelationData correlationData = new CorrelationData("1"); //rabbitTemplate.convertAndSend("OrderDispathExchange","kk", // "{\"msgId\":1,\"data\":\"测试\"}",correlationData);
        log.info("消息唯一标识: {}", correlationData); log.info("确认状态: {}", ack); log.info("造成原因: {}", cause); } }

ReturnCallback的回调

/** * 发生异常时的消息返回提醒 * 需要开启 * # 开启发送失败退回 * spring.rabbitmq.publisher-returns=true **/ @Slf4j public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主体: {}", message); log.info("回复编码: {}", replyCode); log.info("回复内容: {}", replyText); log.info("交换器: {}", exchange); log.info("路由键: {}", routingKey); } }

将回调配置到模板中

@Configuration public class RabbitMqConf { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ CachingConnectionFactory factory = (CachingConnectionFactory)connectionFactory; factory.addConnectionListener(new ConnectionListener() { @Override public void onCreate(Connection connection) { System.err.println("端口:"+connection.getLocalPort()); } });      RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack()); // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitReturnCallback()); return rabbitTemplate; }

(2)、存储可靠性:broker端对消息的持久化,确保消息不会丢失。

(3)、消费可靠性:确保消息成功被消费。(可以参考:RabbitMq解决分布式事务 

 消费者在消费消息的同时,需要将autoAck设置为false(spring.rabbitmq.listener.simple.acknowledge-mode=manual),然后通过手动确认的方式去确认已经正常消费的消息,以免在消费端引起不必要的消息丢失。

2、插件机制

rabbitmq支持插件,通过插件可以扩展多种核心功能:支持多种协议、系统状态监控、其它AMQP 0-9-1交换类型、节点联合等。许多功能都是通过插件实现的。

可以通过命令:rabbitmq-plugins list查看当前rabbitmq安装的插件

 通过以下命令可以启用或禁用指定插件

#启用插件 rabbitmq-plugins enable <plugin_name> #禁用插件 rabbitmq-plugins disable<plugin_name>

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM