本系列是學習SpringBoot整合RabbitMQ的練手,包含服務安裝,RabbitMQ整合SpringBoot2.x,消息可靠性投遞實現等三篇博客。
學習路徑:https://www.imooc.com/learn/1042 RabbitMQ消息中間件極速入門與實戰
項目源碼:https://github.com/ZbLeaning/Boot-RabbitMQ
整合實際上主要兩步:
1、引入相關依賴
2、對application.yml進行配置
注意:后續需要使用數據庫,因此需要安裝mysql。https://blog.csdn.net/qq_37719778/article/details/81298292 mysql安裝教程
Spring.RabbitMQ配置的含義可參考:https://blog.csdn.net/en_joker/article/details/80103519
數據庫建表語句:
DROP TABLE IF EXISTS `broker_message_log`; CREATE TABLE `broker_message_log` ( `message_id` varchar(255) NOT NULL COMMENT '消息唯一ID', `message` varchar(4000) NOT NULL COMMENT '消息內容', `try_count` int(4) DEFAULT '0' COMMENT '重試次數', `status` varchar(10) DEFAULT '' COMMENT '消息投遞狀態 0投遞中,1投遞成功,2投遞失敗', `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP COMMENT '下一次重試時間', `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP, `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`message_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Table structure for t_order -- ---------------------------- DROP TABLE IF EXISTS `t_order`; CREATE TABLE `t_order` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `message_id` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2018091102 DEFAULT CHARSET=utf8;
完成前期准備后開始進行整合。
Producer:服務端
1、新建一個SpringBoot項目,項目結構如下
2、添加Pom.xml文件依賴
<!--rabbitmq依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-boot-starter</artifactId> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper-spring-boot-starter</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.29</version> </dependency> <dependency> <groupId>com.github.miemiedev</groupId> <artifactId>mybatis-paginator</artifactId> <version>1.2.17</version> <exclusions> <exclusion> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> </exclusion> </exclusions> </dependency> <!--工具類依賴包--> <dependency> <groupId>org.apache-commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
3、配置application.yml文件
spring: rabbitmq: addresses: 134.175.33.221:5672 username: guest password: guest virtual-host: / ##開啟Publisher Confirm機制 publisher-confirms: true ##開啟Publisher Return機制 publisher-returns: true template: mandatory: true datasource: url: jdbc:mysql://localhost:3306/rabbitmq?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull username: root password: binzhang driverClassName: com.mysql.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource server: port: 8001 servlet: context-path: / mybatis: mapper-locations: classpath:mapping/*.xml logging: level: tk: mybatis: trace
4、編寫消息發送類,直接使用SpringBoot配置的RabbitTemplate模板
import com.imooc.mq.entity.Order; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @Title: OrderSender * @Description: 訂單發送 * @date 2019/1/2210:20 */ @Component public class OrderSender { //使用rabbitmq模板 @Autowired private RabbitTemplate rabbitTemplate; //發送消息 public void sendOrder(Order order) throws Exception{ CorrelationData correlationData = new CorrelationData(); correlationData.setId(order.getMessageId()); rabbitTemplate.convertAndSend("order-exchange",//exchange "order.abcd",//routingKey order,//消息體內容 correlationData); //消息唯一id } }
5、通過控制面板手動建立交換機exchange、消息隊列queue
6、點擊進入創建好的order-exchange,設置綁定路由鍵
7、寫測試demo,運行消息發現,看控制台是否收到消息
@RunWith(SpringRunner.class) @SpringBootTest public class MqApplicationTests { @Autowired private OrderSender orderSender; @Test public void contextLoads() { Order order = new Order(); order.setId("aaa"); order.setName("測試消息a"); order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString()); try { orderSender.sendOrder(order); } catch (Exception e) { e.printStackTrace(); } } }
接收消息成功:
注意:啟動時如果報異常
Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.
則需要配置
@MapperScan("com.imooc.mq.mapper") @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
Consumer:消費端
1、項目結構
2、配置Pom.xml文件,引入依賴,可直接復制服務端pom.xml注入的依賴
3、配置Application.xml文件
## springboot整合rabbitmq的基本配置 spring: rabbitmq: addresses: 134.175.33.221:5672 username: guest password: guest ##連接到RabbitMQ的虛擬主機 virtual-host: / ## 消費端配置 listener: simple: ##消費者的最小數量 concurrency: 5 ## manual:手動 ack(確認) acknowledge-mode: manual ##消費者的最大數量 max-concurrency: 10 ##在單個請求中處理的消息個數,應該大於等於事務數量 prefetch: 1 datasource: url: jdbc:mysql://localhost:3306/rabbitmq?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull username: root password: binzhang driverClassName: com.mysql.jdbc.Driver server: port: 8002 servlet: context-path: /
4、需要將服務端的Order類也復制到消費端
5、編寫消息接收類
import com.imooc.mq.entity.Order; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.util.Map; /** * @Title: OrderReceiver * @Description: 消費 * @date 2019/1/2211:03 */ @Component public class OrderReceiver { /** * @RabbitListener 消息監聽,可配置交換機、隊列、路由key * 該注解會創建隊列和交互機 並建立綁定關系 * @RabbitHandler 標識此方法如果有消息過來,消費者要調用這個方法 * @Payload 消息體 * @Headers 消息頭 * @param order */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",declare = "true"), exchange = @Exchange(name = "order-exchange",declare = "true",type = "topic"), key = "order.abcd" )) @RabbitHandler public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws Exception{ //消費者操作 System.out.println("------收到消息,開始消費------"); System.out.println("訂單ID:"+order.getId()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); //現在是手動確認消息 ACK channel.basicAck(deliveryTag,false); } }
6、運行成功后
基本的服務和消費端整合及演示demo已完成,一般開發過程中我們大都采用手動確認消息機制,如果注釋掉該行則會出現消息被消費但是一直處於未被確認的狀態。當重啟服務端再次發現消息時,消息也會被消費。