Java 訪問RabbitMQ


一、概述

 

前面學過ActiveMQ。ActiveMQ主要是實現的JMS規范,而RabbitMQ就是AMQP的一個具體實現。

RabbitMQ里面有幾個概念:生產者、消費者、消息、交換器、路由鍵、隊列、綁定、虛擬主機

1.生產者角度

  生產者產生數據,然后根據指定交換器和路由鍵將數據發送到消息隊列RabbitMQ。為了保證交換器的存在,我們每次在初始化生產者的時候都要嘗試去創建一個交換器。

  交換器總共有4種類型:

  1. direct 路由鍵完全匹配
  2. fanout 消息廣播,將忽略路由鍵
  3. topic 通過“*”和“#”的通配符進行綁定。注意:”.”將路由鍵分為了幾個標識符,“*”匹配1個,“#”匹配一個或多個
  4. headers 和direct類似,很少使用

2.消費者角度

  消費者主要就是獲取並消費數據,因此需要創建一個隊列,同時需要創建一個交換器(交換器在消費者和生產者都可以創建),然后將隊列和交換器通過路由鍵進行綁定。最后就可以根據隊列進行數據的消費了。

二、Java代碼

 

1.pom.xml

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

 

2.生產者代碼

package cn.duanjt;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 生產者
 * @author 段江濤
 * @date 2018-11-30
 */
public class Productor {
    public static void main(String[] args) throws IOException, TimeoutException {
        String ROUTE_KEY = "rabbitmq-duanjt";// 路由鍵名稱
        String EXCHANGE_NAME = "exchange-duanjt";// 交換器名稱

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.23.24");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");//虛擬主機,可通過控制台查看

        //創建連接和信道
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        // 創建一個交換器,參數為:交互器名稱和交換器類型
        // 注意:其實這個交換器只需要聲明一次就可以,但是由於無法保證交換器已經存在了,所以我們每次都要聲明
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        for (int i = 0; i < 5; i++) {
            String msg = "Hello world.I love you forever ===>" + i;
            // 發布消息,需要參數:交換器,路由鍵。最后一個參數為消息內容
            // 注意:RabbitMQ的消息類型只有一種,那就是byte[]
            channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, null, msg.getBytes("utf-8"));

            System.out.println("send:" + msg);
        }

        //關閉信道和連接
        channel.close();
        conn.close();
    }
}

 

3.消費者代碼

package cn.duanjt;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        String QUEUE_NAME = "queue-duanjt";// 隊列名稱
        String ROUTE_KEY = "rabbitmq-duanjt";// 路由鍵名稱
        String EXCHANGE_NAME = "exchange-duanjt";// 交換器名稱

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.23.24");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        // 創建一個隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 創建交換器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 將隊列和交換器通過路由鍵進行綁定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_KEY);

        //開始消費,第二個參數表示自動確認
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            // 當消息到達時執行回調方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("[Receive]:" + message);
            }
        });

    }
}

  

注意:

1.為了保證交換器的存在,所以消費者和生產者都要創建,因為不知道是消費者先啟動還是生產者先啟動

2.可以通過http://ip:15672 查看交換器、路由鍵和隊列之間的關系

3.一個連接(Connection)可以創建多個信道(Channel)。每個信道也可以在獨立的一個線程里面

4.一個隊列可以有多個消費者,這種情況下,消息將在消費者之間進行輪詢

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM