分布式消息中間件
RabbitMQ是用Erlang語言編寫的分布式消息中間件,常常用在大型網站中作為消息隊列來使用,主要目的是各個子系統之間的解耦和異步處理。消息中間件的基本模型是典型的生產者-消費者模型,生產者發送消息到消息隊列,消費者監聽消息隊列,收到消息后消費處理。
在使用RabbitMQ做消息分發時,主要有三個概念要注意:Exchange,RoutingKey,Queue。
Exchange可以理解為交換器,RoutingKey可以理解為路由,Queue作為真實存儲消息的隊列和某個Exchange綁定,具體如何路由到感興趣的Queue則由Exchange的三種模式決定:
- fanout
- topic
- direct
Exchange為fanout時,生產者往此Exchange發送的消息會發給每個和其綁定的Queue,此時RoutingKey並不起作用;Exchange為topic時,生產者可以指定一個支持通配符的RoutingKey(如demo.*)發向此Exchange,凡是Exchange上RoutingKey滿足此通配符的Queue就會收到消息;direct類型的Exchange是最直接最簡單的,生產者指定Exchange和RoutingKey,然后往其發送消息,消息只能被綁定的滿足RoutingKey的Queue接受消息。(通常如果不指定RoutingKey的具體名字,那么默認的名字其實是Queue的名字)
Concurrency與Prefetch
在通常的使用中(Java項目),我們一般會結合spring-amqp框架來使用RabbitMQ,spring-amqp底層調用RabbitMQ的java client來和Broker交互,比如我們會用如下配置來建立RabbitMQ的連接池、聲明Queue以及指明監聽者的監聽行為:
<rabbit:connection-factory id="connectionFactory" /> <!-- template非必須,主要用於生產者發送消息--> <rabbit:template id="template" connection-factory="connectionFactory" /> <rabbit:queue name="remoting.queue" /> <rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3"> <rabbit:listener ref="listener" queue-names="remoting.queue" /> </rabbit:listener-container>
listener-container可以設置消費者在監聽Queue的時候的各種參數,其中concurrency和prefetch是本篇文章比較關心的兩個參數,以下是spring-amqp文檔的解釋:
prefetchCount(prefetch)
The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode
is NONE. This will be increased, if necessary, to match the txSize
concurrentConsumers(concurrency)
The number of concurrent consumers to initially start for each listener.
簡單解釋下就是concurrency設置的是對每個listener在初始化的時候設置的並發消費者的個數,prefetch是每次從一次性從broker里面取的待消費的消息的個數,上面的配置在監控后台看到的效果如下:

圖中可以看出有兩個消費者同時監聽Queue,但是注意這里的消息只有被一個消費者消費掉就會自動ack,另外一個消費者就不會再獲取到此消息,Prefetch Count為配置設置的值3,意味着每個消費者每次會預取3個消息准備消費。每個消費者對應的listener有個Exclusive參數,默認為false, 如果設置為true,concurrency就必須設置為1,即只能單個消費者消費隊列里的消息,適用於必須嚴格執行消息隊列的消費順序(先進先出)。
源碼剖析
這里concurrency的實現方式不看源碼也能猜到,肯定是用多線程的方式來實現的,此時同一進程下打開的本地端口都是56278.下面看看listener-contaner對應的org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer的源碼:
protected int initializeConsumers() { int count = 0; synchronized (this.consumersMonitor) { if (this.consumers == null) { this.cancellationLock.reset(); this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers); for (int i = 0; i < this.concurrentConsumers; i++) { BlockingQueueConsumer consumer = createBlockingQueueConsumer(); this.consumers.put(consumer, true); count++; } } } return count; }
container啟動的時候會根據設置的concurrency的值(同時不超過最大值)創建n個BlockingQueueConsumer。
protected void doStart() throws Exception { //some code synchronized (this.consumersMonitor) { int newConsumers = initializeConsumers(); //some code Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>(); for (BlockingQueueConsumer consumer : this.consumers.keySet()) { AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); processors.add(processor); this.taskExecutor.execute(processor); } //some code } }
在doStart()方法中調用initializeConsumers來初始化所有的消費者,AsyncMessageProcessingConsumer作為真實的處理器包裝了BlockingQueueConsumer,而AsyncMessageProcessingConsumer其實實現了Runnable接口,由this.taskExecutor.execute(processor)來啟動消費者線程。
private final class AsyncMessageProcessingConsumer implements Runnable { private final BlockingQueueConsumer consumer; private final CountDownLatch start; private volatile FatalListenerStartupException startupException; private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) { this.consumer = consumer; this.start = new CountDownLatch(1); } //some code @Override public void run() { //some code } }
那么prefetch的值意味着什么呢?其實從名字上大致能看出,BlockingQueueConsumer內部應該維護了一個阻塞隊列BlockingQueue,prefetch應該是這個阻塞隊列的長度,看下BlockingQueueConsumer內部有個queue,這個queue不是對應RabbitMQ的隊列,而是Consumer自己維護的內存級別的隊列,用來暫時存儲從RabbitMQ中取出來的消息:
private final BlockingQueue<Delivery> queue; public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, boolean exclusive, String... queues) { //some code this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount); }
BlockingQueueConsumer的構造函數清楚說明了每個消費者內部的隊列大小就是prefetch的大小。
業務問題
前面說過,設置並發的時候,要考慮具體的業務場景,對那種對消息的順序有苛刻要求的場景不適合並發消費,而對於其他場景,比如用戶注冊后給用戶發個提示短信,是不太在意哪個消息先被消費,哪個消息后被消費,因為每個消息是相對獨立的,后注冊的用戶先收到短信也並沒有太大影響。
設置並發消費除了能提高消費的速度,還有另外一個好處:當某個消費者長期阻塞,此時在當前消費者內部的BlockingQueue的消息也會被一直阻塞,但是新來的消息仍然可以投遞給其他消費者消費,這種情況頂多會導致prefetch個數目的消息消費有問題,而不至於單消費者情況下整個RabbitMQ的隊列會因為一個消息有問題而全部堵死。所有在合適的業務場景下,需要合理設置concurrency和prefetch值。
轉自:https://www.jianshu.com/p/04a1d36f52ba