寫在開始
rabbitMq 代碼按照三部分介紹
第一部分 交換機和隊列的創建
第二部分 消息發送
第三部分 消息監聽
第一部分
1 建立queue
2 建立exchange
3 exchange綁定queue
建立之前需要配置兩樣東西
一個是rabbitMq的連接工廠(ConnectionFactory)、另外一個是操作句柄(RabbitAdmin)。可以看到連接工廠是給操作句柄初始化時使用的。
后續創建隊列等一系列操作都需要使用到操作句柄,如果沒有使用的話操作被視為無效。
// 初始化連接 @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } // 隊列操作配置 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); }
開始建立隊列
public final static String QUEUE_NAME = "tianmh-queue"; public final static String QUEUE_NAME2 = "tianmh-queue2"; @Autowired private RabbitAdmin rabbitAdmin; // 創建隊列1 @Bean(value = QUEUE_NAME) public Queue queue() { Queue queue = new Queue(QUEUE_NAME, true, true, true); this.rabbitAdmin.declareQueue(queue); return queue; } // 創建隊列2 @Bean(value = QUEUE_NAME2) public Queue queue2() { Queue queue = new Queue(QUEUE_NAME2, true, true, true); this.rabbitAdmin.declareQueue(queue); return queue; }
建立交換機。下面例子建立的隊列為廣播類型隊列
// 創建一個 Fanout 類型的交換器 @Bean(value = EXCHANGE_NAME) public Exchange exchange() { Exchange exchange = new FanoutExchange(EXCHANGE_NAME, true, true); this.rabbitAdmin.declareExchange(exchange); return exchange; }
交換機綁定隊列
// 使用路由鍵(routingKey)把隊列(Queue)綁定到交換器(Exchange) @Bean public Binding binding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) { Binding binding = new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), ROUTING_KEY, null); this.rabbitAdmin.declareBinding(binding); return binding; } // 使用路由鍵(routingKey)把隊列(Queue)綁定到交換器(Exchange) @Bean public Binding binding2(@Qualifier(QUEUE_NAME2) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) { Binding binding = new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), ROUTING_KEY, null); this.rabbitAdmin.declareBinding(binding); return binding; }
第二部分 消息發送
消息發送需指定發送到的exchangeName及routeKey及內容
@Component
public class SenderDemo {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void testSender() {
TestCommand command = new TestCommand();
command.setKey("testContent");
byte[] content = JSONObject.toJSONBytes(command);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setTimestamp(new Date());
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(content, messageProperties);
rabbitTemplate.send(RabbitMqConfig.EXCHANGE_NAME, "log", message);
}
}
第三部分 消息監聽
接收消息是通過監聽隊列實現的
@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME) public void process(Message message) { TestCommand command = JSON.parseObject(new String(message.getBody()), TestCommand.class); logger.info("接收處理隊列[{}]的消息[{}]", RabbitMqConfig.QUEUE_NAME, command.toString()); }
就此一個完整的RabbitMqDemo搭建完成
附帶項目POM
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitmq</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>