文末有源碼地址
添加依賴
<!--amqp依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
創建所需常量類
public class RabbitMQConstant { //簡單模式 public static final String QUEUE_EASY = "easy.queue"; //work模式 public static final String QUEUE_WORK = "work.queue"; //topic模式 public static final String QUEUE_TOPIC_FIRST = "topic.queue.first"; public static final String QUEUE_TOPIC_SECOND = "topic.queue.second"; //發布訂閱模式 public static final String QUEUE_FANOUT = "fanout.queue"; public static final String QUEUE_FANOUT_SECOND = "fanout.queue.second"; //路由key public static final String ROUTING_KEY_EASY = "routing.key.easy"; public static final String ROUTING_KEY_WORK = "routing.key.work"; public static final String ROUTING_KEY_TOPIC_FIRST = "routing.key.topic.first"; public static final String ROUTING_KEY_TOPIC_SECOND = "routing.key.topic.second"; // direct交換機 public static final String EXCHANGE_DIRECT = "direct_exchange"; // work交換機 public static final String EXCHANGE_WORK = "work_exchange"; // topic交換機 public static final String EXCHANGE_TOPIC = "topic_exchange"; // fanout交換機 public static final String EXCHANGE_FANOUT = "fanout_exchange"; }
創建交換機
@Configuration public class ExchangeConfig { /** * 交換機說明: * durable="true" rabbitmq重啟的時候不需要創建新的交換機 * auto-delete 表示交換機沒有在使用時將被自動刪除 默認是false * direct交換器相對來說比較簡單,匹配規則為:如果路由鍵匹配,消息就被投送到相關的隊列 * topic交換器你采用模糊匹配路由鍵的原則進行轉發消息到隊列中 * fanout交換器中沒有路由鍵的概念,他會把消息發送到所有綁定在此交換器上面的隊列中。 */ @Bean(name = RabbitMQConstant.EXCHANGE_DIRECT) public DirectExchange directExchange() { return new DirectExchange(RabbitMQConstant.EXCHANGE_DIRECT, true, false); } @Bean(name = RabbitMQConstant.EXCHANGE_WORK) public DirectExchange workExchange() { return new DirectExchange(RabbitMQConstant.EXCHANGE_WORK, true, false); } @Bean(name = RabbitMQConstant.EXCHANGE_TOPIC) public TopicExchange topicExchange() { return new TopicExchange(RabbitMQConstant.EXCHANGE_TOPIC, true, false); } @Bean(name = RabbitMQConstant.EXCHANGE_FANOUT) public FanoutExchange fanoutExchange() { return new FanoutExchange(RabbitMQConstant.EXCHANGE_FANOUT, true, false); } }
創建隊列
@Configuration public class QueueConfig { /** * durable="true" 持久化 rabbitmq重啟的時候不需要創建新的隊列 * exclusive 表示該消息隊列是否只在當前connection生效,默認是false * auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認是false */ @Bean(name = RabbitMQConstant.QUEUE_EASY) public Queue easyQueue() { return new Queue(RabbitMQConstant.QUEUE_EASY, true, false, false); } @Bean(name = RabbitMQConstant.QUEUE_WORK) public Queue workQueue() { return new Queue(RabbitMQConstant.QUEUE_WORK, true, false, false); } @Bean(name = RabbitMQConstant.QUEUE_TOPIC_FIRST) public Queue topicQueue() { return new Queue(RabbitMQConstant.QUEUE_TOPIC_FIRST, true, false, false); } @Bean(name = RabbitMQConstant.QUEUE_TOPIC_SECOND) public Queue topicQueueSecond() { return new Queue(RabbitMQConstant.QUEUE_TOPIC_SECOND, true, false, false); } @Bean(name = RabbitMQConstant.QUEUE_FANOUT) public Queue fanoutQueue() { return new Queue(RabbitMQConstant.QUEUE_FANOUT, true, false, false); } @Bean(name = RabbitMQConstant.QUEUE_FANOUT_SECOND) public Queue fanoutQueueSecond() { return new Queue(RabbitMQConstant.QUEUE_FANOUT_SECOND, true, false, false); } }
綁定交換機和隊列
@Configuration @Slf4j public class RabbitMqConfig { @Resource private QueueConfig queueConfig; @Resource private ExchangeConfig exchangeConfig; /** * 連接工廠 */ @Resource private ConnectionFactory connectionFactory; /** * 將消息隊列和交換機進行綁定,指定路由 */ @Bean public Binding bindingDirect() { return BindingBuilder.bind(queueConfig.easyQueue()).to(exchangeConfig.directExchange()).with(RabbitMQConstant.ROUTING_KEY_EASY); } @Bean public Binding bindingWork() { return BindingBuilder.bind(queueConfig.workQueue()).to(exchangeConfig.workExchange()).with(RabbitMQConstant.ROUTING_KEY_WORK); } @Bean public Binding bindingTopic() { return BindingBuilder.bind(queueConfig.topicQueue()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_FIRST); } @Bean public Binding bindingTopicSecond() { return BindingBuilder.bind(queueConfig.topicQueueSecond()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_SECOND); } @Bean public Binding bindingFanout() { return BindingBuilder.bind(queueConfig.fanoutQueue()).to(exchangeConfig.fanoutExchange()); } @Bean public Binding bindingFanoutSecond() { return BindingBuilder.bind(queueConfig.fanoutQueueSecond()).to(exchangeConfig.fanoutExchange()); } /** ======================== 定制一些處理策略 =============================*/ /** * 定制化amqp模版 * <p> * Rabbit MQ的消息確認有兩種。 * <p> * 一種是消息發送確認:這種是用來確認生產者將消息發送給交換機,交換機傳遞給隊列過程中,消息是否成功投遞。 * 發送確認分兩步:一是確認是否到達交換機,二是確認是否到達隊列 * <p> * 第二種是消費接收確認:這種是確認消費者是否成功消費了隊列中的消息。 */ @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 消息發送失敗返回到隊列中, yml需要配置 publisher-returns: true rabbitTemplate.setMandatory(true); /** * 使用該功能需要開啟消息確認,yml需要配置 publisher-confirms: true * 通過實現ConfirmCallBack接口,用於實現消息發送到交換機Exchange后接收ack回調 * correlationData 消息唯一標志 * ack 確認結果 * cause 失敗原因 */ rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallback()); /** * 使用該功能需要開啟消息返回確認,yml需要配置 publisher-returns: true * 通過實現ReturnCallback接口,如果消息從交換機發送到對應隊列失敗時觸發 * message 消息主體 message * replyCode 消息主體 message * replyText 描述 * exchange 消息使用的交換機 * routingKey 消息使用的路由鍵 */ rabbitTemplate.setReturnCallback(new MsgSendReturnCallback()); return rabbitTemplate; } }
源碼地址:
https://gitee.com/xiaorenwu_dashije/rabbitmq_demo.git
包含Direct模式、Work模式、Fanout模式、Topic模式
