RabbitMQ基礎教程之Spring&JavaConfig使用篇


RabbitMQ基礎教程之Spring使用篇

相關博文,推薦查看:

  1. RabbitMq基礎教程之安裝與測試
  2. RabbitMq基礎教程之基本概念
  3. RabbitMQ基礎教程之基本使用篇
  4. RabbitMQ基礎教程之使用進階篇

在實際的應用場景中,將RabbitMQ和Spring結合起來使用的時候可能更加頻繁,網上關於Spring結合的博文中,大多都是xml的方式,這篇博文,則主要介紹下利用JavaConfig的結合,又會是怎樣的

I. Spring中RabbitMQ的基本使用姿勢

1. 准備

開始之前,首先添加上必要的依賴,主要利用 spring-rabbit 來實現,這個依賴中,內部又依賴的Spring相關的模塊,下面統一改成5.0.4版本

<dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.20</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.3.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.0.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.0.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.0.4.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies> 

流程分析

實現主要分為兩塊,一個是投遞服務,一個是消費服務,結合前面RabbitMQ的基本使用姿勢中的流程,即便是使用Spring,我們也避免不了下面幾步

  • 建立連接
  • 聲明Exchange ,聲明Queue
  • 建立Queue和Exchange之間的綁定關系
  • 發送消息
  • 消費消息(ack/nak)

2. 基本case

首先借助Spring,來實現一個最基本的最簡單的實現方式

/** * Created by yihui in 19:53 18/5/30. */ public class SimpleProducer { public static void main(String[] args) throws InterruptedException { CachingConnectionFactory factory = new CachingConnectionFactory("127.0.0.1", 5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); RabbitAdmin admin = new RabbitAdmin(factory); // 創建隊列 Queue queue = new Queue("hello", true, false, false, null); admin.declareQueue(queue); //創建topic類型的交換機 TopicExchange exchange = new TopicExchange("topic.exchange"); admin.declareExchange(exchange); //交換機和隊列綁定,路由規則為匹配"foo."開頭的路由鍵 admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*")); //設置監聽 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory); Object listener = new Object() { public void handleMessage(String foo) { System.out.println(" [x] Received '" + foo + "'"); } }; MessageListenerAdapter adapter = new MessageListenerAdapter(listener); container.setMessageListener(adapter); container.setQueues(queue); container.start(); //發送消息 RabbitTemplate template = new RabbitTemplate(factory); template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!"); Thread.sleep(1000); // 關閉 container.stop(); } } 

3. 邏輯分析

上面這一段代碼中,包含了消息投遞和消費兩塊,從實現而言,基本上邏輯和前面的基礎使用沒有什么太大的區別,步驟如下:

  1. 建立連接: new CachingConnectionFactory("127.0.0.1", 5672)
  2. 聲明Queue: new Queue("hello", true, false, false, null)
  3. 聲明Exchange: new TopicExchange("topic.exchange")
  4. 綁定Queue和Exchange: admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));
  5. 投遞消息: template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
  6. 消費消息: 設置MessageListenerAdapter

這里面有幾個類需要額外注意:

  • RabbitTemplate: Spring實現的發送消息的模板,可以直接發送消息
  • SimpleMessageListenerContainer: 注冊接收消息的容器

II. Spring結合JavaConfig使用RabbitMQ使用姿勢

1. 公共配置

主要是將公共的ConnectionFactory 和 RabbitAdmin 抽取出來

@Configuration @ComponentScan("com.git.hui.rabbit.spring") public class SpringConfig { private Environment environment; @Autowired public void setEnvironment(Environment environment) { this.environment = environment; System.out.println("then env: " + environment); } @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } 

2. 消息投遞

發送消息的組件就比較簡單了,直接利用 AmqpTemplate 即可

@Component public class AmqpProducer { private AmqpTemplate amqpTemplate; @Autowired public void amqpTemplate(ConnectionFactory connectionFactory) { amqpTemplate = new RabbitTemplate(connectionFactory); } /** * 將消息發送到指定的交換器上 * * @param exchange * @param msg */ public void publishMsg(String exchange, String routingKey, Object msg) { amqpTemplate.convertAndSend(exchange, routingKey, msg); } } 

3. DirectExchange消息消費

根據不同的Exchange類型,分別實現如下

DirectExchange方式

@Configuration public class DirectConsumerConfig { @Autowired private ConnectionFactory connectionFactory; @Autowired private RabbitAdmin rabbitAdmin; @Bean public DirectExchange directExchange() { DirectExchange directExchange = new DirectExchange("direct.exchange"); directExchange.setAdminsThatShouldDeclare(rabbitAdmin); return directExchange; } @Bean public Queue directQueue() { Queue queue = new Queue("aaa"); queue.setAdminsThatShouldDeclare(rabbitAdmin); return queue; } @Bean public Binding directQueueBinding() { Binding binding = BindingBuilder.bind(directQueue()).to(directExchange()).with("test1"); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } @Bean public ChannelAwareMessageListener directConsumer() { return new BasicConsumer("direct"); } @Bean(name = "directMessageListenerContainer") public MessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setRabbitAdmin(rabbitAdmin); container.setQueues(directQueue()); container.setPrefetchCount(20); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setMessageListener(directConsumer()); return container; } } 

從上面的實現,基本上都是重新定義了一個Queue, Exchange, Binding, MessageListenerContainer(用來監聽消息),並將消息的消費抽出了一個公共類

@Slf4j public class BasicConsumer implements ChannelAwareMessageListener { private String name; public BasicConsumer(String name) { this.name = name; } @Override public void onMessage(Message message, Channel channel) throws Exception { try { byte[] bytes = message.getBody(); String data = new String(bytes, "utf-8"); System.out.println(name + " data: " + data + " tagId: " + message.getMessageProperties().getDeliveryTag()); } catch (Exception e) { log.error("local cache rabbit mq localQueue error! e: {}", e); } } } 

4. 測試

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = SpringConfig.class) public class SprintUnit { @Autowired private AmqpProducer amqpProducer; @Test public void testDirectConsumer() throws InterruptedException { String[] routingKey = new String[]{"hello.world", "world", "test1"}; for (int i = 0; i < 10; i++) { amqpProducer .publishMsg("direct.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i); } System.out.println("-------over---------"); Thread.sleep(1000 * 60 * 10); } } 

這個測試類中,雖然主要是往MQ中投遞消息,但在Spring容器啟動之后,接收MQ消息並消費的實際任務,是通過前面的MessageListenerContainer托付給Spring容器了,上面測試執行之后,輸出為

direct data: >>> hello test1>>> 2 tagId: 1 direct data: >>> hello test1>>> 5 tagId: 2 direct data: >>> hello test1>>> 8 tagId: 3 

5. Topic & Fanout策略

上面的一個寫出來之后,再看這兩個就比較相似了

@Configuration public class TopicConsumerConfig { @Autowired private ConnectionFactory connectionFactory; @Autowired private RabbitAdmin rabbitAdmin; @Bean public TopicExchange topicExchange() { TopicExchange topicExchange = new TopicExchange("topic.exchange"); topicExchange.setAdminsThatShouldDeclare(rabbitAdmin); return topicExchange; } @Bean public Queue topicQueue() { Queue queue = new Queue("bbb"); queue.setAdminsThatShouldDeclare(rabbitAdmin); return queue; } @Bean public Binding topicQueueBinding() { Binding binding = BindingBuilder.bind(topicQueue()).to(topicExchange()).with("*.queue"); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } @Bean public ChannelAwareMessageListener topicConsumer() { return new BasicConsumer("topic"); } @Bean(name = "topicMessageListenerContainer") public MessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setRabbitAdmin(rabbitAdmin); container.setQueues(topicQueue()); container.setPrefetchCount(20); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setMessageListener(topicConsumer()); return container; } } 

對應的測試case

@Test public void testTopicConsumer() throws InterruptedException { String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue"}; for (int i = 0; i < 20; i++) { amqpProducer.publishMsg("topic.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i); } System.out.println("-------over---------"); Thread.sleep(1000 * 60 * 10); } 

廣播方式

@Configuration public class FanoutConsumerConfig { @Autowired private ConnectionFactory connectionFactory; @Autowired private RabbitAdmin rabbitAdmin; @Bean public FanoutExchange fanoutExchange() { FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange"); fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin); return fanoutExchange; } @Bean public Queue fanoutQueue() { Queue queue = new Queue("ccc"); queue.setAdminsThatShouldDeclare(rabbitAdmin); return queue; } @Bean public Binding fanoutQueueBinding() { Binding binding = BindingBuilder.bind(fanoutQueue()).to(fanoutExchange()); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } @Bean public ChannelAwareMessageListener fanoutConsumer() { return new BasicConsumer("fanout"); } @Bean(name = "FanoutMessageListenerContainer") public MessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setRabbitAdmin(rabbitAdmin); container.setQueues(fanoutQueue()); container.setPrefetchCount(20); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setMessageListener(fanoutConsumer()); return container; } } 

對應的測試case

@Test public void testFanoutConsumer() throws InterruptedException { String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue", "hello.world", "world", "test1"}; for (int i = 0; i < 20; i++) { amqpProducer .publishMsg("fanout.exchange", routingKey[i % 6], ">>> hello " + routingKey[i % 6] + ">>> " + i); } System.out.println("-------over---------"); Thread.sleep(1000 * 60 * 10); } 

II. 其他

項目地址

一灰灰Blog: https://liuyueyi.github.io/hexblog

一灰灰的個人博客,記錄所有學習和工作中的博文,歡迎大家前去逛逛

聲明

盡信書則不如,已上內容,純屬一家之言,因個人能力有限,難免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激

掃描關注

 
QrCode


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM