基本概念
channel:消息通道
exchage:消息路由規則,四種模式(topic、direct、fanout、header)
direct:默認,根據routingKey完全匹配,好處是先匹配再發送
topic:根據綁定關鍵字通配符規則匹配、比較靈活
fanout:不需要指定routingkey,相當於群發
header:不太常用,可以自定義匹配規則
queue:消息存儲
routerKey:消息路由關鍵字(發送的時候成為bindingkey,接收成為routingKey)
隊列的概念(生產者消費者啟動報錯大多數都是這幾個不匹配導致)
durable:持久化到硬盤
exclusive:唯一性
autoDelete:自動刪除
/** * 第二個參數:queue的持久化是通過durable=true來實現的。 * 第三個參數:exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:
1. 排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排他隊列;
2.“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;
3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。 * 第四個參數:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。 * @param * @return * @Author zxj */ @Bean public Queue queue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 25000);//25秒自動刪除 Queue queue = new Queue("topic.messages", true, false, true, arguments); return queue; }
@Configuration public class RabbitConfig { // 發送消息的格式轉換器 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } // 接收消息的格式轉換器 @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } // 信道配置,此地使用direct模式 @Bean public DirectExchange defaultExchange() { return new DirectExchange(MQConstant.EXCHANGE, true, false); } // 配置隊列規則屬性 例如保活時間 持久化 是否自動刪除等 @Bean public Queue queue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 25000);//25秒自動刪除 Queue queue = new Queue(MQConstant.QUEUE_NAME, true, false, true, arguments); return queue; } // 綁定隊列和exchange @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.QUEUE_NAME); } } 如何發送 @Autowired private RabbitTemplate template; //convertAndSend和send的區別是這個convert更方便使用,可以傳object進去 template.convertAndSend(MQConstant.EXCHANGE, bindingKey, msg); //如何接收,注意隊列名稱、exchange名稱、routingKey的指定。 //注意:隊列的消息只要被一個消費者匹配消費后就不存在了 @Component @RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "default_queue", durable = "true"), exchange = @Exchange(value = "default_exchange", type = ExchangeTypes.TOPIC), key = "meeting")) @Log4j public class RabbitMqListener { @RabbitHandler public void processMessage(MqMsg message) { log.error( message); } }
交換機持久化設置: exchange 持久化,在聲明時指定 durable為true
隊列持久化設置:queue 持久化,在聲明時指定 durable 為true
Rabbitmq常用配置
spring.rabbitmq.addresses指定client連接到的server的地址,多個以逗號分隔. spring.rabbitmq.dynamic是否創建AmqpAdmin bean. 默認為: true) spring.rabbitmq.host指定RabbitMQ host.默認為: localhost) spring.rabbitmq.listener.acknowledge-mode指定Acknowledge的模式. spring.rabbitmq.listener.auto-startup是否在啟動時就啟動mq,默認: true) spring.rabbitmq.listener.concurrency指定最小的消費者數量. spring.rabbitmq.listener.max-concurrency指定最大的消費者數量. spring.rabbitmq.listener.prefetch指定一個請求能處理多少個消息,如果有事務的話,必須大於等於transaction數量. spring.rabbitmq.listener.transaction-size指定一個事務處理的消息數量,最好是小於等於prefetch的數量. spring.rabbitmq.password指定broker的密碼. spring.rabbitmq.port指定RabbitMQ 的端口,默認: 5672) spring.rabbitmq.requested-heartbeat指定心跳超時,0為不指定. spring.rabbitmq.ssl.enabled是否開始SSL,默認: false) spring.rabbitmq.ssl.key-store指定持有SSL certificate的key store的路徑 spring.rabbitmq.ssl.key-store-password指定訪問key store的密碼. spring.rabbitmq.ssl.trust-store指定持有SSL certificates的Trust store. spring.rabbitmq.ssl.trust-store-password指定訪問trust store的密碼. spring.rabbitmq.username指定登陸broker的用戶名. spring.rabbitmq.virtual-host指定連接到broker的Virtual host. spring.rabbitmq.publisher-confirms=true 開啟發送確認 spring.rabbitmq.publisher-returns=true 開啟發送失敗退回