消息中間件實現不同系統之間通信的一個系統,就rabbitMQ來講,消息的發出方將消息送入某個交換機,並且制定一個路由關鍵字,該交換機根據路由關鍵字將消息放入對應的隊列中,然后一直監聽着隊列的程序便可以接收道相應的消息,並且根據預定的程序執行相應的邏輯。
下面通過一個例子來實現程序間的通信:
消息發出方:

package cn.ly; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 測試rabbitmq客戶端,制造者端測試 * * 生產者發送信息的過程: * 1.創建連接 * 2.設置虛擬機 * 3.創建會話通道 * 4.聲明隊列 * 5.聲明交換機 * 6.綁定交換機和隊列,創建路由關鍵字 * 7.發送消息 * 8.關閉資源 * 注明:4-6這三步不是必須的,但是如果直接發送消息而沒有隊列的話程序會出錯,所以,在發送之前先聲明, * 同理,消費者端也是這樣,需要先聲明,沒有的話就會創建,有的話就不發生什么; * * 對於工作模式: * 1.work queues:不使用交換機,只有一個隊列,可以有多個消費端,隊列通知采用輪詢的方法給監聽的多個消費端 * 發送消息; * 2.publish/subscribe:通過交換機進行消息轉發,有多個隊列,每個隊列均可有一個或者多個消費者進行監聽, * 每次生產者發送消息,則均由交換機轉發至各個隊列,由各個隊列自行通知監聽的消費者; * 3.Routing:模式同2,區別,為每個隊列配備一個,或多個路由關鍵字,發送消息時指定路由關鍵字,由交換機根據路由 * 關鍵字匹配進行轉發; * 4.Topics:模式同3,區別在於,配備的路由關鍵字可以為通配符的形式,通配符有:#和*,區別:#匹配任意個單詞,而* * 只能匹配單個單詞,其中路由關鍵字指定規則:多個單詞使用.隔開; * 5.Header:模式同3,區別,匹配的是鍵值對; * 6.RPC:遠程異步調用,mq的一個應用,有客戶端和服務端,客戶端向mq發送一個調用服務端的信息,服務端獲得信息,調用 * 相應服務,將返回結果作為消息發送到另一個隊列,客戶端監聽該隊列獲取返回信息; * * 對於消費者,每個消費者監聽隊列時指定的參數中,有隊列名,是否自動回復,以及一個的map屬性和一個回調方法; * 消費者,單獨的消費者無法實現同時監聽兩個隊列的操作; * 針對隊列:可以給它增加路由key,也可以給一系列的鍵值對,作為頭信息,這些東西都是對隊列的標識,當生產者發送消息時, * 使用這些標識來決定將消息發送到那個隊列,有路由key的話,優先匹配路由key; * 消費者的行為很單純,就是監聽一個隊列,然后發現消息就回收; */ public class ProducerTest { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_EMAIL="inform.#.email.#"; private static final String ROUTINGKEY_SMS="inform.#.sms.#"; // rabbitMq @Test public void run() { // 通過連接工廠創建新的連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬機 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 創建連接 connection = connectionFactory.newConnection(); // 創建會話通道 channel = connection.createChannel(); //聲明隊列,如果隊列在mq 中沒有則要創建 //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 參數明細 * 1、queue 隊列名稱 * 2、durable 是否持久化,如果持久化,mq重啟后隊列還在 * 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建 * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除) * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //聲明一個交換機 //參數:String exchange, String type /** * 參數明細: * 1、交換機的名稱 * 2、交換機的類型 * fanout:對應的rabbitmq的工作模式是 publish/subscribe * direct:對應的Routing 工作模式 * topic:對應的Topics工作模式 * headers: 對應的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); // 綁定交換機和隊列 //參數:String queue, String exchange, String routingKey /** * 參數明細: * 1、queue 隊列名稱 * 2、exchange 交換機名稱 * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS); for(int i=0;i<5;i++){ //發送消息的時候指定routingKey String message = "send email inform message to user"; channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes()); System.out.println("send to mq "+message); } for(int i=0;i<5;i++){ //發送消息的時候指定routingKey String message = "send sms inform message to user"; channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes()); System.out.println("send to mq "+message); } for(int i=0;i<5;i++){ //發送消息的時候指定routingKey String message = "send sms and email inform message to user"; channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes()); System.out.println("send to mq "+message); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } }
做的事情主要有:創建鏈接,設置虛擬機,創建會話通道,聲明隊列,聲明交換機,通過關鍵字綁定交換機,發送消息,關閉鏈接;一個鏈接可以有多個會話通道;其中,聲明交換機,聲明隊列和綁定這三件事不是必須做的,只是,初次運行發送消息時如果沒有對應的交換機則會報錯,另外,如果在創建交換機時報錯,可以訪問localhost:15672中,查看是否有同名的交換機,有的話刪除即可;
消息接收方:

package cn.ly.cn.ly; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerTest { //隊列名稱 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_EMAIL="inform.#.email.#"; private static final String ROUTINGKEY_SMS="inform.#.sms.#"; public static void main(String[] args) throws IOException, TimeoutException { // 通過連接工廠創建新的連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬機 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; // 創建連接 connection = connectionFactory.newConnection(); // 創建會話通道 channel = connection.createChannel(); //聲明隊列,如果隊列在mq 中沒有則要創建 //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 參數明細 * 1、queue 隊列名稱 * 2、durable 是否持久化,如果持久化,mq重啟后隊列還在 * 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建 * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除) * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); //聲明一個交換機 //參數:String exchange, String type /** * 參數明細: * 1、交換機的名稱 * 2、交換機的類型 * fanout:對應的rabbitmq的工作模式是 publish/subscribe * direct:對應的Routing 工作模式 * topic:對應的Topics工作模式 * headers: 對應的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); // 綁定交換機和隊列 //參數:String queue, String exchange, String routingKey /** * 參數明細: * 1、queue 隊列名稱 * 2、exchange 交換機名稱 * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL); //實現消費方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 當接收到消息后此方法將被調用 * @param consumerTag 消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume * @param envelope 信封,通過envelope * @param properties 消息屬性 * @param body 消息內容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //消息id,mq在channel中用來標識消息的id,可用於確認消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息內容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //監聽隊列 //參數:String queue, boolean autoAck, Consumer callback /** * 參數明細: * 1、queue 隊列名稱 * 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復 * 3、callback,消費方法,當消費者接收到消息要執行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }
消息接收方主要是創建鏈接,聲明回調方法,監聽隊列,同樣,創建隊列的操作不是必須的,但是,如果隊列不存在,監聽時會報錯,所以提前創建;
需要注意的一點:同一個隊列可以由多個進程同時監聽,但是,同一條消息只能被一個進程接收,即監聽同一隊列的多個程序在一次消息發送中,只會有一個接收消息並處理;