rabbitMQ--快速入門


1、簡介

  MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。

  

  如上圖所示:  

  1.Server(broker): 接受客戶端連接,實現AMQP消息隊列和路由功能的進程。

  2.Virtual Host:其實是一個虛擬概念,類似於權限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,但是權限控制的最小粒度是Virtual Host

  3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中,             ExchangeType有direct、Fanout和Topic三種,不同類型的Exchange路由的行為是不一樣的。

  4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。

  5.Message: 由Header和Body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先級是多少等。而Body是           真正需要傳輸的APP數據。

  6.Binding:Binding聯系了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding后會生成一張路由表,路由表中存儲着Message Queue所需消         息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到               Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,兩者的匹配方式由             Exchange Type決定。 

  7.Connection:連接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連接。

  8.Channel:信道,僅僅創建了客戶端到Broker之間的連接后,客戶端還是不能發送消息的。需要為每一個Connection創建Channel,AMQP協議規定只有通過Channel才能           執行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的,如果一個客戶端每一個線程都需要與             Broker交互,如果每一個線程都建立一個TCP連接,暫且不考慮TCP連接是否浪費,就算操作系統也無法承受每秒建立如此多的TCP連接。RabbitMQ建議客戶端線程之間不·         要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,但是建議盡量共用Connection。

  9.Command:AMQP的命令,客戶端通過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令發送消息,txSelect開         啟一個事務,txCommit提交一個事務。

2、特點及應用場景

  MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取或者訂閱隊列中的消息。MQ和JMS類似,但不同的是JMS是SUN JAVA消息中間件服務的一個標准和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。

  在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。

3、安裝

  RabbitMQ是基於Erlang的,所以首先必須配置Erlang環境。  

  從Erlang的官網 http://www.erlang.org/download.html 下載最新的erlang安裝包,我下載的版本是 otp_src_R14B03.tar.gz 。

 

  然后:

    $ tar xvzf otp_src_R14B03.tar.gz
    $ cd otp_src_R14B03
    $ ./configure

 

   編譯后的輸出如下圖:

          

  提示沒有wxWidgets和fop,但是問題不大。繼續:
    $ make
    $ sudo make install

  安裝完Erlang,開始安裝RabbitMQ-Server。  

  主要參考官方文檔:http://www.rabbitmq.com/build-server.html

  需要安裝一個比較新的Python版本。安裝略。

  需要安裝simplejson。從此處下載最新的版本: http://pypi.python.org/pypi/simplejson#downloads 。我下載的版本是 simplejson-2.2.1.tar.gz

    $ tar xvzf simplejson-2.2.1.tar.gz
    $ cd simplejson-2.2.1
    $ sudo python setup.py install

  然后安裝RabbitMQ Server。從此處下載源代碼版本的RabbitMQ: http://www.rabbitmq.com/server.html。我下載的版本是 rabbitmq-server-2.6.1.tar.gz

    $ tar xvzf rabbitmq-server-2.6.1.tar.gz
    $ cd rabbitmq-server-2.6.1
    $ make
    # TARGET_DIR=/usr/local SBIN_DIR=/usr/local/sbin MAN_DIR=/usr/local/man make install 

  在sbin/目錄下出現了三個命令:

    rabbitmqctl  rabbitmq-env  rabbitmq-server

  安裝成功。

  運行

    找到sbin/目錄,運行程序:
      /usr/local/sbin/rabbitmq-server –detached 

    停止程序:
      /usr/local/sbin/rabbitmqctl stop 

 4、java 實現rabbitMQ---生產者

    

public interface EventTemplate {

    void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
        
    void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
}
public class DefaultEventTemplate implements EventTemplate {

    private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);

    private AmqpTemplate eventAmqpTemplate;

    private CodecFactory defaultCodecFactory;

//    private DefaultEventController eec;
//
//    public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
//            CodecFactory defaultCodecFactory, DefaultEventController eec) {
//        this.eventAmqpTemplate = eopAmqpTemplate;
//        this.defaultCodecFactory = defaultCodecFactory;
//        this.eec = eec;
//    }
    
    public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
        this.eventAmqpTemplate = eopAmqpTemplate;
        this.defaultCodecFactory = defaultCodecFactory;
    }

    @Override
    public void send(String queueName, String exchangeName, Object eventContent)
            throws SendRefuseException {
        this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
    }  

    @Override
    public void send(String queueName, String exchangeName, Object eventContent,
            CodecFactory codecFactory) throws SendRefuseException {
        if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
            throw new SendRefuseException("queueName exchangeName can not be empty.");
        }
        
//        if (!eec.beBinded(exchangeName, queueName))
//            eec.declareBinding(exchangeName, queueName);

        byte[] eventContentBytes = null;
        if (codecFactory == null) {
            if (eventContent == null) {
                logger.warn("Find eventContent is null,are you sure...");
            } else {
                throw new SendRefuseException(
                        "codecFactory must not be null ,unless eventContent is null");
            }
        } else {
            try {
                eventContentBytes = codecFactory.serialize(eventContent);
            } catch (IOException e) {
                throw new SendRefuseException(e);
            }
        }

        // 構造成Message
        EventMessage msg = new EventMessage(queueName, exchangeName,
                eventContentBytes);
        try {
            eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
        } catch (AmqpException e) {
            logger.error("send event fail. Event Message : [" + eventContent + "]", e);
            throw new SendRefuseException("send event fail", e);
        }
    }
}

5、java實現rabbitMQ---消費者

  

public interface EventProcesser {
    public void process(Object e);
}

  為了能夠將不同類型的消息交由對應的程序來處理,我們還需要一個消息處理適配器

  

/**
 * MessageListenerAdapter的Pojo
 * <p>消息處理適配器,主要功能:</p>
 * <p>1、將不同的消息類型綁定到對應的處理器並本地緩存,如將queue01+exchange01的消息統一交由A處理器來出來</p>
 * <p>2、執行消息的消費分發,調用相應的處理器來消費屬於它的消息</p>
 * 
 */
public class MessageAdapterHandler {

    private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);

    private ConcurrentMap<String, EventProcessorWrap> epwMap;

    public MessageAdapterHandler() {
        this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
    }

    public void handleMessage(EventMessage eem) {
        logger.debug("Receive an EventMessage: [" + eem + "]");
        // 先要判斷接收到的message是否是空的,在某些異常情況下,會產生空值
        if (eem == null) {
            logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");
            return;
        }
        if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
            logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
            return;
        }
        // 解碼,並交給對應的EventHandle執行
        EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
        if (eepw == null) {
            logger.warn("Receive an EopEventMessage, but no processor can do it.");
            return;
        }
        try {
            eepw.process(eem.getEventData());
        } catch (IOException e) {
            logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);
            return;
        }
    }

    protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {
        if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
            throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
        }
        EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
        EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
        if (oldProcessorWrap != null) {
            logger.warn("The processor of this queue and exchange exists, and the new one can't be add");
        }
    }

    protected Set<String> getAllBinding() {
        Set<String> keySet = epwMap.keySet();
        return keySet;
    }

    protected static class EventProcessorWrap {

        private CodecFactory codecFactory;

        private EventProcesser eep;

        protected EventProcessorWrap(CodecFactory codecFactory,
                EventProcesser eep) {
            this.codecFactory = codecFactory;
            this.eep = eep;
        }

        public void process(byte[] eventData) throws IOException{
            Object obj = codecFactory.deSerialize(eventData);
            eep.process(obj);
        }
    }
}

6、Python實現rabbitMQ---消費者(個人在項目中使用的)

   

#!/usr/bin/env python
#coding=utf8
import pika
import ConfigParser
import string, os, sys 

'''讀取配置項'''
cf = ConfigParser.ConfigParser()
cf.read("agent.conf")
host = cf.get("agent", "host")
exchangeName = cf.get("agent", "exchangeName")
queueName = cf.get("agent", "queueName")
command = cf.get("agent", "command")

connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
#定義交換機
channel.exchange_declare(exchange=exchangeName, type='fanout')
 
#生成隊列,並綁定到交換機上
result = channel.queue_declare(queue=queueName,durable=True,exclusive=False)
queue_name = result.method.queue
channel.queue_bind(exchange=exchangeName, queue=queue_name) 
#回調函數
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    os.system(command+' '+body)
    print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue=queue_name)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

 

下面我們具體分析一下代碼:

 rabbitmq的python實例工作隊列:

  

  1、連接到rabbitmq服務器。

  connection = pika.BlockingConnection(pika.ConnectionParameters(host))
 2、聲明消息隊列,消息將在這個隊列中進行傳遞。如果將消息發送到不存在的隊列,rabbitmq將會自動清除這些消息。  
  result = channel.queue_declare(queue=queueName,durable=True,exclusive=False)
  queue_name = result.method.queue
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM