exchange_declare('direct_logs', 'direct', false, false, false);// 這個是申明交換器,如果沒有申明就給默認隊列的這個交換器,而且發送的類型默認是direct)
順序 |
參數名 |
默認值 |
作用 |
1 |
$exchange |
無 |
交換機名 |
2 |
$type |
無 |
交換機類型,分別有direct、fanout、topic |
3 |
$passsive |
false |
只判斷不創建(一般用於判斷該交換機是否存在),如果你希望查詢交換機是否存在.而又不想在查詢時創建這個交換機.設置此為 |
4 |
$durable |
false |
表示了如果MQ服務器重啟,這個交換機是否要重新建立(如果設置為true則重啟mq,該交換機還是存在,相當於持久化。) |
5 |
$auto_delete |
true |
無用自動銷毀。如果綁定的所有隊列都不在使用了.是否自動刪除這個交換機.(比如設置為true,它綁定的對列全部被刪除后,該交換器會被自動刪除,) |
6 |
$internal |
false |
內部交換機.即不允許使用客戶端推送消息.MQ內部可以讓交換機作為一個隊列綁定到另外一個交換機下.想想一下以太網的交換機就是了.所以開啟這個屬性,表示是一個他直接收其他交換機發來的信息 |
7 |
$nowait |
false |
如果為True則表示不等待服務器回執信息.函數將返回NULL,可以提高訪問速度..應用范圍不確定 |
8 |
$arguments |
null |
額外的一些參數,比如優先級,什么的.需要單獨開篇講 |
9 |
$ticket |
null |
未知 |
queue_declare('queueName', false, true, false, true, false)
順序 |
參數名 |
默認值 |
作用 |
1 |
$queue |
無 |
隊列名.而存在默認值的意思是.你可以創建一個不重復名稱的一個臨時隊列.(交換機沒法創建臨時的) |
2 |
$passsive |
false |
只判斷不創建(判斷該隊列是否存在) 只查詢不創建.如果為true,如果存在這個隊列,則會返回隊列的信息.如果不存在這個隊列..則會拋異常(與交換機不同的是,如果交換機判斷存在,則返回NULL,否則異常) |
3 |
$durable |
false |
重啟重建(持久化) |
4 |
$exclusive |
false |
排他隊列,如果你希望創建一個隊列,並且只有你當前這個程序(或進程)進行消費處理.不希望別的客戶端讀取到這個隊列.用這個方法甚好.而同時如果當進程斷開連接.這個隊列也會被銷毀.不管是否設置了持久化或者自動刪除. |
5 |
$auto_delete |
true |
自動銷毀(當最后一個消費者取消訂閱時隊列會自動移除,對於臨時隊列只有一個消費服務時適用,) |
6 |
$nowait |
false |
執行后不需要等結果 |
7 |
$arguments |
null |
$arguments = new AMQPTable([ 'x-message-ttl' => 10000, // 延遲時間 (毫秒)創建queue時設置該參數可指定消息在該queue中待多久,可根據x-dead-letter-routing-key和x-dead-letter-exchange生成可延遲的死信隊列。 'x-expires' => 26000, // 隊列存活時間 如果一個隊列開始沒有設置存活時間,后面又設置是無效的。 'x-dead-letter-exchange' => 'exchange_direct_ttl3', // 延遲結束后指向交換機(死信收容交換機) 'x-dead-letter-queue' => 'queue_ttl3', // 延遲結束后指向隊列(死信收容隊列), //'x-dead-letter-routing-key' => 'queue_ttl3', // 設置routing-key //'x-max-priority'=>'10' //聲明優先級隊列.表示隊列應該支持的最大優先級。建議使用1到10之間.該參數會造成額外的CPU消耗。 ] ); |
8 |
$ticket |
null |
|
queue_bind('queue_delete1', 'exchange_delete1');
參數序號 |
參數名 |
作用 |
1 |
$queue |
隊列名 |
2 |
$exchange |
交換機名 |
3 |
$routing_key |
路由名(對應) |
4 |
$nowait |
不等待執行結果 |
5 |
$arguments |
額外參數 |
6 |
$ticket |
…. |
$message = new AMQPMessage("消息內容",['配置項'=>'配置值']);
配置項 |
類型 |
說明 |
content_type |
短文本 |
MIME類型表示消息是一種什么類型的格式,參考MIME類型 |
content_encoding |
短文本 |
正文傳輸編碼,比如內容是gzip壓縮的.值就是gzip,參考 |
application_headers |
數組 |
請求的headers信息 |
delivery_mode |
數字 |
表示是否持久化,1為否,2為是 參考 |
priority |
數字 |
發送權重,也就是優先級 |
correlation_id |
短文本 |
相關性ID 參考 |
reply_to |
短文本 |
消息被發送者處理完后,返回回復時執行的回調(在rpc時會用到) |
expiration |
短文本 |
存活時間,毫秒數 |
message_id |
短文本 |
擴展屬性 |
timestamp |
數字 |
時間戳 |
type |
短文本 |
擴展屬性 |
user_id |
短文本 |
擴展屬性 |
app_id |
短文本 |
擴展屬性 |
cluster_id |
短文本 |
擴展屬性 |
Basic_publish($msg, $exchange, $routing_key)
順序 |
參數名 |
作用 |
1 |
$msg |
消息對象 |
2 |
$exchange |
消息對象(交換機名稱) 如果沒有指定交換器,會指定一個默認的交換器,第三個參數是路由鍵,當申明一個隊列時,它會自動綁定到默認交換器,並以隊列名稱作為路由鍵。那么這段代碼會自動發送到hello的隊列 (hello隊列必須事先申明好)。所以這種消息,當有多個進程時會均勻的分給不同的進程處理 |
3 |
$routing_key |
消息的路由名 |
4 |
$mandatory |
消息至少有一個隊列能夠接受,如果交換機無法把消息發送到具體的隊列中,是否要把消息發送到失敗投遞記錄中,而不是讓其消失(當mandatory標志位設置為true時,如果exchange根據自身類型和消息routingKey無法找到一個合適的queue存儲消息,那么broker會調用basic.return方法將消息返還給生產者;當mandatory設置為false時,出現上述情況broker會直接將消息丟棄;通俗的講,mandatory標志告訴broker代理服務器至少將消息route到一個隊列中,否則就將消息return給發送者;) |
5 |
$immediate |
這個是一個被作廢的屬性. |
6 |
$ticket |
|
function 接收消息回調($message)
{
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
代碼的意思為,根據消息的delivery_info['channel']
找到通道,並調用通道的basic_ack
方法發送消息的確認內容.
##泄露問題
如果我們只是接受了消息,並進行處理.但是處理完后.沒有發起ack就會導致服務器上的消息一直堆積.服務器會發送新的消息.同時會記錄當前的這個鏈接有哪些消息一直還沒回復.(服務器認為你會回復,一直等待)。如果消費者進程停止掉重啟..就會重新接收所有消息!
$channel->basic_qos(null, 1, null);
可告知RabbitMQ只有在consumer處理並確認了上一個message后才分配新的message給他,否則上一個沒處理完會一直卡在這里,這個根據業務場景配置,
basic_qos注意事項:由於消費者自身處理能力有限,從rabbitmq獲取一定數量的消息后,希望rabbitmq不再將隊列中的消息推送過來,當對消息處理完后(即對消息進行了ack,並且有能力處理更多的消息)再接收來自隊列的消息
這時候我們就需要要到basic_qos,
basic_qos($prefetch_size,
$prefetch_count, //最重要的參數,未確認的消息同時存在的個數;(也就是未ack的消息數,我們可以以此來作為記錄失敗數據的個數;)
$a_global
)
prefetch_count在no_ask=false的情況下生效,即在自動應答的情況下這兩個值是不生效的
注意:如果我們有兩個進程,一個設置prefetch_count為1,一個沒有設置這個,這樣只會有一個進程會等待確認,還有一個不會等待確認,這樣容易導致不可預知的錯誤;當多個進程設置prefetch_count值時,相互之間的數據時沒有影響的;比如兩個進程都設置的是2那么總的未確認存在數是4;
basic_consume("TestQueue", "", false, false, false, false, $callback)
順序 |
參數名 |
默認值 |
作用 |
1 |
queue |
|
消息要取得消息的隊列名 |
2 |
consumer_tag |
|
消費者標簽 |
3 |
no_local |
false |
這個功能屬於AMQP的標准,但是rabbitMQ並沒有做實現. |
4 |
no_ack |
false |
收到消息后,是否不需要回復確認即被認為被消費(在默認情況下,消息確認機制是關閉的。現在是時候開啟消息確認機制,該參數設置為true,並且工作進程處理完消息后發送確認消息。) |
5 |
exclusive |
false |
排他消費者,即這個隊列只能由一個消費者消費.適用於任務不允許進行並發處理的情況下.比如系統對接 |
6 |
nowait |
false |
不返回執行結果,但是如果排他開啟的話,則必須需要等待結果的,如果兩個一起開就會報錯 |
7 |
callback |
null |
回調函數 |
8 |
ticket |
null |
|
9 |
arguments |
null |
|