springboot中使用RabbitMq消息队列使用笔记(使用配置版本)


rabbitmq是什么,怎么搭建我在这就不叙述了,这里只说怎么使用。

说个小插曲,如果有的人连接rabbitmq非常慢,经常超时或者启动rabbitmq很慢,可以在linux环境下配置一下就行了:

第一步:打开linux输入以下命令

[root@hadoop1 mq]# hostname
hadoop1

这里的hadoop1就是我的主机名,然后把这个主机名配置进hosts文件即可;

第二步:vim /etc/hosts 命令对hosts文件进行添加一行数据:

主机名hadoop1为例
只需在hosts文件中加入 127.0.0.1 hadoop1
然后wq保存退出即可。

 第三步:重启rabbitmq应该就可以解决连接,启动过慢的问题了;

=====================================================

以下是使用rabbitmq的过程。

给大家推荐https://blog.csdn.net/qq_35387940/article/details/100514134这个博客,我的案例也是参照这个写的,修改部分代码加上自己的心得体会,有些原作者没有的注释我都加上了。可以先去看原作者的,然后再看这个。

 

 

 一个交换机可以连接多个队列,生产者发送信息给交换机,交换机根据生产者带来的路由关键字来确认传递给哪个队列,消费者可以监听一个或多个队列,当队列里存在消息的时候就会消费。

第一步:添加相关依赖

<dependency>
 <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

第二步:编写application.yml主配置文件,有的配置看情况而定

server: port: 8888 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl template: retry: #enabled:开启失败重试 enabled: true #第一次重试的间隔时长 initial-interval: 10000ms #最长重试间隔,超过这个间隔将不再重试 max-interval: 300000ms #下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍 multiplier: 2

第三步:编写配置类

@Configuration public class DirectConfig { /** * 配置一个队列,可以配置多个
   *第一个参数是队列的名称 *第二个参数durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,但并不是消息的持久化 *当消息代理重启时仍然存在,暂存队列:当前连接有效 *第三个参数exclusive(独有的):默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable *第四个参数autoDelete:是否自动删除,默认是false,当没有生产者或者消费者使用此队列,该队列会自动删除。 *return new Queue("TestDirectQueue",true,true,false); *一般设置一下队列的持久化就好,其余两个就是默认false
*/ @Bean public Queue myQueue1 () { return new Queue("directQueue111",true); }

  @Bean
    public Queue myQueue2 () {
        return new Queue("directQueue222",true);
    }
 
 
  /** * 给交换机起名字,也可以配置多个交换机
   *第一个参数是交换机的名字,
   *第二个参数durable:是否持久化,默认false,持久化交换机
   *第三个参数autoDelete,默认为false,同队列解释
  
    如果你是topic模式的,那么交换机类型就是TopicExchange,如果是广播(fanout)类型的就是FanoutExchange
   */
 @Bean public DirectExchange myDirectExchange() { return new DirectExchange("directExchange",true,false); } @Bean public DirectExchange lonelyExchange() { return new DirectExchange("lonelyExchange",true,false); } /*绑定 将队列和交换机绑定, 并设置用于匹配键:mykey 队列在前,交换机在后
  1.如果是fanout的模式的话,就没有后面的with路由关键字
  2.如果是topic模式的话是支持通配符的

    通配符规则:
​      `#`:匹配一个或多个词
​      `*`:匹配不多不少恰好1个词
    举例:
​      `audit.#`:能够匹配`audit.irs.corporate`队列 或者 `audit.irs`队列
​      `audit.*`:只能匹配`audit.irs`队列

    */

 @Bean public Binding directBinding(){ return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("mykey");
    //return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("audit.#"); } }

第四步:写个接口方法进行测试

   @Autowired
   private AmqpTemplate amqpTemplate;
   @GetMapping("/sendDirectMessage") @ResponseBody public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //将消息携带绑定键值:路由关键字mykey 发送到交换机 directExchange
        amqpTemplate.convertAndSend("directExchange", "mykey", map); return "ok"; }

红框圈起来的是比较常用的3个方法,分别是:

- 指定交换机、RoutingKey和消息体
- 指定消息
- 指定RoutingKey和消息,会向默认的交换机发送消息

到此,生产者已经编写完成,一般来说我们生产者和消费者在不同的模块里,我也是采用这种模式,编写两个spring boot模块。

 

第五步:编写消费者,配置跟生产者基本一致。

依赖包不变,编写application.yml,注意端口不能冲突

server: port: 9999 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl

第六步:编写消费者,也就是一个监听器,监听自己队列是否有数据。

@Component//必须要写 //@RabbitListener(queues = {"directQueue111","directQueue222"})
public class DirectListener { /** * RabbitListener 注解放在类上就需要使用RabbitHandler注解放在方法上进行配合使用, * 根据传递过来的参数类型判断那个方法执行,如果传递过来的是String类型的,就去找带有@RabbitHandler注解的方法,参数是String的
   * 如果传递过来的是map类型的,就找参数是map的方法
   *@RabbitListener也可以直接放在方法上监听队列,如果该队列存在消息,就由这个方法来接收处理
*/ //@RabbitHandler @RabbitListener(queues = {"directQueue111"}) public void process(Map map) { System.out.println("DirectReceiver111消费者收到消息 : " + map.toString()); }
  @RabbitListener(queues = {"directQueue222"})
  public void process(Map map) {
  System.out.println("DirectReceiver222消费者收到消息 : " + map.toString());
  }
}

一般情况下消息队列的使用已经介绍完毕了,但是有的小伙伴就会问,我怎么知道我的消息是发出去了没有?消费者收到了没有?好的,这就涉及到了生产者的确认机制和消费者的确认机制。

我们先来说生产者的确认机制在生产者application.yml中添加消息确认配置

server: port: 8888 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl template: retry: #enabled:开启失败重试 enabled: true #第一次重试的间隔时长 initial-interval: 10000ms #最长重试间隔,超过这个间隔将不再重试 max-interval: 300000ms #下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍 multiplier: 2 #确认消息已发送到交换机(Exchange) publisher-confirms: true #确认消息已发送到队列(Queue) publisher-returns: true

 在生产者方编写一个配置文件RabbitConfig:

@Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //Mandatory强制的 //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("ConfirmCallback:     " + "相关数据:" + correlationData); System.out.println("ConfirmCallback:     " + "确认情况:" + b); System.out.println("ConfirmCallback:     " + "原因:" + s); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback:     " + "消息:" + message); System.out.println("ReturnCallback:     " + "回应码:" + replyCode); System.out.println("ReturnCallback:     " + "回应信息:" + replyText); System.out.println("ReturnCallback:     " + "交换机:" + exchange); System.out.println("ReturnCallback:     " + "路由键:" + routingKey); } }); return rabbitTemplate; } }

生产者消息确认无非就四种情况:先把结论也写了出来

①消息推送到server,但是在server里找不到交换机 --》 触发confirm回调失败
②消息推送到server,找到交换机了,但是没找到队列 --》 先触发confirm回调返回true,然后再触发return返回失败
③消息推送到sever,交换机和队列啥都没找到 --》 触发confirm回调失败
④消息推送成功

①消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的)
 @GetMapping("/TestMessageAck") public String TestMessageAck() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: non-existent-exchange test message "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map); return "ok"; }

调用接口,查看rabbitmq-provuder项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):

2019-09-04 09:37:45.197 ERROR 8172 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40) ConfirmCallback: 相关数据:null ConfirmCallback: 确认情况:false ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)

结论: ①这种情况触发的是 ConfirmCallback 回调函数。

 

②消息推送到server,找到交换机了,但是没找到队列  
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作

@Bean DirectExchange lonelyDirectExchange() {   return new DirectExchange("lonelyDirectExchange"); }

然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):

@GetMapping("/TestMessageAck2") public String TestMessageAck2() {   String messageId = String.valueOf(UUID.randomUUID());   String messageData = "message: lonelyDirectExchange test message ";   String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));   Map<String, Object> map = new HashMap<>();   map.put("messageId", messageId);   map.put("messageData", messageData);   map.put("createTime", createTime);   rabbitTemplate.convertAndSend("lonelyDirectExchange", "DirectRouting", map);   return "ok"; }

调用接口,查看rabbitmq-provuder项目的控制台输出情况:

ReturnCallback: 消息:(Body:'{createTime=2019-09-04 09:48:01, messageId=563077d9-0a77-4c27-8794-ecfb183eac80, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ReturnCallback: 回应码:312 ReturnCallback: 回应信息:NO_ROUTE ReturnCallback: 交换机:lonelyDirectExchange ReturnCallback: 路由键:TestDirectRouting ConfirmCallback: 相关数据:null ConfirmCallback: 确认情况:true ConfirmCallback: 原因:null

可以看到这种情况,两个函数都被调用了;
这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
  结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

 

③消息推送到sever,交换机和队列啥都没找到 
这种情况其实一看就觉得跟①很像,没错 ,③和①情况回调是一致的,所以不做结果说明了。
  结论: ③这种情况触发的是 ConfirmCallback 回调函数。

 

 ④消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendFanoutMessage接口,可以看到控制台输出:

ConfirmCallback: 相关数据:null ConfirmCallback: 确认情况:true ConfirmCallback: 原因:null

结论: ④这种情况触发的是 ConfirmCallback 回调函数。

以上是生产者推送消息的消息确认 回调函数的使用介绍(可以在回调函数根据需求做对应的扩展或者业务数据处理)。

下面我们继续说一说消费者的消息确认机制:
把消费者自动确认消息改为手动确认:
server: port: 8888 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl template: retry: #enabled:开启失败重试 enabled: true #第一次重试的间隔时长 initial-interval: 10000ms #最长重试间隔,超过这个间隔将不再重试 max-interval: 300000ms #下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍 multiplier: 2
    #确认消息已发送到交换机(Exchange)
    publisher-confirms: true
    #确认消息已发送到队列(Queue)
    publisher-returns: true
# 手动确认消息(默认是自动确认): listener: simple: acknowledge-mode: manual
 
还是那个在消费者方编写监听器:
@Component public class DirectListener {
/**
* 手动确认机制
*/
@RabbitListener(queues = {"directQueue111"})
public void userInsert(Map map, Channel channel, Message message) throws IOException {

try {
     //Message中是rabbitmq管道中的所有信息,包括了消息体,队列名称,交换机名称,唯一标志,状态码等基本信息
System.out.println(message);
     
    //这个map就是到接收的消息

System.out.println(map.toString());

    //同一时刻服务器只会发一条消息给消费者(能者多劳模式)

//channel.basicQos(1);

/**
* deliveryTag:该消息的index
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
*/
//确认签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//签收失败
/**
* deliveryTag:该消息的index
* multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
* requeue:被拒绝的是否重新入队列
*/
//如果重新放进队列中还是会放在队列头部,继续消费者消费,如果一直消费一直错误就会产生堆积问题,理性使用
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列


channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息


如果不手动确认,也不抛出异常,消息不会自动重新推送(包括其他消费者),因为对于rabbitmq来说始终没有接收到消息消费是否成功的确认,并且Channel是在消费端有缓存的,没有断开连接。
如果rabbitmq断开,连接后会自动重新推送。如果消费端应用重启,消息会自动重新推送。

东西呢就是这么个东西,大部分都是跟另外一篇博客一致的,只有消费者消息确认这一块差异多点。over


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM