/**
* exchange: 交換器的名稱
* type:交換器的類型,如Direct Topic Headers Fanout
* Direct Exchange – 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。
* Fanout Exchange – 不處理路由鍵。消息都會被轉發到與該交換機綁定的所有隊列上。(Fanout交換機轉發消息是最快的)。
* Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。
* 因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
* durable:設置是否持久化,true持久化,以保證服務器重啟,不會丟失相關信息
* autoDelete:是否自動刪除。true為自動刪除,刪除的前提是至少有一個隊列或者交換器與這個交換器綁定,之后所有與這個交換器綁定的隊列或者交換器都與此解綁(並不是當與此交換器連接的客戶端都斷開時自動刪除)
* internal:是否內置,true表示內置交換器,客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式
* argument:其它一些結構化參數
*/
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, false, null);
/**
* 指定隊列
* queue: 隊列名稱
* durable: 是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,
* 保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫
* exclusive:排他隊列,
* 如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:
* 其一,排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的。
* 其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。
* 其三,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。
* 這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。
* autoDelete:自動刪除,true為自動刪除,刪除的前提是至少有一個消費者連接這個隊列,之后所有與這個隊列連接的消費者都斷開時都會自動刪除(並不是當連接此隊列的所有客戶端都斷開時自動刪除)
* arguments:x-message-ttl(消息過期時間)、
* x-max-length(最大積壓消息個數)、
* x-dead-letter-exchange(消息過期后投遞的exchange)
* x-dead-letter-routing-key(消息過期后按照指定的routingkey重新發送)、
* x-max-priority(隊列優先級,值越大優先級超高,優先級高的消息具備優先被消費的特權)
* x-expires(控制隊列如果在多長時間未使用則會被刪除,毫秒為單位)、
* x-max-length-bytes
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* queue:隊列名稱
* exchange:交換器的名稱
* routingKey:用來綁定隊列和交換器的路由鍵
* argument:定義綁定的一些參數
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
/**
* 和queueBind用法相似。
* 綁定后,消息從source交換器轉發到destination交換器
*/
channel.exchangeBind("destination", "source", "routingKey", null);
/**
* exchange:交換器名稱,如果設置為空字符串,則消息會被發送到RabbitMQ默認的交換器中。
* routingKey:指定路由鍵,交換器根據路由鍵將消息存儲到相應的隊列之中
* mandatory:為true則當exchange找不到相應的queue時,會調用basic.return方法將消息返還給生產者,否則丟棄
* immediate:為true則當exchange將消息route到所有queue(s)發現沒有consumer時,不會將消息插入隊列,會調用basic.return方法將消息返還給生產者
* props:消息為持久化 —— MessageProperties.PERSISTENT_TEXT_PLAIN
* body:msg字節
*/
channel.basicPublish(ex_log, "", true, true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
String CorrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties.Builder builder1 = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties1 = builder1
.contentType("text/plain") //標識消息內容的MIME
.contentEncoding("utf-8")
.headers(new HashMap())
.deliveryMode(2) //消息持久態(2)或瞬態(任何其他值)
.priority(1) //消息優先級
.correlationId(CorrId) //用來關聯請求(request)和其調用RPC之后的回復(response) (如rpc客戶端根據id進行消息確認)
.replyTo(replyQueueName) //指定消息響應隊列(rpc)
.expiration("10000") //消息延遲
.messageId("33333333333")
.timestamp(new Date())
.type("4444444444444")
.userId("5555555555555")
.appId("66666666666")
.clusterId("77777777777777")
.build();
/**
* 生產者獲取沒有被正確路由到合適隊列的消息,通過添加ReturnListener來實現
*/
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
throws IOException {
System.out.println("返回的結果是:"+new String(arg5));
}
});
/**
* 確認機制
*/
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack,SeqNo:"+deliveryTag+", multiple:"+multiple);
if(multiple) {
confirmSet.headSet(deliveryTag-1).clear();
}else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("deliveryTag:"+deliveryTag+", multiple:"+multiple);
if(multiple) {
confirmSet.headSet(deliveryTag-1).clear();
}else {
confirmSet.remove(deliveryTag);
}
System.out.println("5");
//注意這里需要添加處理消息重發的場景
}
});
/**
* 公平轉發,設置客戶端最多接收未被ack的消息的個數,只有在消費者空閑的時候會發送下一條信息,同一時間每次發給一個消息給一個worker。
* 一個生產者與多個消費者時,避免RabbitMQ服務器可能一直發送多個消息給一個worker,而另一個可能幾乎不做任何事情。
*/
channel.basicQos(prefetchCount);
/**
* 實現消費者
*/
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("recv msg:"+new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 發送應答,DeliveryTag可以看作消息的編號,是一個64位長整型值
*/
channel.basicAck(envelope.getDeliveryTag(), false);
/**
* 單個拒絕,DeliveryTag可以看作消息的編號,requeue為true時,交換器將重新發送消息
*/
channel.basicReject(envelope.getDeliveryTag(), true);
/**
* 批量拒絕,
*/
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
};
/**
* 為隊列指定消費者
* queue: 隊列的名稱
* autoAck:
* true:RabbitMQ會自動把發送出去的消息置為確認,然后從內存(或磁盤)中刪除
* false:RabbitMQ會等待消費者地回復確認信號后才從內存(或者磁盤)中移去消息(實際上是先打上刪除標記,之后再刪除)
* 如果一直沒有收到消費者的確認信號,並且消費此消息的消費者已斷開連接,則RabbitMQ會安排該消息重新進入隊列,等待投遞給下一個消費者,也有可能還是原來那個消費者
* consumerTag:消費者標簽,用來區分多個消費者
* noLocal:設置為true不能將同一個Connection中生產者發送的消息傳送給這個Connection中的消費者
* exclusive:是否排它
* arguments:設置消費者其他參數
* callback:消費者的回調函數,用來處理RabbitMQ推送過來的消息,比如DefaultConsumer,使用時需要重寫其中的handleDelivery方法
*/
channel.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback);
/**
* 發送應答
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
/**
* 指定隊列
* queue: 隊列名稱
* durable: 是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,
* 保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫
* exclusive:排他隊列,
* 如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:
* 其一,排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的。
* 其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。
* 其三,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。
* 這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。
* autoDelete:自動刪除,true為自動刪除,刪除的前提是至少有一個消費者連接這個隊列,之后所有與這個隊列連接的消費者都斷開時都會自動刪除(並不是當連接此隊列的所有客戶端都斷開時自動刪除)
* arguments:x-message-ttl(消息過期時間)、
* x-max-length(最大積壓消息個數)、
* x-dead-letter-exchange(消息過期后投遞的exchange)
* x-dead-letter-routing-key(消息過期后按照指定的routingkey重新發送)、
* x-max-priority(隊列優先級,值越大優先級超高)
* x-expires(控制隊列被自動刪除處於未使用狀態的時間)、
* x-max-length-bytes
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
throws IOException {
System.out.println("返回的結果是:"+new String(arg5));
}
});