1、增加rabbitmq的依賴包
<!-- amqp 依賴包 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、application.yaml文件中配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    publisher-confirms: true
    virtual-host: /
3、RabbitMq的工廠連接和模板創建
@Configuration public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private String port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.publisher-confirms}") private Boolean publisherConfirms; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; //創建工廠連接 @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(this.host); connectionFactory.setUsername(this.username); connectionFactory.setPassword(this.password); connectionFactory.setVirtualHost(this.virtualHost); connectionFactory.setPublisherConfirms(this.publisherConfirms); //必須要設置 return connectionFactory; } //rabbitmq的模板配置 @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必須是prototype類型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(this.connectionFactory()); //template.setConfirmCallback(); 設置消息確認 //template.setReturnCallback(); return template; } }
4、創建交換機、創建隊列、綁定交換機和隊列
@Configuration public class RabbitExchangeConfig { //直連交換機 @Bean public DirectExchange defaultExchange() { return new DirectExchange(RabbitConstant.EXCHANGE_NAME); } //隊列 @Bean public Queue queue() { return QueueBuilder.durable(RabbitConstant.QUEUE_NAME).build(); } //綁定 @Bean public Binding binding() { return BindingBuilder.bind(this.queue()).to(this.defaultExchange()).with(RabbitConstant.ROUTING_KEY); } }
5、生產者
@Component public class RabbitSender implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最后設置的內容 this.rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE_NAME, RabbitConstant.ROUTING_KEY, content, correlationId); } //回調 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回調id:" + correlationData); if (ack) { System.out.println("消息成功消費"); } else { System.out.println("消息消費失敗:" + cause); } } }
6、消費者
@Configuration public class RabbitReceive { @Autowired private RabbitConfig rabbitConfig; @Autowired private RabbitExchangeConfig rabbitExchangeConfig; @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitConfig.connectionFactory()); container.setQueues(rabbitExchangeConfig.queue()); //設置要監聽的隊列 container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費 } }); return container; } }