先介紹rabbmitmq的幾個方法:
// 聲明一個隊列 -// queue 隊列名稱 // durable 為true時server重啟隊列不會消失 (是否持久化) // exclusive 隊列是否是獨占的,如果為true只能被一個connection使用,其他連接建立時會拋出異常 // autoDelete 當沒有任何消費者使用時,自動刪除該隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/* * 向server發布一條消息 * 參數1:exchange名字,若為空則使用默認的exchange * 參數2:routing key * 參數3:其他的屬性 * 參數4:消息體 * RabbitMQ默認有一個exchange,叫default exchange,它用一個空字符串表示,它是direct exchange類型, * 任何發往這個exchange的消息都會被路由到routing key的名字對應的隊列上,如果沒有對應的隊列,則消息會被丟棄 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
// 同一時刻服務器只會發一條消息給消費者(能者多勞模式) channel.basicQos(1);
/*消息消費完成確認 * autoAck 是否自動確認 true自動確認 false手動確認 * 模式1:自動確認 只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消息,都認為是消息已經成功消費。 * 模式2:手動確認 * 消費者從隊列中獲取消息后,服務器會將該消息標記為不可用狀態,等待消費者的反饋,如果消費者一直沒有反饋,那么 該消息將一直處於不可用狀態。 * 如果選用自動確認,在消費者拿走消息執行過程中出現宕機時,消息可能就會丟失!! */ channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
當手動確認時,一定要在消息處理完成后,確認提交,加上如下代碼:
// 消息處理完成,手動確認提交 // deliveryTag 該消息的index // multiple:是否批量 true:將一次性ack所有小於deliveryTag的消息。 channel.basicAck(envelope.getDeliveryTag(), false);
/* * 使用fanout類型創建的交換器 * exchange:交換機名稱 * type:交換機類型(direct/topic/fanout) */ channel.exchangeDeclare(EXCHANGE_NAME, ConfigKey.EX_FANOUT);
/* * 獲取到一個臨時隊列名稱。 * channel.queueDeclare():創建一個非持久化、獨立、自動刪除的隊列名稱 * 此隊列是臨時的,隨機的,一旦我們斷開消費者,隊列會立即被刪除 * 隨機隊列名,如amq.gen-jzty20brgko-hjmujj0wlg */ String queueName = channel.queueDeclare().getQueue();
/* * 將隊列跟交換器進行綁定 * queue:隊列名稱 * exchange:交換機名稱 * routingKey:隊列跟交換機綁定的鍵值 */ channel.queueBind(queueName, EXCHANGE_NAME, "black");