RabbitMQ服務器會根據路由鍵將消息從交換器路由到隊列中,如何處理投遞到多個隊列的情況?這里不同類型的交換器起到了重要的作用。分別是fanout,direct,topic,每一種類型實現了不同的路由算法。
Fanout Exchange
不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。Fanout交換機轉發消息是最快的。
生產者
package com.dynamic.rabbitmy.ps; /** * Created by fxq on 2017/3/10. */ import com.dynamic.rabbitmy.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生產者 */ public class Send { private final static String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws Exception{ //獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換器 channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//fanout交換器 //消息內容 String message = "商品已經刪除,id=1000"; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println(" [x] Sent'"+message+"'" ); channel.close(); connection.close(); } }
消費者
package com.dynamic.rabbitmy.ps; /** * Created by fxq on 2017/3/10. */ import com.dynamic.rabbitmy.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; /** * 消費者 */ public class Recv { private final static String QUEUE_NAME="test_queue_fanout_1"; private final static String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws Exception{ //獲取到連接以及通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //綁定隊列到交換器 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); //不設置路由鍵 //統一時刻服務器只會發一條消息給消費者; channel.basicQos(1); //定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME,false,consumer); //獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 前台系統:'" + message + "'"); Thread.sleep(10); //手動返回 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
Direct Exchange
處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “test”,則只有被標記為“test”的消息才被轉發,不會轉發test.aaa,也不會轉發dog.123,只會轉發test。
生產者:
package com.dynamic.rabbitmy.routing; /** * Created by fxq on 2017/3/10. */ import com.dynamic.rabbitmy.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生產者 */ public class Send { private final static String EXCHANGE_NAME="test_exchange_direct"; public static void main(String[] args) throws Exception{ //獲取到連接以及通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明exchange channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //消息內容 String message = "刪除商品,id = 1001"; channel.basicPublish(EXCHANGE_NAME,"delete",null,message.getBytes()); //此處delete為路由鍵; System.out.println(" [x] Sent '"+ message+"'"); channel.close(); connection.close(); } }
生產者:
package com.dynamic.rabbitmy.routing; /** * Created by fxq on 2017/3/10. */ import com.dynamic.rabbitmy.util.ConnectionUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; /** * 消費者1 */ public class Recv { private final static String QUEUE_NAME="test_queue_direct_1"; private final static String EXCHANGE_NAME="test_exchange_direct"; public static void main(String[] args) throws Exception{ //獲取連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //綁定隊列到交換機; channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update"); //匹配路由鍵為update channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete"); //匹配路由鍵是delete //同一時刻服務器只會發送一條消息給消費者; channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); //監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME,false,consumer); //獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("前台系統:'"+message+"'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
該綁定在交換器上的隊列,它可以匹配delete,update的路由鍵,但不是能匹配insert;必須和生產者聲明是一模一樣;
Topic Exchange
將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
生產者:
package com.dynamic.rabbitmy.topic; /** * Created by fxq on 2017/3/10. */ import com.dynamic.rabbitmy.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 發送者 */ public class Send { private final static String EXCHANGE_NAME="test_exchange_topic" ; public static void main(String[] args) throws Exception{ //獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明exchange channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //消息內容 String message = "插入商品,id=100"; //發布消息 channel.basicPublish(EXCHANGE_NAME,"item.insert",null,message.getBytes()); System.out.println(" [x] Sent '"+message + "'"); channel.close(); connection.close(); } }
消費者:
package com.dynamic.rabbitmy.topic; import com.dynamic.rabbitmy.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.sun.media.sound.SF2InstrumentRegion; /** * Created by fxq on 2017/3/10. */ public class Recv2 { private final static String QUEUE_NAME="test_queue_topic2"; private final static String EXCHANGE_NAME="test_exchange_topic" ; public static void main(String[] args) throws Exception{ //獲得連接和mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明通道 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //綁定exchange channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.#"); //使用item.# 匹配所有的以item開頭的 //同一時刻服務器只能發送一條消息給消費者; channel.basicQos(1); //聲明消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //監控隊列,設置手動完成 channel.basicConsume(QUEUE_NAME,false,consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("搜索系統 '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
以上就是三種交換器的類型以及他們的使用場景,基於消息的路由鍵和交換器的類型,服務器會決定將消息投遞到那個隊列中。