1、簡介
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。
如上圖所示:
1.Server(broker): 接受客戶端連接,實現AMQP消息隊列和路由功能的進程。
2.Virtual Host:其實是一個虛擬概念,類似於權限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,但是權限控制的最小粒度是Virtual Host
3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中, ExchangeType有direct、Fanout和Topic三種,不同類型的Exchange路由的行為是不一樣的。
4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。
5.Message: 由Header和Body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先級是多少等。而Body是 真正需要傳輸的APP數據。
6.Binding:Binding聯系了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding后會生成一張路由表,路由表中存儲着Message Queue所需消 息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到 Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,兩者的匹配方式由 Exchange Type決定。
7.Connection:連接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連接。
8.Channel:信道,僅僅創建了客戶端到Broker之間的連接后,客戶端還是不能發送消息的。需要為每一個Connection創建Channel,AMQP協議規定只有通過Channel才能 執行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的,如果一個客戶端每一個線程都需要與 Broker交互,如果每一個線程都建立一個TCP連接,暫且不考慮TCP連接是否浪費,就算操作系統也無法承受每秒建立如此多的TCP連接。RabbitMQ建議客戶端線程之間不· 要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,但是建議盡量共用Connection。
9.Command:AMQP的命令,客戶端通過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令發送消息,txSelect開 啟一個事務,txCommit提交一個事務。
2、特點及應用場景
MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取或者訂閱隊列中的消息。MQ和JMS類似,但不同的是JMS是SUN JAVA消息中間件服務的一個標准和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。
在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。
3、安裝
RabbitMQ是基於Erlang的,所以首先必須配置Erlang環境。
從Erlang的官網 http://www.erlang.org/download.html 下載最新的erlang安裝包,我下載的版本是 otp_src_R14B03.tar.gz 。
然后:
$ tar xvzf otp_src_R14B03.tar.gz
$ cd otp_src_R14B03
$ ./configure
編譯后的輸出如下圖:
提示沒有wxWidgets和fop,但是問題不大。繼續:
$ make
$ sudo make install
安裝完Erlang,開始安裝RabbitMQ-Server。
主要參考官方文檔:http://www.rabbitmq.com/build-server.html
需要安裝一個比較新的Python版本。安裝略。
需要安裝simplejson。從此處下載最新的版本: http://pypi.python.org/pypi/simplejson#downloads 。我下載的版本是 simplejson-2.2.1.tar.gz
$ tar xvzf simplejson-2.2.1.tar.gz
$ cd simplejson-2.2.1
$ sudo python setup.py install
然后安裝RabbitMQ Server。從此處下載源代碼版本的RabbitMQ: http://www.rabbitmq.com/server.html。我下載的版本是 rabbitmq-server-2.6.1.tar.gz
$ tar xvzf rabbitmq-server-2.6.1.tar.gz
$ cd rabbitmq-server-2.6.1
$ make
# TARGET_DIR=/usr/local SBIN_DIR=/usr/local/sbin MAN_DIR=/usr/local/man make install
在sbin/目錄下出現了三個命令:
rabbitmqctl rabbitmq-env rabbitmq-server
安裝成功。
運行
找到sbin/目錄,運行程序:
/usr/local/sbin/rabbitmq-server –detached
停止程序:
/usr/local/sbin/rabbitmqctl stop
4、java 實現rabbitMQ---生產者
public interface EventTemplate { void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException; void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException; }
public class DefaultEventTemplate implements EventTemplate { private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class); private AmqpTemplate eventAmqpTemplate; private CodecFactory defaultCodecFactory; // private DefaultEventController eec; // // public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate, // CodecFactory defaultCodecFactory, DefaultEventController eec) { // this.eventAmqpTemplate = eopAmqpTemplate; // this.defaultCodecFactory = defaultCodecFactory; // this.eec = eec; // } public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) { this.eventAmqpTemplate = eopAmqpTemplate; this.defaultCodecFactory = defaultCodecFactory; } @Override public void send(String queueName, String exchangeName, Object eventContent) throws SendRefuseException { this.send(queueName, exchangeName, eventContent, defaultCodecFactory); } @Override public void send(String queueName, String exchangeName, Object eventContent, CodecFactory codecFactory) throws SendRefuseException { if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) { throw new SendRefuseException("queueName exchangeName can not be empty."); } // if (!eec.beBinded(exchangeName, queueName)) // eec.declareBinding(exchangeName, queueName); byte[] eventContentBytes = null; if (codecFactory == null) { if (eventContent == null) { logger.warn("Find eventContent is null,are you sure..."); } else { throw new SendRefuseException( "codecFactory must not be null ,unless eventContent is null"); } } else { try { eventContentBytes = codecFactory.serialize(eventContent); } catch (IOException e) { throw new SendRefuseException(e); } } // 構造成Message EventMessage msg = new EventMessage(queueName, exchangeName, eventContentBytes); try { eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg); } catch (AmqpException e) { logger.error("send event fail. Event Message : [" + eventContent + "]", e); throw new SendRefuseException("send event fail", e); } } }
5、java實現rabbitMQ---消費者
public interface EventProcesser { public void process(Object e); }
為了能夠將不同類型的消息交由對應的程序來處理,我們還需要一個消息處理適配器
/** * MessageListenerAdapter的Pojo * <p>消息處理適配器,主要功能:</p> * <p>1、將不同的消息類型綁定到對應的處理器並本地緩存,如將queue01+exchange01的消息統一交由A處理器來出來</p> * <p>2、執行消息的消費分發,調用相應的處理器來消費屬於它的消息</p> * */ public class MessageAdapterHandler { private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class); private ConcurrentMap<String, EventProcessorWrap> epwMap; public MessageAdapterHandler() { this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>(); } public void handleMessage(EventMessage eem) { logger.debug("Receive an EventMessage: [" + eem + "]"); // 先要判斷接收到的message是否是空的,在某些異常情況下,會產生空值 if (eem == null) { logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled."); return; } if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) { logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled."); return; } // 解碼,並交給對應的EventHandle執行 EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName()); if (eepw == null) { logger.warn("Receive an EopEventMessage, but no processor can do it."); return; } try { eepw.process(eem.getEventData()); } catch (IOException e) { logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e); return; } } protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) { if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) { throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. "); } EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor); EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw); if (oldProcessorWrap != null) { logger.warn("The processor of this queue and exchange exists, and the new one can't be add"); } } protected Set<String> getAllBinding() { Set<String> keySet = epwMap.keySet(); return keySet; } protected static class EventProcessorWrap { private CodecFactory codecFactory; private EventProcesser eep; protected EventProcessorWrap(CodecFactory codecFactory, EventProcesser eep) { this.codecFactory = codecFactory; this.eep = eep; } public void process(byte[] eventData) throws IOException{ Object obj = codecFactory.deSerialize(eventData); eep.process(obj); } } }
6、Python實現rabbitMQ---消費者(個人在項目中使用的)
#!/usr/bin/env python #coding=utf8 import pika import ConfigParser import string, os, sys '''讀取配置項''' cf = ConfigParser.ConfigParser() cf.read("agent.conf") host = cf.get("agent", "host") exchangeName = cf.get("agent", "exchangeName") queueName = cf.get("agent", "queueName") command = cf.get("agent", "command") connection = pika.BlockingConnection(pika.ConnectionParameters(host)) channel = connection.channel() #定義交換機 channel.exchange_declare(exchange=exchangeName, type='fanout') #生成隊列,並綁定到交換機上 result = channel.queue_declare(queue=queueName,durable=True,exclusive=False) queue_name = result.method.queue channel.queue_bind(exchange=exchangeName, queue=queue_name) #回調函數 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) os.system(command+' '+body) print " [x] Received %r" % (body,) channel.basic_consume(callback, queue=queue_name) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()
下面我們具體分析一下代碼:
rabbitmq的python實例工作隊列:
1、連接到rabbitmq服務器。
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
2、聲明消息隊列,消息將在這個隊列中進行傳遞。如果將消息發送到不存在的隊列,rabbitmq將會自動清除這些消息。
result = channel.queue_declare(queue=queueName,durable=True,exclusive=False)
queue_name = result.method.queue