在Java中使用RabbitMQ


 

依賴

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>

 

 

 

 生產者

public class Producer {
    private final static String QUEUE_NAME = "my_queue"; //隊列名稱
    private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名稱
    private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名稱
    private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key

    public static void send() throws IOException, TimeoutException {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //設置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口號
        connectionFactory.setVirtualHost("/");  //使用的虛擬主機

        //由連接工廠創建連接
        Connection connection = connectionFactory.newConnection();

        //通過連接創建信道
        Channel channel = connection.createChannel();

        //通過信道聲明一個exchange,若已存在則直接使用,不存在會自動創建
        //參數:name、type、是否支持持久化、此交換機沒有綁定一個queue時是否自動刪除、是否只在rabbitmq內部使用此交換機、此交換機的其它參數(map)
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

        //通過信道聲明一個queue,如果此隊列已存在,則直接使用;如果不存在,會自動創建
        //參數:name、是否支持持久化、是否是排它的、是否支持自動刪除、其他參數(map)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //將queue綁定至某個exchange。一個exchange可以綁定多個queue
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

        //發送消息
        String msg = "hello";  //消息內容
        String routing_key = "my_routing_key.key1";  //發送消息使用的routing-key
        channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes()); //消息是byte[],可以傳遞所有類型(轉換為byte[]),不局限於字符串
        System.out.println("send message:"+msg);

        //關閉連接
        channel.close();
        connection.close();
    }

}

 

 

 

 

exchange詳解

rabbitmq自帶了7個交換機,都是以amq開頭。我們可以使用自帶的,也可以自己新建交換機。

 

 

交換機的參數

先看最下面的add a new exchange:

  • name   交換機的name
  • type  交換機的類型,說白了就是routing-key匹配queue使用哪種匹配規則
  • durability  消息是否支持持久化,durable是支持持久化,重啟rabbitmq,rabbitmq上的消息還在、不會丟失,上面features里的D就是durable;transient是不支持持久化,重啟rabbitmq,rabbitmq上的消息丟失。因為消息是保存在內初中的,不持久化到硬盤,關閉rabbitmq消息直接就沒了,持久化后再次啟動時會從硬盤加載消息。transient關鍵字用於阻止序列化。
  • auto delete   如果此交換機一個queue都沒有綁定,是否自動刪除此交換機
  • internal  此交換機是否只在rabbitmq內部使用,大部分交換機都要暴露出來,給消息生產者用,只有少數(一般是rabbitmq自帶的)是內部使用的。features里的I就是internal,表示只在內部使用,自帶的amq.rabbitmq.trace用來跟蹤rabbitmq內部的消息投遞過程,只在內部使用。
  • arguments  給此交換機設置一些其它參數

 

//通過信道聲明一個exchange,若已存在則直接使用,不存在會自動創建
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

聲明一個交換機,參數和上面控制台add a new exchange的參數順序是一致的,arguments是用map表示,一般不必設置其它參數,寫成null即可。

 

自帶的7個交換機,第一個是(AMQP default),不是說這個交換機的名字是AMQP default。

這個交換機的名字沒有名字(空),如果要使用這個交換機,代碼里交換機的name要寫成空串,括號是說明這個交換機是rabbitmq默認的交換機。

 

 

 

 

exchange的4種類型

我們綁定exchange、queue時使用了一個routing-key:

private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

這個exchange使用routing-key來匹配綁定的queue,即要將消息發送到哪些隊列。

 

 

 在發送消息時又使用了一個routing-key:

 String routing_key = "my_routing_key.key1"; //發送消息使用的routing-key

 channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes());

rabbitmq會將這個消息發送到指定的exchange,



如何匹配?是完全匹配還是模糊匹配?這就涉及到exchange的4種type:

 

(1)direct  完全匹配

發送消息使用的routing-key,要與交換機使用的routing-key完全相同。

比如exchange使用的routing-key是"my_routing_key",那發送消息使用的routing_key也要是"my_routing_key"

 

 

(2)topic  模糊匹配,可以使用通配符

*只能匹配一個詞,#可以匹配一個或多個詞。

注意是詞,不是字符。"my_routing_key.key1",my_routing_key是一個詞,key1是一個詞,詞之間用點號分隔。

比如exchange的routing-key是"my_routing_key.#" ,則發送消息使用的routing-key以"my_routing_key."開頭即可

我在示例中用的就是這種。這種方式非常適合把一個消息投遞到多個queue(應用)

 

 

(3)fanout  廣播模式

不使用routing-key,一般把routing-key都設置為空串,當然設置為什么字符串都行,反正都不用。

exchange會把消息投遞(廣播)到此exchange綁定的所有的queue。

這種模式效率很高,因為不進行routing-key的匹配,大大減小了時間開銷。

 

 

(4)headers  首部模式(了解即可)

不使用routing-key(路由鍵),根據header將消息投遞到匹配的queue。

Map<String, Object> exchange_headers = new Hashtable<String, Object>();
headers.put("x-match", "any"); //指定鍵值對匹配模式any、all
headers.put("key1", "value1");  //放入一些鍵值對
headers.put("key2", "value2");

//綁定queue時指定指定map。至於routing-key,設置為什么串都行,反正不使用
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", exchange_headers);


//發送消息時也要使用map
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("key1", "value1");  //放入一些鍵值對


Builder properties = new BasicProperties.Builder();
properties.headers(headers);


String msg="hello";
//指定消息要使用的header。要使用properties的形式,不能直接發送map。會放在http請求頭中
channel.basicPublish(EXCHANGE_NAME, "",properties.build(),msg.getBytes());

x-match指定匹配模式,all:發送消息的header(map)中的鍵值對要和exchange的header中的所有的鍵值對都要相同,exchange的header有2個鍵值對key1、key2,發送消息的header中也要有這2個鍵值對(需要完全相同)。any:發送消息的header中只要有一個鍵值對和exchange中的鍵值對完全相同就行,比如key1、key2都行,只要有一個就行了。

header這種方式不常用,因為有點復雜。都要匹配queue,用routing-key它不簡單,它不香嗎?非要搞得這么復雜。

 

不過properties的使用還是需要了解一下:

一個消息由properties、body組成,也就是basicPublish()的后2個參數。

properties可以設置此消息的一些參數,比如延時投遞、優先級,這些參數寫成鍵值對放在map中,將map轉換為properties,再將properties作為basicPublish()的參數。

 

 

 

 

Queue詳解

 

type:指定queue類型,默認為classic(主隊列),還可以設置為quorum(從隊列),將主隊列同步到從隊列,主隊列故障時還可以用從隊列。

name:此queue的名稱

durability:queue中的消息是否持久化到硬盤。exchange也有此屬性,消息會先發給exchange保存,exchange再投遞到某些queue,消費者還沒處理此消息時,消息會保存在queue中。

auto delete:如果此queue沒有綁定到任何一個exchange,是否自動刪除此queue

arguments:設置一些其他參數

 

//聲明一個queue
//參數:name、是否支持持久化、是否是排它的、是否支持自動刪除、其他參數(map)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

排它:這個queue只能在當前連接中使用(拒絕在其他連接中使用),由當前連接聲明|創建,斷開連接會自動刪除此queue。

 

一個exchange可以綁定多gueue,一個queue也可以綁定到多個exchange上。





 

消費者

public class Consumer {
    private final static String QUEUE_NAME = "my_queue"; //隊列名稱

    public static void receive() throws IOException, TimeoutException {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //設置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口號
        connectionFactory.setVirtualHost("/");  //使用的虛擬主機

        //由連接工廠創建連接
        Connection connection = connectionFactory.newConnection();

        //通過連接創建信道
        Channel channel = connection.createChannel();

        //創建消費者,指定要使用的channel。QueueingConsume類已經棄用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //監聽的queue中有消息進來時,會自動調用此方法來處理消息。但此方法默認是空的,需要重寫
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
            }
        };

        //監聽指定的queue。會一直監聽。
        //參數:要監聽的queue、是否自動確認消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, true, consumer);

    }
}

這段代碼表面上沒有問題,監聽queue就完事了。但有一個隱患:

我們是在生產者中聲明|創建的exchange、queue,如果生產者尚未運行,並且rabbitmq上沒有對應的exchange、queue(之前沒有創建),啟動消費者,消費者要監聽指定的queue,根本就連接不上queue,指定的queue創都沒創建,監聽什么?直接報錯。

 

 

更加健壯的寫法是:在消費者中也聲明exchange、queue,有就直接用,沒有會自動創建。

public class Consumer {
    private final static String QUEUE_NAME = "my_queue"; //隊列名稱
    private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名稱
    private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名稱
    private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key

    public static void receive() throws IOException, TimeoutException {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //設置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口號
        connectionFactory.setVirtualHost("/");  //使用的虛擬主機

        //由連接工廠創建連接
        Connection connection = connectionFactory.newConnection();

        //通過連接創建信道
        Channel channel = connection.createChannel();

        //通過信道聲明一個exchange,若已存在則直接使用,不存在會自動創建
        //參數:name、type、是否支持持久化、此交換機沒有綁定一個queue時是否自動刪除、是否只在rabbitmq內部使用此交換機、此交換機的其它參數(map)
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

        //通過信道聲明一個queue,如果此隊列已存在,則直接使用;如果不存在,會自動創建
        //參數:name、是否支持持久化、是否是排它的、是否支持自動刪除、其他參數(map)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //將queue綁定至某個exchange。一個exchange可以綁定多個queue
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

        //創建消費者,指定要使用的channel。QueueingConsume類已經棄用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //監聽的queue中有消息進來時,會自動調用此方法來處理消息。但此方法默認是空的,需要重寫
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
            }
        };

        //監聽指定的queue。會一直監聽。
        //參數:要監聽的queue、是否自動確認消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

 

 

 

 

rabbitmq控制台的queue:

ready是此queue中待投遞的消息數,unacked是已投遞、但消費者尚未確認的消息數(和快遞已簽收、未收貨差不多),total是消息總數,即前面2個之和。

incoming是一個消息從exchange進入queue花的時間

deliver/get是一個消息從queue投遞到消費者花的時間,

ack是一條消息投遞給消費者后,過了多長時間queue才收到消費者的應答。

/s表示單位是秒

 

 

ack  確認、應答。

消費者收到queue投遞的消息,然后處理消息,處理后發送一個數據包給queue作為確認、應答(相當於拿到包裹,試了下沒問題,收貨),

queue將消息投遞給消費者后,queue中仍然保留此消息,要消費者應答后才會刪除此消息。

 

 

//參數:要監聽的queue、是否自動確認消息、使用的Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);

第二個參數:autoAck,是否自動應答。

true:自動應答,queue把消息投遞給消費者,就認為消費者簽收了,投遞了一個消息就直接刪除該消息。

這並不可靠,如果消費者還沒處理完就出故障了,那這條消息就丟失了、沒被處理到。

fasle:不使用自動應答,需要消費者自己應答。

 

 

消費者手動應答:

     //創建消費者,指定要使用的channel。QueueingConsume類已經棄用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //監聽的queue中有消息進來時,會自動調用此方法來處理消息。但此方法默認是空的,需要重寫
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);  //處理完了,應答|簽收
                //channel.basicReject(envelope.getDeliveryTag(), true); //拒收
            }
        };

        //監聽指定的queue。會一直監聽。
        //參數:要監聽的queue、是否自動確認消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, false, consumer);

就算消費者處理消息時宕機,只要不應答,queue中的這條消息就一直存在,消費者再次啟動時還會投遞此消息。

basicAck()的第一個參數是DeliveryTag,在一個queue中唯一標識一條消息,相當於一條消息的id。

第二個參數是multiple,多個、批處理,是否將多個消息的應答放在一起、一次性發給queue,設置為true可減少網絡流量、防止網絡阻塞,但是之前消息的應答有時延。

 

 

如果處理消息時發生了異常(代碼執行出了問題),在catch中拒收就是了:

catch (...){ 
  channel.basicReject(envelope.getDeliveryTag(), true); //拒收,重新入隊
  //..... //記錄日志
}

第二個參數是requeue,是否重新入隊,設置為fasle,不再重新入隊,queue會刪除此消息;設置為true,重新入隊,queue會將此消息重新投遞給消費者。

沒必要把消息的整個處理流程都放在try中,只把可能出現異常的代碼塊放在try中即可,在catch中拒收。

 

這就是rabbitmq提供的可靠投遞機制。再加上消息的持久化,做到了rabbitmq的高可靠性。

 

但重新入隊有一個問題:如果大量的消息重新入隊,重新投遞這些消息會占用資源,使其它消息的投遞變慢。

 

 

 

 

開發過程中可能遇到的問題

1、exchange的name唯一標識一個exchange,調試時可能修改了exchange的類型,如果之前存在一個同名的exchange,會報錯。

 如果之前的同名的exchange不要了,到rabbitmq控制台刪除同名的exchange即可;如果之前的同名的exchange還要用,就把現在的exchange改下name。

 

2、消費者要一直監聽queue,所以消費者的channel、connection都沒關閉,再次啟動時可能連接不上,會報錯,因為rabbitmq上還保持着這個連接。

 等幾分鍾再運行,等連接超時被刪除即可。

 

 

 

說明

1、可以在rabbitmq控制台設置queue的綁定、發送消息到queue

delivery  投遞、交付

persistent  持續的、持久的、堅持不懈的。表示此消息會持久化到硬盤。

payload即消息的body。

點擊publish message會將此消息投遞到當前queue。

 

 

2、消息的有效期

有些消息對即時性要求很高,過了一些時間,如果這條消息還積壓在queue中,這條消息可能就沒有使用價值了,沒必要再投遞,需要刪除這條消息。

可以設置消息的有效期,如果指定的時間內沒有投遞此消息,queue會自動刪除此消息,不再投遞。

具體代碼參考:https://blog.csdn.net/liu0808/article/details/81356552

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM