java使用RabbitMQ要点知识


转载于:https://blog.csdn.net/LeiXiaoTao_Java/article/details/78924863

1、maven依赖


  
  
  
  1. <dependency>
  2. <groupId>commons-lang</groupId>
  3. <artifactId>commons-lang</artifactId>
  4. <version> 2.3</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.rabbitmq</groupId>
  8. <artifactId>amqp-client</artifactId>
  9. <version> 3.4.1</version>
  10. </dependency>


2、RabbitMQ重要方法介绍(基本常用的)

2.1、创建连接


  
  
  
  1. // 创建连接工厂
  2. ConnectionFactory cf = new ConnectionFactory();
  3. // 设置rabbitmq服务器IP地址
  4. cf.setHost( "*.*.*.*");
  5. // 设置rabbitmq服务器用户名
  6. cf.setUsername( "***");
  7. // 设置rabbitmq服务器密码
  8. cf.setPassword( "***");
  9. // 指定端口,默认5672
  10. cf.setPort(AMQP.PROTOCOL.PORT);
  11. // 获取一个新的连接
  12. connection = cf.newConnection();
  13. // 创建一个通道
  14. channel = connection.createChannel();
  15. //关闭管道和连接
  16. channel.close();
  17. connection.close();

2.2、声明队列


  
  
  
  1. /**
  2. * 申明一个队列,如果这个队列不存在,将会被创建
  3. * @param queue 队列名称
  4. * @param durable 持久性:true队列会再重启过后存在,但是其中的消息不会存在。
  5. * @param exclusive 是否只能由创建者使用,其他连接不能使用。
  6. * @param autoDelete 是否自动删除(没有连接自动删除)
  7. * @param arguments 队列的其他属性(构造参数)
  8. * @return Queue.DeclareOk:宣告队列的声明确认方法已成功声明。
  9. * @throws java.io.IOException if an error is encountered
  10. */
  11. channel.queueDeclare( "testQueue", true, false, false, null);


此方法一般由Producer调用创建消息队列。如果由Consumer创建队列,有可能Producer发布消息的时候Queue还没有被创建好,会造成消息丢失的情况。

 

2.3、声明Exchange


  
  
  
  1. /**
  2. * 声明一个 exchange.
  3. * @param exchange 名称
  4. * @param type exchange type:direct、fanout、topic、headers
  5. * @param durable 持久化
  6. * @param autoDelete 是否自动删除(没有连接自动删除)
  7. * @param arguments 队列的其他属性(构造参数)
  8. * @return 成功地声明了一个声明确认方法来指示交换。
  9. * @throws java.io.IOException if an error is encountered
  10. */
  11. channel.exchangeDeclare( "leitao", "topic", true, false, null);

2.4、将queue和Exchange进行绑定(Binding)


  
  
  
  1. /**
  2. * 将队列绑定到Exchange,不需要额外的参数。
  3. * @param queue 队列名称
  4. * @param exchange 交换机名称
  5. * @param routingKey 路由关键字
  6. * @return Queue.BindOk:如果成功创建绑定,则返回绑定确认方法。
  7. * @throws java.io.IOException if an error is encountered
  8. */
  9. channel.queueBind( "testQueue", "leitao", "testRoutingKey");

2.5、发布消息


  
  
  
  1. /**
  2. * 发布一条不用持久化的消息,且设置两个监听。
  3. * @param exchange 消息交换机名称,空字符串将使用直接交换器模式,发送到默认的Exchange=amq.direct。此状态下,RoutingKey默认和Queue名称相同
  4. * @param routingKey 路由关键字
  5. * @param mandatory 监听是否有符合的队列
  6. * @param immediate 监听符合的队列上是有至少一个Consumer
  7. * @param BasicProperties 设置消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN是持久化;MessageProperties.TEXT_PLAIN是非持久化。
  8. * @param body 消息对象转换的byte[]
  9. * @throws java.io.IOException if an error is encountered
  10. */
  11. channel.basicPublish( "",queueName, true, false,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));

exchange的值为空字符串或者是amq.direct时,此时的交换器类型默认是direct类型可以不用单独声明Exchange,也不用单独进行Binding,系统默认将queue名称作为RoutingKey进行了绑定。

 

两个传入参数的含义

mandatory

当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。

immediate

当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

注意:在RabbitMQ3.0以后的版本里,去掉了immediate参数的支持,发送带immediate=true标记的publish会返回如下错误:

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error;protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)。

为什么取消支持:immediate标记会影响镜像队列性能,增加代码复杂性,并建议采用“TTL”和“DLX”等方式替代。

 

2.6、接收消息


  
  
  
  1. /**
  2. * 设置消费批量投递数目,一次性投递10条消息。当消费者未确认消息累计达到10条时,rabbitMQ将不会向此Channel上的消费者投递消息,直到未确认数小于10条再投递
  3. * @param prefetchCount 投递数目
  4. * @param global 是否针对整个Channel。true表示此投递数是给Channel设置的,false是给Channel上的Consumer设置的。
  5. * @throws java.io.IOException if an error is encountered
  6. */
  7. channel.basicQos( 10, false);
  8. //整个传输管道最多15条,具体分到每个消费者身上又不能大于10条
  9. channel.basicQos( 15, true);
  10. /**
  11. * 开始一个非局部、非排他性消费, with a server-generated consumerTag.
  12. * 执行这个方法会回调handleConsumeOk方法
  13. * @param queue 队列名称
  14. * @param autoAck 是否自动应答。false表示consumer在成功消费过后必须要手动回复一下服务器,如果不回复,服务器就将认为此条消息消费失败,继续分发给其他consumer。
  15. * @param callback 回调方法类,一般为自己的Consumer类
  16. * @return 由服务器生成的consumertag
  17. * @throws java.io.IOException if an error is encountered
  18. */
  19. channel.basicConsume(queueName, false, Consumer);

2.7、Consumer处理消息


  
  
  
  1. /**
  2. * 消费者收到消息的回调函数
  3. * @param consumerTag 消费者标签
  4. * @param envelope 消息的包装数据
  5. * @param properties 消息的内容头数据
  6. * @param body 消息对象的byte[]
  7. * @throws IOException
  8. */
  9. void handleDelivery(String consumerTag,
  10. Envelope envelope,
  11. AMQP.BasicProperties properties,
  12. byte[] body)
  13. throws IOException;

3、Producer消息确认机制

3.1、什么是生产者消息确认机制?

没有消息确认模式时,生产者不知道消息是不是已经到达了Broker服务器,这对于一些业务严谨的系统来说将是灾难性的。消息确认模式可以采用AMQP协议层面提供的事务机制实现(此文没有这种实现方式),但是会降低RabbitMQ的吞吐量。RabbitMQ自身提供了一种更加高效的实现方式:confirm模式。

消息生产者通过调用Channel.confirmSelect()方法将Channel信道设置成confirm模式。一旦信道被设置成confirm模式,该信道上的所有消息都会被指派一个唯一的ID(从1开始),一旦消息被对应的Exchange接收,Broker就会发送一个确认给生产者(其中deliveryTag就是此唯一的ID),这样消息生产者就知道消息已经成功到达Broker。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。

3.2、开启confirm模式

如上所说生产者通过调用Channel.confirmSelect()方法将Channel信道设置成confirm模式。

注意:已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的

3.3、普通confirm模式

普通confirm模式是串行的,即每次发送了一次消息,生产者都要等待Broker的确认消息,然后根据确认标记权衡消息重发还是继续发下一条。由于是串行的,在效率上是比较低下的。

 

(1)重点方法


  
  
  
  1. /**
  2. * 等待Broker返回消息确认标记
  3. * 注意,在非确定的通道,waitforconfirms抛出IllegalStateException。
  4. * @return 是否发送成功
  5. * @throws java.lang.IllegalStateException
  6. */
  7. boolean waitForConfirms() throws InterruptedException;

(2 部分使用代码如下:


  
  
  
  1. //注意:返回的时候Return在前,Confirm在后
  2. channel.confirmSelect();
  3. int i= 1;
  4. while (i<= 50) {
  5. //发布消息
  6. channel.basicPublish( "",queueName, true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
  7. //等待Broker的确认回调
  8. if(channel.waitForConfirms())
  9. System.out.println( "send success!");
  10. else
  11. System.out.println( "send error!");
  12. i++;
  13. }

3.4、批量confirm模式

批量confirm模式是异步的方式,效率要比普通confirm模式高许多,但是此种方式也会造成线程阻塞,想要进行失败重发就必须要捕获异常。网络上还有采用waitForConfirms()实现批量confirm模式的,但是只要一条失败了,就必须把这批次的消息统统再重发一次,非常的消耗性能,因此此文不予考虑。

 

(1)重点代码


  
  
  
  1. /**
  2. * 等待直到所有消息被确认或者某个消息发送失败。如果消息发送确认失败了,
  3. * waitForConfirmsOrDie 会抛出IOException异常。当在非确认通道上调用时
  4. * ,会抛出IllegalStateException异常。
  5. * @throws java.lang.IllegalStateException
  6. */
  7. void waitForConfirmsOrDie() throws IOException, InterruptedException;

(2 )部分代码如下:


  
  
  
  1. //注意:返回的时候Return在前,Confirm在后
  2. channel.confirmSelect();
  3. int i= 1;
  4. while (i<= 50) {
  5. //发布消息
  6. channel.basicPublish( "",queueName, true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
  7. i++;
  8. }
  9. channel.waitForConfirmsOrDie();

3.5、ConfirmListener监听器模式

RabbitMQ提供了一个ConfirmListener接口专门用来进行确认监听,我们可以实现ConfirmListener接口来创建自己的消息确认监听。ConfirmListener接口中包含两个回调方法:


  
  
  
  1. /**
  2. * 生产者发送消息到exchange成功的回调方法
  3. */
  4. void handleAck(long deliveryTag, boolean multiple) throws IOException;
  5. /**
  6. * 生产者发送消息到服务器broker失败的回调方法,服务器丢失了此消息。
  7. * 注意,丢失的消息仍然可以传递给消费者,但broker不能保证这一点。
  8. */
  9. void handleNack(long deliveryTag, boolean multiple) throws IOException;

其中deliveryTagBroker给每条消息指定的唯一ID(从1开始);multiple表示是否接收所有的应答消息,比如multiple=true时,发送100条消息成功过后,我们并不会收到100次handleAck方法调用。

 

(1)重要方法


  
  
  
  1. //注册消息确认监听器
  2. channel.addConfirmListener( new MyConfirmListener());

(2)部分使用代码如下:


  
  
  
  1. //注意:返回的时候Return在前,Confirm在后
  2. channel.confirmSelect();
  3. //注册消息确认监听器
  4. channel.addConfirmListener( new MyConfirmListener());
  5. //注册消息结果返回监听器
  6. channel.addReturnListener( new MyReturnListener());
  7. int i= 1;
  8. while (i<= 50) {
  9. //发布消息
  10. channel.basicPublish( "",queueName, true,MessageProperties.TEXT_PLAIN,SerializationUtils.
  11. serialize(object));
  12. i++;
  13. }

 


  
  
  
  1. //自定义的消息确认监听器
  2. public class MyConfirmListener implements ConfirmListener{
  3. /**
  4. * 生产者发送消息到exchange成功的回调方法
  5. * 消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。但是可以设置ReturnListener监听来监听有没有匹配的队列。
  6. * 因此handleAck执行了,并不能完全表示消息已经进入了对应的队列,只能表示对应的exchange成功的接收了消息。
  7. * 消息被exchange接收过后,还需要通过一定的匹配规则分发到对应的队列queue中。
  8. */
  9. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  10. //注意:deliveryTag是broker给消息指定的唯一id(从1开始)
  11. System.out.println( "Exchange接收消息:"+deliveryTag+ "(deliveryTag)成功!multiple="+multiple);
  12. }
  13. /**
  14. * 生产者发送消息到服务器broker失败的回调方法,服务器丢失了此消息。
  15. * 注意,丢失的消息仍然可以传递给消费者,但broker不能保证这一点。(不明白,既然丢失了,为啥还能发送)
  16. */
  17. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  18. System.out.println( "Exchange接收消息:"+deliveryTag+ "(deliveryTag)失败!服务器broker丢失了消息");
  19. }
  20. }



  
  
  
  1. //自定义的结果返回监听器
  2. /**
  3. * 实现此接口以通知交付basicpublish失败时,“mandatory”或“immediate”的标志监听(源代码注释翻译)。
  4. * 在发布消息时设置mandatory等于true,监听消息是否有相匹配的队列,
  5. * 没有时ReturnListener将执行handleReturn方法,消息将返给发送者
  6. */
  7. public class MyReturnListener implements ReturnListener {
  8. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
  9. BasicProperties properties, byte[] body) throws IOException {
  10. System.out.println( "消息发送到队列失败:回复失败编码:"+replyCode+ ";回复失败文本:"+replyText+ ";失败消息对象:"+SerializationUtils.deserialize(body));
  11. }
  12. }


4、Consumer消息确认机制

为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在注册消费者时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(或磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息(web管理界面上的Ready状态);一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息(web管理界面上的Unacked状态)。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

 

(1)重要方法


  
  
  
  1. /**
  2. *1. 开始一个非局部、非排他性消费, with a server-generated consumerTag.
  3. * 注意:执行这个方法会回调handleConsumeOk方法,在此方法中处理消息。
  4. * @param queue 队列名称
  5. * @param autoAck 是否自动应答。false表示consumer在成功消费过后必须要手动回复一下服务器,如果不回复,服务器就将认为此条消息消费失败,继续分发给其他consumer。
  6. * @param callback 回调方法类
  7. * @return 由服务器生成的consumertag
  8. * @throws java.io.IOException if an error is encountered
  9. */
  10. String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  11. /**
  12. *2
  13. consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量。
  14. 例如:有值为5,6,7,8 deliveryTag的投递
  15. 如果此时channel.basicAck(8, true);则表示前面未确认的5,6,7投递也一起确认处理完毕。
  16. 如果此时channel.basicAck(8, false);则仅表示deliveryTag=8的消息已经成功处理。
  17. */
  18. void basicAck(long deliveryTag, boolean multiple) throws IOException;
  19. /**3
  20. consumer处理失败后,例如:有值为5,6,7,8 deliveryTag的投递。
  21. 如果channel.basicNack(8, true, true);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息重新放回队列中。
  22. 如果channel.basicNack(8, true, false);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息直接丢弃。
  23. 如果channel.basicNack(8, false, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。
  24. 如果channel.basicNack(8, false, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。
  25. */
  26. void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  27. /**4
  28. 相比channel.basicNack,除了没有multiple批量确认机制之外,其他语义完全一样。
  29. 如果channel.basicReject(8, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。
  30. 如果channel.basicReject(8, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。
  31. */
  32. void basicReject(long deliveryTag, boolean requeue) throws IOException;


(2)部分使用代码如下:


  
  
  
  1. //this表示自己的Consumer
  2. channel.basicConsume(queueName, false, this);
  3. ...
  4. @Override
  5. public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
  6. if (body == null)
  7. return;
  8. Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
  9. /**
  10. * 专门处理奇数消息的消费者
  11. */
  12. int tagId = (Integer) map.get( "tagId");
  13. if (tagId % 2 != 0) {
  14. //处理消息
  15. System.out.println( "接收并处理消息:"+tagId);
  16. //通知服务器此消息已经被处理了
  17. channel.basicAck(envelope.getDeliveryTag(), false);
  18. } else{
  19. //通知服务器消息处理失败,重新放回队列。false表示处理失败消息不放会队列,直接删除
  20. channel.basicReject(envelope.getDeliveryTag(), true);
  21. }
  22. }


5、Demo项目整体代码

此demo就是向RabbitMQ服务器上面发送20个消息,消息体是map,里面装的是tagId=数字。然后注册了两个消费者,分别处理奇数和偶数。

5.1、连接工具类


  
  
  
  1. /**
  2. * 连接工具类
  3. */
  4. public class ConnectionUtil {
  5. Channel channel;
  6. Connection connection;
  7. String queueName;
  8. public ConnectionUtil(String queueName) throws IOException {
  9. this.queueName = queueName;
  10. // 创建连接工厂
  11. ConnectionFactory cf = new ConnectionFactory();
  12. // 设置rabbitmq服务器IP地址
  13. cf.setHost( "*.16.0.*");
  14. // 设置rabbitmq服务器用户名
  15. cf.setUsername( "*");
  16. // 设置rabbitmq服务器密码
  17. cf.setPassword( "*");
  18. cf.setPort(AMQP.PROTOCOL.PORT);
  19. // 获取一个新的连接
  20. connection = cf.newConnection();
  21. // 创建一个通道
  22. channel = connection.createChannel();
  23. /**
  24. *申明一个队列,如果这个队列不存在,将会被创建
  25. * @param queue 队列名称
  26. * @param durable 持久性:true队列会再重启过后存在,但是其中的消息不会存在。
  27. * @param exclusive 是否只能由创建者使用
  28. * @param autoDelete 是否自动删除(没有连接自动删除)
  29. * @param arguments 队列的其他属性(构造参数)
  30. * @return 宣告队列的声明确认方法已成功声明。
  31. * @throws java.io.IOException if an error is encountered
  32. */
  33. channel.queueDeclare(queueName, true, false, false, null);
  34. }
  35. public void close() throws IOException{
  36. channel.close();
  37. connection.close();
  38. }
  39. }

 

5.2、具体生产者


  
  
  
  1. /**
  2. * 消息生产者
  3. */
  4. public class MessageProducer {
  5. private ConnectionUtil connectionUtil;
  6. public MessageProducer(ConnectionUtil connectionUtil){
  7. this.connectionUtil=connectionUtil;
  8. }
  9. /**
  10. * 发送消息到队列中
  11. */
  12. public void sendMessage(Serializable object) throws IOException{
  13. /**
  14. * Publish a message
  15. * @param exchange 消息交换机名称,空字符串将使用直接交换器模式,发送到默认的Exchange=amq.direct
  16. * @param routingKey 路由关键字
  17. * @param mandatory 监听是否有符合的队列
  18. * @param BasicProperties 设置消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN是持久化;MessageProperties.TEXT_PLAIN是非持久化
  19. * @param body 消息对象
  20. * @throws java.io.IOException if an error is encountered
  21. */
  22. connectionUtil.channel.basicPublish( "", connectionUtil.queueName, true, MessageProperties.TEXT_PLAIN, SerializationUtils.serialize(object));
  23. System.out.println( "MessageProducer发送了一条消息:"+object);
  24. }
  25. }

5.3、公共消费者父类


  
  
  
  1. /**
  2. * 消息消费者基础类
  3. */
  4. public class MessageConsumer implements Consumer {
  5. //消费者标签,注册成功时由rabbitmq服务器自动生成
  6. protected String consumerTag;
  7. protected ConnectionUtil connectionUtil;
  8. public MessageConsumer(ConnectionUtil connectionUtil){
  9. this.connectionUtil=connectionUtil;
  10. }
  11. public void basicConsume(){
  12. try {
  13. /**
  14. * 设置消费投递数目,一次性投递10条消息。当消费者未确认消息达到10条时,rabbitMQ将不会向此消费者投递消息,直到未确认数小于10条再投递
  15. * @param prefetchCount 投递数目
  16. * @param global 是否针对整个Channel。true表示此投递数是给Channel设置的,false是给Channel上的Consumer设置的。
  17. * @throws java.io.IOException if an error is encountered
  18. */
  19. connectionUtil.channel.basicQos( 10, false); //表示每个消费者最多10条
  20. connectionUtil.channel.basicQos( 15, true); //整个传输管道最多15条,具体分到每个消费者身上又不能大于10条
  21. /**
  22. * 开始一个非局部、非排他性消费, with a server-generated consumerTag.
  23. * 执行这个方法会回调handleConsumeOk方法
  24. * @param queue 队列名称
  25. * @param autoAck 是否自动应答。false表示consumer在成功消费过后必须要手动回复一下服务器,如果不回复,服务器就将认为此条消息消费失败,继续分发给其他consumer。
  26. * @param callback 回调方法类
  27. * @return 由服务器生成的consumertag
  28. * @throws java.io.IOException if an error is encountered
  29. */
  30. connectionUtil.channel.basicConsume(connectionUtil.queueName, false, this);
  31. } catch (IOException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. /**
  36. * 收到消息时的回调函数
  37. */
  38. public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
  39. //子类重写覆盖具体操作
  40. }
  41. /**
  42. * 消费者注册成功回调函数
  43. */
  44. public void handleConsumeOk(String consumerTag) {
  45. this.consumerTag=consumerTag;
  46. System.out.println( "消费者:"+consumerTag+ ",注册成功!");
  47. }
  48. /**
  49. * 手动取消消费者注册成功回调函数
  50. * 当调用Channel类的void basicCancel(String consumerTag) throws IOException;方法触发此回调函数
  51. */
  52. public void handleCancelOk(String consumerTag) {
  53. System.out.println(consumerTag+ " 手动取消消费者注册成功!");
  54. }
  55. /**
  56. * 当消费者因为其他原因被动取消注册时调用,比如queue被删除了。
  57. */
  58. public void handleCancel(String consumerTag) throws IOException {
  59. System.out.println( "因为外部原因消费者:"+consumerTag+ " 取消注册!");
  60. }
  61. /**
  62. * 当通道或基础连接被关闭时调用
  63. */
  64. public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
  65. System.out.println( "通道或基础连接被关闭");
  66. }
  67. /**
  68. * Called when a <code><b>basic.recover-ok</b></code> is received
  69. * in reply to a <code><b>basic.recover</b></code>. All messages
  70. * received before this is invoked that haven't been <i>ack</i>'ed will be
  71. * re-delivered. All messages received afterwards won't be.
  72. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  73. */
  74. public void handleRecoverOk(String consumerTag) {
  75. }
  76. }

 

5.4、具体的消费者


  
  
  
  1. /**
  2. * 专门处理偶数消息的消费者
  3. */
  4. public class EvenConsumer extends MessageConsumer {
  5. public EvenConsumer(ConnectionUtil connectionUtil) {
  6. super(connectionUtil);
  7. }
  8. @Override
  9. public void handleConsumeOk(String consumerTag) {
  10. this.consumerTag=consumerTag;
  11. System.out.println( "EvenConsumer消费者:"+consumerTag+ ",注册成功!");
  12. }
  13. @Override
  14. public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
  15. if (body == null)
  16. return;
  17. Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
  18. int tagId = (Integer) map.get( "tagId");
  19. if (tagId % 2 == 0) {
  20. //处理消息
  21. System.out.println( "EvenConsumer接收并处理消息:"+tagId);
  22. //通知服务器此消息已经被处理了
  23. connectionUtil.channel.basicAck(envelope.getDeliveryTag(), false);
  24. } else{
  25. //通知服务器消息处理失败,重新放回队列。false表示处理失败消息不放会队列,直接删除
  26. connectionUtil.channel.basicReject(envelope.getDeliveryTag(), true);
  27. }
  28. }
  29. }

 


  
  
  
  1. /**
  2. * 专门处理奇数消息的消费者
  3. */
  4. public class OddConsumer extends MessageConsumer {
  5. public OddConsumer(ConnectionUtil connectionUtil) {
  6. super(connectionUtil);
  7. }
  8. @Override
  9. public void handleConsumeOk(String consumerTag) {
  10. this.consumerTag=consumerTag;
  11. System.out.println( "OddConsumer消费者:"+consumerTag+ ",注册成功!");
  12. }
  13. @Override
  14. public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
  15. if (body == null)
  16. return;
  17. Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
  18. int tagId = (Integer) map.get( "tagId");
  19. if (tagId % 2 != 0) {
  20. //处理消息
  21. System.out.println( "OddConsumer接收并处理消息:"+tagId);
  22. //通知服务器此消息已经被处理了
  23. connectionUtil.channel.basicAck(envelope.getDeliveryTag(), false);
  24. } else{
  25. //通知服务器消息处理失败,重新放回队列。false表示处理失败消息不放会队列,直接删除
  26. connectionUtil.channel.basicReject(envelope.getDeliveryTag(), true);
  27. }
  28. }
  29. }


5.5、监听器


  
  
  
  1. /**
  2. *producer发送确认事件。
  3. */
  4. public class MyConfirmListener implements ConfirmListener{
  5. /**
  6. * 生产者发送消息到exchange成功的回调方法
  7. * 消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。但是可以设置ReturnListener监听来监听有没有匹配的队列。
  8. * 因此handleAck执行了,并不能完全表示消息已经进入了对应的队列,只能表示对应的exchange成功的接收了消息。
  9. * 消息被exchange接收过后,还需要通过一定的匹配规则分发到对应的队列queue中。
  10. */
  11. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  12. //注意:deliveryTag是broker给消息指定的唯一id(从1开始)
  13. System.out.println( "Exchange接收消息:"+deliveryTag+ "(deliveryTag)成功!multiple="+multiple);
  14. }
  15. /**
  16. * 生产者发送消息到服务器broker失败的回调方法,服务器丢失了此消息。
  17. * 注意,丢失的消息仍然可以传递给消费者,但broker不能保证这一点。
  18. */
  19. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  20. System.out.println( "Exchange接收消息:"+deliveryTag+ "(deliveryTag)失败!服务器broker丢失了消息");
  21. }
  22. }



  
  
  
  1. /**
  2. * 实现此接口以通知交付basicpublish失败时,“mandatory”或“immediate”的标志监听(源代码注释翻译)。
  3. * 在发布消息时设置mandatory等于true,监听消息是否有相匹配的队列,
  4. * 没有时ReturnListener将执行handleReturn方法,消息将返给发送者 。
  5. * 由于3.0版本过后取消了支持immediate,此处不做过多的解释。
  6. */
  7. public class MyReturnListener implements ReturnListener {
  8. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
  9. BasicProperties properties, byte[] body) throws IOException {
  10. System.out.println( "消息发送到队列失败:回复失败编码:"+replyCode+ ";回复失败文本:"+replyText+ ";失败消息对象:"+SerializationUtils.deserialize(body));
  11. }
  12. }


5.6、客户端


  
  
  
  1. public class Client {
  2. public static void main(String[] args) {
  3. new Client();
  4. }
  5. public Client(){
  6. try {
  7. //发消息
  8. publishMessage();
  9. //注册消费者
  10. addConsumer();
  11. } catch (IOException e) {
  12. e.printStackTrace();
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. public void publishMessage() throws IOException, InterruptedException{
  18. ConnectionUtil connectionUtil= new ConnectionUtil( "testqueue");
  19. MessageProducer producer= new MessageProducer(connectionUtil);
  20. connectionUtil.channel.confirmSelect();
  21. //注意:返回的时候Return在前,Confirm在后
  22. connectionUtil.channel.addConfirmListener( new MyConfirmListener());
  23. connectionUtil.channel.addReturnListener( new MyReturnListener());
  24. int i= 1;
  25. while (i<= 10) {
  26. HashMap<String, Object> map= new HashMap<String, Object>();
  27. map.put( "tagId", i);
  28. producer.sendMessage(map);
  29. i++;
  30. }
  31. }
  32. public void addConsumer() throws IOException{
  33. ConnectionUtil connectionUtil= new ConnectionUtil( "testqueue");
  34. OddConsumer odd= new OddConsumer(connectionUtil);
  35. odd.basicConsume();
  36. EvenConsumer even= new EvenConsumer(connectionUtil);
  37. even.basicConsume();
  38. }
  39. }

 

5.7、测试结果


  
  
  
  1. MessageProducer发送了一条消息:{tagId= 1}
  2. MessageProducer发送了一条消息:{tagId= 2}
  3. MessageProducer发送了一条消息:{tagId= 3}
  4. Exchange接收消息: 1(deliveryTag)成功!multiple= false
  5. Exchange接收消息: 2(deliveryTag)成功!multiple= false
  6. MessageProducer发送了一条消息:{tagId= 4}
  7. Exchange接收消息: 3(deliveryTag)成功!multiple= false
  8. MessageProducer发送了一条消息:{tagId= 5}
  9. Exchange接收消息: 4(deliveryTag)成功!multiple= false
  10. MessageProducer发送了一条消息:{tagId= 6}
  11. Exchange接收消息: 5(deliveryTag)成功!multiple= false
  12. MessageProducer发送了一条消息:{tagId= 7}
  13. Exchange接收消息: 6(deliveryTag)成功!multiple= false
  14. MessageProducer发送了一条消息:{tagId= 8}
  15. Exchange接收消息: 7(deliveryTag)成功!multiple= false
  16. Exchange接收消息: 8(deliveryTag)成功!multiple= false
  17. MessageProducer发送了一条消息:{tagId= 9}
  18. Exchange接收消息: 9(deliveryTag)成功!multiple= false
  19. MessageProducer发送了一条消息:{tagId= 10}
  20. Exchange接收消息: 10(deliveryTag)成功!multiple= false
  21. OddConsumer消费者:amq.ctag-z8s8LaSgYvo02jktCZrCYA,注册成功!
  22. OddConsumer接收并处理消息: 1
  23. OddConsumer接收并处理消息: 3
  24. OddConsumer接收并处理消息: 5
  25. OddConsumer接收并处理消息: 7
  26. OddConsumer接收并处理消息: 9
  27. EvenConsumer消费者:amq.ctag-LpN6Q5VvNY3wCof2lXqS4A,注册成功!
  28. EvenConsumer接收并处理消息: 4
  29. EvenConsumer接收并处理消息: 8
  30. EvenConsumer接收并处理消息: 2
  31. EvenConsumer接收并处理消息: 10
  32. EvenConsumer接收并处理消息: 6

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM