RabbitMQ快速入門


RabbitMQ是一個開源的,實現AMQP協議的,可復用企業消息隊列系統。

類似的系統還有ActiveMQ(實現JMS)和Kafka(分布式)。RabbitMQ支持主流的操作系統,支持多種開發語言,能降低系統間訪問的耦合度,便於數據同步。

 

RabbitMQ提供如下5種隊列模型(遠程調用不是消息隊列)。

1.Simple

2.Work. 工作模式,一個消息只能被一個消費者獲取。

         

3.Publish/Subscribe. 訂閱模式,消息被路由投遞給多個隊列,一個消息被多個消費者獲取。ExchangeType為fanout。

4.Routing. 路由模式,一個消息被多個消費者獲取。並且消息的目的queue可被生產者指定。ExchangeType為direct。

5.Topic. 通配符模式,一個消息被多個消費者獲取。消息的目的queue可用BindingKey以通配符(#:一個或多個詞,*:一個詞)的方式指定。ExchangeType為topic。

 

6.PRC. 遠程調用

 

 

相關名詞:
1. Server:RabbitMQ服務器,
2. VirtualHost:權限控制的基本單位,一個VirtualHost里面有若干Exchange和MessageQueue,以及指定被哪些user使用。
3. Connection:生產者/消費者和RabbitMQ服務器的TCP連接。
4. Channel:創建完Connection后,需創建信道才能執行AMQP命令。一個Connection可以創建多個Channel。
5. Exchange:路由。接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType有fanout、direct和topic三種,對應路由使用上述3/4/5號模型。
6. (Message)Queue:消息隊列,用於存儲還未被消費者消費的消息。
7. Message:由Header和Body組成。Header是生產者添加的相關屬性:是否持久化、被哪個MessageQueue接收、優先級等。而Body是傳輸的數據。
8. Binding:消息被復制傳遞時,一個消費者對應一個消息隊列,消費者綁定MessageQueue到Exchange,可指定多個Bindingkey。生產者在發送Message時,可以在header指定RoutingKey,Exchange匹配RoutingKey和Bindingkey將Message路由到相應的Queue。
9. Command:AMQP命令,生產者/消費者通過Command完成與RabbitMQ服務器交互。Publish:發送消息,txSelect:開啟事務,txCommit:提交事務。

 

程序員應該用代碼來表達自己的思想,下面用代碼展示以上五種模式:

首先創建工廠類,獲取Connection

public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        //connection工廠
        ConnectionFactory factory = new ConnectionFactory();
       
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("zx");
        factory.setPassword("zx");
        factory.setVirtualHost("/zx");
        
        // 通過工廠獲取連接
        Connection connection = factory.newConnection();
        return connection;
    }
}

 

1.Simple

生產者

public class Send {

    private final static String QUEUE_NAME = "queue_simple";

    public static void main(String[] argv) throws Exception {
        // 獲取連接
        Connection connection = ConnectionUtil.getConnection();
        // 創建通道
        Channel channel = connection.createChannel();

        // 聲明隊列 (不存在則創建)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 發送消息
        String message = "Hello World";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        // 關閉通道和連接
        channel.close();
        connection.close();
    }
}

消費者

public class Recv {

    private final static String QUEUE_NAME = "queue_simple";

    public static void main(String[] argv) throws Exception {

        // 獲取連接
        Connection connection = ConnectionUtil.getConnection();
        // 創建通道
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);  //true 自動確認消息, 下有詳解

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //阻塞或輪詢
            String message = new String(delivery.getBody());
            System.out.println("獲取:" + message);
        }
    }
}

 

2.Work

生產者

public class Send {

    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] argv) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

 

消費者1

public class Recv1 {

    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] argv) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 開啟Qos, 同一時刻服務器只發送一條消息. 可以嘗試注釋該行, 會發現消息會被平均分配給兩個消費者
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("獲取:" + message);
            // 模擬handling
            Thread.sleep(100);
            // 手動確認消息接收. 在basicConsume方法中, true為自動, false為手動
            /* 消息確認方式: 
             * 1. 自動確認. 只要消息從隊列中移除, 服務端認為消息被成功消費
             * 2. 手動確認. 消費者獲取消息后, 服務器將該消息標記為不可用, 並等待反饋. 如果消費者一直不反饋, 則該消息將一直處於不可用狀態
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費者2

public class Recv2 {

    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] argv) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 模擬handling
            Thread.sleep(200);
            // ACK
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

 

3.Publish/Subscribe

生產者

public class Send {

    private final static String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] argv) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息內容
        String message = "Hello world";
        // 與前面不同, 生產者將消息發送給exchange, 而非隊列. 若發消息時還沒消費者綁定queue與該exchange, 消息將丟失
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        channel.close();
        connection.close();
    }
}

消費者1

public class Recv1 {

    private final static String QUEUE_NAME = "queue_fanout_1";
    private final static String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] argv) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機. 綁定也可在rabbitMQ的管理界面進行
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("獲取:" + message);
            Thread.sleep(100);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費者2

public class Recv2 {

    private final static String QUEUE_NAME = "queue_fanout_2";
    private final static String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] argv) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("獲取:" + message);
            Thread.sleep(200);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

 

4.Routing

生產者

public class Send {

    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String message = "Hello world";
        // 發送消息, RoutingKey為 insert
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());

        channel.close();
        connection.close();
    }
}

消費者1

public class Recv1 {

    private final static String QUEUE_NAME = "queue_direct_1";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 綁定隊列到交換機, BindingKey為 delete update
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("獲取:" + message);
            Thread.sleep(100);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費者2

public class Recv2 {

    private final static String QUEUE_NAME = "queue_direct_2";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機, BindingKey為 insert delete update
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("獲取:" + message);
            Thread.sleep(200);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

 

5.Topic

生產者

public class Send {

    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String message = "Hello world";
        // 發送消息, 指定RoutingKey
        channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());

        channel.close();
        connection.close();
    }
}

消費者1

public class Recv1 {

    private final static String QUEUE_NAME = "queue_topic_1";
    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);
        
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("獲取:" + message);
            Thread.sleep(100);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費者2

public class Recv2 {

    private final static String QUEUE_NAME = "queue_topic_2";

    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機. 通配符!
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");

        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("獲取:'" + message);
            Thread.sleep(200);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

 

可以看出,上面調用的關系比較復雜。

幸運的是,Spring提供了對rabbitMQ的封裝,將復雜的關系設置整合到配置文件中。

依賴於兩個組件,抽象層spring-amqp和實現層spring-rabbit。

於是代碼簡化為:

生產者

public class Send {
    
    public static void main(String[] args) throws InterruptedException {
        AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring/rabbitmq-context.xml");
        
        //拿模板的bean
        RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
        //發消息
        String msg = "Hello world";
        template.convertAndSend(msg);   //該函數還能指定routing-key
        
        Thread.sleep(1000);
        ctx.close();
    }
}

消費者

public class Recv {
    public void listen(String msg) {
        System.out.println("獲取" + msg);
    }
}

 

非常漂亮的封裝。配置文件如下

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

    <!-- connection工廠 -->
    <rabbit:connection-factory id="connectionFactory"
        host="127.0.0.1" port="5672" username="zx" password="zx"
        virtual-host="/zx" />

    <!-- MQ的管理,包括隊列、交換器等 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 聲明隊列 (auto表示需要時創建)-->
    <rabbit:queue name="myQueue" auto-declare="true"/>
    
    <!-- 聲明fanout類型的exchange (auto表示需要時創建) -->
    <rabbit:fanout-exchange name ="fanoutExchange" auto-declare="true" durable="true" >  <!-- durable是否持久化, 安全性還是性能的權衡 -->
        <!-- 注意, 在生產者/消費者 分離的系統中, exchange和queue也分離, 綁定應該交給運維在rabbit管理界面進行, 而不是配置下面的bindings屬性 -->
        <!-- 小細節, rabbit管理界面綁定時界面屬性中binding key被寫成了routing key? -->
        <rabbit:bindings>
            <rabbit:binding queue="myQueue"/>  <!-- 還能指定通過pattern屬性指定bindingType -->
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- 定義Rabbit模板的bean,指定 exchange或queue -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />  <!-- 還能指定routing-key屬性 -->

    <bean id="recv" class="com.zx.rabbitmq.spring.Recv" />
    <!-- 設置消費者要監聽的隊列, 並指定有消息時執行的方法 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="recv" method="listen" queue-names="myQueue" />
    </rabbit:listener-container>

</beans>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM