屬性文件:org.springframework.boot.autoconfigure.amqp.RabbitProperties
★Config:
# base
spring.rabbitmq.host: 服務Host
spring.rabbitmq.port: 服務端口
spring.rabbitmq.username: 用戶名
spring.rabbitmq.password: 密碼
spring.rabbitmq.virtual-host: RabbitMQ的虛擬主機
spring.rabbitmq.addresses: 指定client連接到的server的地址,多個以逗號分隔(優先取addresses,然后再取host)
spring.rabbitmq.requested-heartbeat: 指定心跳超時,單位秒,0為不指定;默認60s
spring.rabbitmq.publisher-confirms: 開啟Publisher Confirm機制
spring.rabbitmq.publisher-returns: 開啟publisher Return機制
spring.rabbitmq.connection-timeout: 連接超時,單位毫秒,0表示無窮大,不超時
spring.rabbitmq.parsed-addresses:
# ssl
spring.rabbitmq.ssl.enabled: 是否支持ssl
spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路徑
spring.rabbitmq.ssl.key-store-password: 指定訪問key store的密碼
spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
spring.rabbitmq.ssl.trust-store-password: 指定訪問trust store的密碼
spring.rabbitmq.ssl.algorithm: ssl使用的算法,例如,TLSv1.1
# cache
spring.rabbitmq.cache.channel.size: 緩存中保持的channel數量
spring.rabbitmq.cache.channel.checkout-timeout: 當緩存數量被設置時,從緩存中獲取一個channel的超時時間,單位毫秒;如果為0,則總是創建一個新channel
channel不是線程安全的,線程並發的去訪問同一個channel會出問題。這里有幾種處理方式: 1 全局公用一個channel且使用全局鎖,讓操作channel排隊.這種明顯性能是不行的; 2 一個線程對應創建一個新的channel,但是要處理好一個連接能支撐的最大channel數量; 3 一個線程對應一個channel,但是是從channel池子拿的,不是每次都創建新的.一旦一個線程完成了一個channel的使用,它將返回到池中,從而使該channel可用於另一個線程。 量不大的話,使用第二種方式就可以了。量大的話,建議使用第三種方式,畢竟創建和銷毀channel也是耗時耗資源的.在spring amqp中,提供了一個緩存channel的方案。可以在創建CachingConnectionFactory時指定緩存的模式。
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); connectionFactory.setChannelCacheSize(25);
上面的兩行代碼,表示channel共用唯一的一個連接,且緩存了25個channel,注意這里的25個並不是說,這個連接里只能最多創建25個channel,而是說最多緩存25個channel。
舉個例子,假設並發發送100條消息,在CachingConnectionFactory.CacheMode.CHANNEL模式下,瞬間會創建100個channel的,然后往緩存里放25個channel,當流量下去了,剛剛創建的多余的channel會自動關閉掉的,緩存里只保留25個。 使用這種方式的話,要注意緩存的channel數量,不能太小,不然流量一大,仍然會造成頻繁關閉channel的情況。當然我們也不能說有多少並發,就創建多少個channel,還是要限制一下,這個時候可以使用:
connectionFactory.setChannelCheckoutTimeout(1000);
當ChannelCheckoutTimeout的值大於0的時候,ChannelCacheSize的值就是最大的channel數量了,一旦從緩存中獲取不到channel,等待ChannelCheckoutTimeout毫秒后,如果還是獲取不到的,就會拋AmqpTimeoutException了。 鏈接:https://juejin.cn/post/6844904015508029453
spring.rabbitmq.cache.connection.size: 緩存的連接數,只有是CONNECTION模式時生效
spring.rabbitmq.cache.connection.mode: 連接工廠緩存模式:CHANNEL 和 CONNECTION
# listener
spring.rabbitmq.listener.simple.auto-startup: 是否啟動時自動啟動容器
使用情景1: rabbitMQ配置動態啟動,rabbitMQ代理不正常時可以不啟動項目中MQ的監聽,主要解決,項目和MQ的啟動順序的問題。 使用情景2: 當啟動程序時,並且MQ隊列中有消息,這些消息的消費就會在應用程序完全啟動之前開始,但是啟動程序要初始化一些程序要用的數據,在沒有初始化完成時消費這些消息會報錯,所以希望在程序初始化完成后,再啟動RabbitMQ的監聽容器。 可以通過控制層,測試去啟動,生成環境是通過一個線程,一個去查看rabbit的代理服務器是否正常,正常的時候就開啟rabbit的監聽。 package com.example.demo.main; import java.util.Set; import javax.annotation.Resource; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("rabbitmq/listener") public class RabbitMQController { @Resource private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @RequestMapping("stop") public String stop(){ rabbitListenerEndpointRegistry.stop(); System.out.println("停止MQ監聽"); return "success"; } @RequestMapping("start") public String start(){ rabbitListenerEndpointRegistry.start(); System.out.println("啟動MQ監聽"); return "success"; } @RequestMapping("setup") public String setup(int consumer, int maxConsumer){ Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds(); SimpleMessageListenerContainer container = null; for(String id : containerIds){ container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id); if(container != null){ container.setConcurrentConsumers(consumer); container.setMaxConcurrentConsumers(maxConsumer); } } System.out.println("設置"); return "success"; } }
spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto
spring.rabbitmq.listener.simple.concurrency: 最小的消費者數量
參考: https://www.cnblogs.com/liukunjava/p/13163951.html
spring.rabbitmq.listener.simple.max-concurrency: 最大的消費者數量
spring.rabbitmq.listener.simple.prefetch: 指定一個請求能處理多少個消息,如果有事務的話,必須大於等於transaction數量.
每個customer會在MQ預取一些消息放入內存的LinkedBlockingQueue中,這個值越高,消息傳遞的越快,但非順序處理消息的風險更高。如果ack模式為none,則忽略。如有必要,將增加此值以匹配txSize或messagePerAck。從2.0開始默認為250;設置為1將還原為以前的行為。 prefetch默認值以前是1,這可能會導致高效使用者的利用率不足。從spring-amqp 2.0版開始,默認的prefetch值是250,這將使消費者在大多數常見場景中保持忙碌,從而提高吞吐量。 不過在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內存中大量堆積,消耗大量內存;以及對於一些嚴格要求順序的消息,prefetch的值應當設置為1。 對於低容量消息和多個消費者的情況(也包括單listener容器的concurrency配置)希望在多個使用者之間實現更均勻的消息分布,建議在手動ack下並設置prefetch=1。
spring.rabbitmq.listener.simple.transaction-size: 指定一個事務處理的消息數量,最好是小於等於prefetch的數量.
spring.rabbitmq.listener.simple.default-requeue-rejected: 決定被拒絕的消息是否重新入隊;默認是true(與參數acknowledge-mode有關系)
spring.rabbitmq.listener.simple.idle-event-interval: 多少長時間發布空閑容器時間,單位毫秒
spring.rabbitmq.listener.simple.retry.enabled: 監聽重試是否可用
spring.rabbitmq.listener.simple.retry.max-attempts: 最大重試次數
spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次嘗試發布或傳遞消息之間的間隔
spring.rabbitmq.listener.simple.retry.multiplier: 應用於上一重試間隔的乘數
spring.rabbitmq.listener.simple.retry.max-interval: 最大重試時間間隔
spring.rabbitmq.listener.simple.retry.stateless: 重試是有狀態or無狀態
# template
spring.rabbitmq.template.mandatory: 啟用強制信息;默認false
spring.rabbitmq.template.receive-timeout: receive() 操作的超時時間
spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超時時間
spring.rabbitmq.template.retry.enabled: 發送重試是否可用
spring.rabbitmq.template.retry.max-attempts: 最大重試次數
spring.rabbitmq.template.retry.initial-interval: 第一次和第二次嘗試發布或傳遞消息之間的間隔
spring.rabbitmq.template.retry.multiplier: 應用於上一重試間隔的乘數
spring.rabbitmq.template.retry.max-interval: 最大重試時間間隔