RabbitMQ 部分API解析


/**
 * exchange: 交換器的名稱
 * type:交換器的類型,如Direct Topic Headers Fanout
 *     Direct Exchange – 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。
 *     Fanout Exchange – 不處理路由鍵。消息都會被轉發到與該交換機綁定的所有隊列上。(Fanout交換機轉發消息是最快的)。
 *     Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。
 *     因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
 * durable:設置是否持久化,true持久化,以保證服務器重啟,不會丟失相關信息
 * autoDelete:是否自動刪除。true為自動刪除,刪除的前提是至少有一個隊列或者交換器與這個交換器綁定,之后所有與這個交換器綁定的隊列或者交換器都與此解綁(並不是當與此交換器連接的客戶端都斷開時自動刪除)
 * internal:是否內置,true表示內置交換器,客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式
 * argument:其它一些結構化參數
 */  
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, false, null);

 

/**
 * 指定隊列
 * queue: 隊列名稱
 * durable: 是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,
 *     保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫
 * exclusive:排他隊列,
 *     如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:
 *     其一,排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的。
 *     其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。
 *     其三,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。
 *     這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。
 * autoDelete:自動刪除,true為自動刪除,刪除的前提是至少有一個消費者連接這個隊列,之后所有與這個隊列連接的消費者都斷開時都會自動刪除(並不是當連接此隊列的所有客戶端都斷開時自動刪除)
 * arguments:x-message-ttl(消息過期時間)、
 *             x-max-length(最大積壓消息個數)、
 *             x-dead-letter-exchange(消息過期后投遞的exchange)
 *             x-dead-letter-routing-key(消息過期后按照指定的routingkey重新發送)、
 *             x-max-priority(隊列優先級,值越大優先級超高,優先級高的消息具備優先被消費的特權)
 *            x-expires(控制隊列如果在多長時間未使用則會被刪除,毫秒為單位)、
 *            x-max-length-bytes
 */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

 

/**
 * queue:隊列名稱
 * exchange:交換器的名稱
 * routingKey:用來綁定隊列和交換器的路由鍵
 * argument:定義綁定的一些參數
 */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);

 

/**
 * 和queueBind用法相似。
 * 綁定后,消息從source交換器轉發到destination交換器
 */
channel.exchangeBind("destination", "source", "routingKey", null);


 

/**
 * exchange:交換器名稱,如果設置為空字符串,則消息會被發送到RabbitMQ默認的交換器中。
 * routingKey:指定路由鍵,交換器根據路由鍵將消息存儲到相應的隊列之中
 * mandatory:為true則當exchange找不到相應的queue時,會調用basic.return方法將消息返還給生產者,否則丟棄
 * immediate:為true則當exchange將消息route到所有queue(s)發現沒有consumer時,不會將消息插入隊列,會調用basic.return方法將消息返還給生產者
 * props:消息為持久化  —— MessageProperties.PERSISTENT_TEXT_PLAIN
 * body:msg字節
 */
channel.basicPublish(ex_log, "", true, true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

String CorrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties.Builder builder1 = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties1 = builder1
        .contentType("text/plain")    //標識消息內容的MIME
        .contentEncoding("utf-8")
        .headers(new HashMap())
        .deliveryMode(2)    //消息持久態(2)或瞬態(任何其他值)
        .priority(1)    //消息優先級
        .correlationId(CorrId)    //用來關聯請求(request)和其調用RPC之后的回復(response)  (如rpc客戶端根據id進行消息確認)
        .replyTo(replyQueueName)    //指定消息響應隊列(rpc)
        .expiration("10000")    //消息延遲
        .messageId("33333333333")
        .timestamp(new Date())
        .type("4444444444444")
        .userId("5555555555555")
        .appId("66666666666")
        .clusterId("77777777777777")
        .build();

 

/**
 * 生產者獲取沒有被正確路由到合適隊列的消息,通過添加ReturnListener來實現
 */
channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
            throws IOException {
        System.out.println("返回的結果是:"+new String(arg5));
    }
});

/**
 * 確認機制
 */
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Nack,SeqNo:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
    }
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("deliveryTag:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
        System.out.println("5");
        //注意這里需要添加處理消息重發的場景
    }
});


 

/**
 * 公平轉發,設置客戶端最多接收未被ack的消息的個數,只有在消費者空閑的時候會發送下一條信息,同一時間每次發給一個消息給一個worker。
 * 一個生產者與多個消費者時,避免RabbitMQ服務器可能一直發送多個消息給一個worker,而另一個可能幾乎不做任何事情。
 */
channel.basicQos(prefetchCount);

 

/**
 * 實現消費者
 */
Consumer consumer = new DefaultConsumer(channel) {
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
        System.out.println("recv msg:"+new String(body));
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /**
         * 發送應答,DeliveryTag可以看作消息的編號,是一個64位長整型值
         */
        channel.basicAck(envelope.getDeliveryTag(), false);
        /**
         * 單個拒絕,DeliveryTag可以看作消息的編號,requeue為true時,交換器將重新發送消息
         */
        channel.basicReject(envelope.getDeliveryTag(), true);
        /**
         * 批量拒絕,
         */
        channel.basicNack(envelope.getDeliveryTag(), false, true);
    }
};

 

/**
 * 為隊列指定消費者
 * queue: 隊列的名稱
 * autoAck:
 *   true:RabbitMQ會自動把發送出去的消息置為確認,然后從內存(或磁盤)中刪除
 *   false:RabbitMQ會等待消費者地回復確認信號后才從內存(或者磁盤)中移去消息(實際上是先打上刪除標記,之后再刪除)
 *   如果一直沒有收到消費者的確認信號,並且消費此消息的消費者已斷開連接,則RabbitMQ會安排該消息重新進入隊列,等待投遞給下一個消費者,也有可能還是原來那個消費者
 * consumerTag:消費者標簽,用來區分多個消費者
 * noLocal:設置為true不能將同一個Connection中生產者發送的消息傳送給這個Connection中的消費者
 * exclusive:是否排它
 * arguments:設置消費者其他參數
 * callback:消費者的回調函數,用來處理RabbitMQ推送過來的消息,比如DefaultConsumer,使用時需要重寫其中的handleDelivery方法
 */  
channel.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback);


/**
 * 發送應答
 */
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

/**
 * 指定隊列
 * queue: 隊列名稱
 * durable: 是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,
 *     保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫
 * exclusive:排他隊列,
 *     如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:
 *     其一,排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的。
 *     其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。
 *     其三,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。
 *     這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。
 * autoDelete:自動刪除,true為自動刪除,刪除的前提是至少有一個消費者連接這個隊列,之后所有與這個隊列連接的消費者都斷開時都會自動刪除(並不是當連接此隊列的所有客戶端都斷開時自動刪除)
 * arguments:x-message-ttl(消息過期時間)、
 *             x-max-length(最大積壓消息個數)、
 *             x-dead-letter-exchange(消息過期后投遞的exchange)
 *             x-dead-letter-routing-key(消息過期后按照指定的routingkey重新發送)、
 *             x-max-priority(隊列優先級,值越大優先級超高)
 *            x-expires(控制隊列被自動刪除處於未使用狀態的時間)、
 *            x-max-length-bytes
 */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
                    throws IOException {
                System.out.println("返回的結果是:"+new String(arg5));
            }
        });


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM