RabbitMQ的Java API編程


  1.創建Maven工程,pom.xml引入依賴:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

  2.生產者:

public class MyProducer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 連接IP
        factory.setHost("192.168.254.137");
        // 連接端口
        factory.setPort(5672);
        // 虛擬機
        factory.setVirtualHost("/");
        // 用戶
        factory.setUsername("guest");
        factory.setPassword("guest");

        // 建立連接
        Connection conn = factory.newConnection();
        // 創建消息通道
        Channel channel = conn.createChannel();

        // 發送消息
        String msg = "Hello world, Rabbit MQ";

        // String exchange, String routingKey, BasicProperties props, byte[] body
        channel.basicPublish(EXCHANGE_NAME, "wuzz.test", null, msg.getBytes());

        channel.close();
        conn.close();
    }
}

  3.消費者:

public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
    private final static String QUEUE_NAME = "SIMPLE_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 連接IP
        factory.setHost("192.168.254.137");
        // 默認監聽端口
        factory.setPort(5672);
        // 虛擬機
        factory.setVirtualHost("/");

        // 設置訪問的用戶
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 建立連接
        Connection conn = factory.newConnection();
        // 創建消息通道
        Channel channel = conn.createChannel();

        // 聲明交換機
        // String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);

        // 聲明隊列
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" Waiting for message....");

        // 綁定隊列和交換機
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"wuzz.test");

        // 創建消費者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Received message : '" + msg + "'");
                System.out.println("consumerTag : " + consumerTag );
                System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
            }
        };

        // 開始獲取消息
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

參數說明:

聲明交換機的參數:

  String type:交換機的類型,direct, topic, fanout中的一種。

  boolean durable:是否持久化,代表交換機在服務器重啟后是否還存在。

  boolean autoDelete:是否自動刪除。

聲明隊列的參數:

  boolean durable:是否持久化,代表隊列在服務器重啟后是否還存在。

  boolean exclusive:是否排他性隊列。排他性隊列只能在聲明它的Connection中使用,連接斷開時自動刪除。

  boolean autoDelete:是否自動刪除。如果為true,至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,隊列會自動刪除。

  Map<String, Object> arguments:隊列的其他屬性,例如x-message-ttl、x-expires、x-max-length、x-maxlength-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority。

消息屬性BasicProperties:

  消息的全部屬性有14個,以下列舉了一些主要的參數:

  Map<String,Object> headers 消息的其他自定義參數

  Integer deliveryMode 2持久化,其他:瞬態

  Integer priority 消息的優先級

  String correlationId 關聯ID,方便RPC相應與請求關聯

  String replyTo 回調隊列

  String expiration TTL,消息過期時間,單位毫秒

  登陸 RabbitMQ 后台管理界面,先運行消費端,建立連接,創建通道,聲明交換機,隊列,創建消費者監聽隊列,打開后台管理頁面可以看到如下信息:

  交換機的信息:

  后台管理界面可以看到當前的連接,Channel,Exchange,Queue等信息。

  由消費者來創建對象(交換機、隊列、綁定關系)。

RabbitMQ常見配置:

1、TTL(Time To Live)消息的過期時間:

  有兩種設置方式:通過隊列屬性設置消息過期時間,設置單條消息的過期時間。

// 通過隊列屬性設置消息過期時間
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-message-ttl",6000);

// 聲明隊列(默認交換機AMQP default,Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);

// 對每條消息設置過期時間
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  .deliveryMode(2) // 持久化消息
  .contentEncoding("UTF-8")
  .expiration("10000") // TTL
  .build();

// 此處兩種方式設置消息過期時間的方式都使用了,將以較小的數值為准

// 發送消息
channel.basicPublish("", "TEST_DLX_QUEUE", properties, msg.getBytes());

2.死信隊列:

  有三種情況消息會進入DLX(Dead Letter Exchange)死信交換機。
    1、(NACK || Reject ) && requeue == false  設置手動應答ACK,且沒有應答並且拒絕了消息,禁止消息入隊。
    2、消息過期
    3、隊列達到最大長度(先入隊的消息會被發送到DLX)

// 創建消費者,並接收消息
Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
    String msg = new String(body, "UTF-8");
    System.out.println("Received message : '" + msg + "'");

    if (msg.contains("拒收")){
      // 拒絕消息
      // requeue:是否重新入隊列,true:是;false:直接丟棄,相當於告訴隊列可以直接刪除掉
      // TODO 如果只有這一個消費者,requeue 為true 的時候會造成消息重復消費
      channel.basicReject(envelope.getDeliveryTag(), false);
    } else if (msg.contains("異常")){
      // 批量拒絕
      // requeue:是否重新入隊列
      // TODO 如果只有這一個消費者,requeue 為true 的時候會造成消息重復消費
      channel.basicNack(envelope.getDeliveryTag(), true, false);
    } else {
      // 手工應答
      // 如果不應答,隊列中的消息會一直存在,重新連接的時候會重復消費
      channel.basicAck(envelope.getDeliveryTag(), true);
    }
  }
};
// 開始獲取消息,注意這里開啟了手工應答
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);

  可以設置一個死信隊列(Dead Letter Queue)與DLX()死信交換機綁定,即可以存儲Dead Letter,消費者可以監聽這個隊列取走消息。

Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("x-dead-letter-exchange","DLX_EXCHANGE");
// 指定了這個隊列的死信交換機
channel.queueDeclare("TEST_DLX_QUEUE", false, false, false, arguments);
// 聲明死信交換機
channel.exchangeDeclare("DLX_EXCHANGE","topic", false, false, false, null);
// 聲明死信隊列
channel.queueDeclare("DLX_QUEUE", false, false, false, null);
// 綁定
channel.queueBind("DLX_QUEUE","DLX_EXCHANGE","#");

3.優先級隊列,可以讓消息優先得到消費:

  可以通過創建隊列的時候設置一個隊列的優先級的最大值,然后設置指定消息的優先級的值。優先級高的消息可以優先被消費,但是:只有消息堆積(消息的發送速度大於消費者的消費速度)的情況下優先級才有意義。

Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-max-priority",10); // 隊列最大優先級
channel.queueDeclare("ORIGIN_QUEUE", false, false, false, argss);

Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("name", "gupao");
        headers.put("level", "top");

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)   // 2代表持久化
    .contentEncoding("UTF-8")  // 編碼
    .expiration("10000")  // TTL,過期時間
    .headers(headers) // 自定義屬性
    .priority(5) // 優先級,默認為5,配合隊列的 x-max-priority 屬性使用
    .messageId(String.valueOf(UUID.randomUUID()))
    .build();
channel.basicPublish("", "ORIGIN_QUEUE", properties, msg.getBytes());

4.延遲隊列:

  RabbitMQ本身不支持延遲隊列。可以使用TTL結合DLX的方式來實現消息的延遲投遞,即把DLX跟某個隊列綁定,到了指定時間,消息過期后,就會從DLX路由到這個隊列,消費者可以從這個隊列取走消息。

  另一種方式是使用rabbitmq-delayed-message-exchange插件。當然,將需要發送的信息保存在數據庫,使用任務調度系統掃描然后發送也是可以實現的。

5.服務端流控(Flow Control):

  RabbitMQ 會在啟動時檢測機器的物理內存數值。默認當 MQ 占用 40% 以上內存時,MQ 會主動拋出一個內存警告並阻塞所有連接(Connections)。可以通過修改 rabbitmq.config 文件來調整內存閾值,默認值是 0.4,如下所示: [{rabbit, [{vm_memory_high_watermark, 0.4}]}].

  默認情況,如果剩余磁盤空間在 1GB 以下,RabbitMQ 主動阻塞所有的生產者。這個閾值也是可調的。

  注意:調整隊列長度只在消息堆積的情況下有意義,而且會刪除先入隊的消息,不能實現服務端限流。

6.消費端限流:

  在AutoACK為false的情況下,如果一定數目的消息(通過基於consumer或者channel設置Qos的值)未被確認前,不進行消費新的消息。類似ActiveMQ的 FetchSize預取大小,這里也有這么一個概念。

channel.basicQos(2); // 如果超過2條消息沒有發送ACK,當前消費者不再接受隊列消息
channel.basicConsume(QUEUE_NAME, false, consumer);

Spring 集成 RabbitMQ:

ConnectionFactory:

  Spring AMQP 的連接工廠接口,用於創建連接。CachingConnectionFactory 是ConnectionFactory 的一個實現類。

RabbitAdmin:

  RabbitAdmin 是 AmqpAdmin 的實現,封裝了對 RabbitMQ 的基礎管理操作,比如對交換機、隊列、綁定的聲明和刪除等。為什么我們在配置文件(Spring)或者配置類(SpringBoot)里面定義了交換機、隊列、綁定關系,並沒有直接調用 Channel 的 declare 的方法,Spring 在啟動的時候就可以幫我們創建這些元數據?這些事情就是由 RabbitAdmin 完成的。RabbitAdmin 實 現 了 InitializingBean 接 口 , 里 面 有 唯 一 的 一 個 方 法afterPropertiesSet(),這個方法會在 RabbitAdmin 的屬性值設置完的時候被調用。在 afterPropertiesSet ()方法中,調用了一個 initialize()方法。這里面創建了三個Collection,用來盛放交換機、隊列、綁定關系。最后依次聲明返回類型為 Exchange、Queue 和 Binding 這些 Bean,底層還是調用了 Channel 的 declare 的方法。

Message:

  Message 是 Spring AMQP 對消息的封裝。兩個重要的屬性:

  • body:消息內容。
  • messageProperties:消息屬性。

RabbitTemplate 消息模板:

  RabbitTemplate 是 AmqpTemplate 的一個實現(目前為止也是唯一的實現),用來簡化消息的收發,支持消息的確認(Confirm)與返回(Return)。跟 JDBCTemplate一 樣 , 它 封 裝 了 創 建 連 接 、 創 建 消 息 信 道 、 收 發 消 息 、 消 息 格 式 轉 換(ConvertAndSend→Message)、關閉信道、關閉連接等等操作。針對於多個服務器連接,可以定義多個 Template。可以注入到任何需要收發消息的地方使用。

Messager Listener 消息 偵聽:

  MessageListener 是 Spring AMQP 異步消息投遞的監聽器接口,它只有一個方法onMessage,用於處理消息隊列推送來的消息,作用類似於 Java API 中的 Consumer。

MessageListenerContainer:

  MessageListenerContainer可以理解為MessageListener的容器,一個Container只有一個 Listener,但是可以生成多個線程使用相同的 MessageListener 同時消費消息。Container 可以管理 Listener 的生命周期,可以用於對於消費者進行配置。例如:動態添加移除隊列、對消費者進行設置,例如 ConsumerTag、Arguments、並發、消費者數量、消息確認模式等等。

轉換器 MessageConvertor:

  MessageConvertor 的 作用?RabbitMQ 的消息在網絡傳輸中需要轉換成 byte[](字節數組)進行發送,消費者需要對字節數組進行解析。在 Spring AMQP 中,消息會被封裝為 org.springframework.amqp.core.Message對象。消息的序列化和反序列化,就是處理 Message 的消息體 body 對象。如果消息已經是 byte[]格式,就不需要轉換。如果是 String,會轉換成 byte[]。如果是 Java 對象,會使用 JDK 序列化將對象轉換為 byte[](體積大,效率差)。在 調 用 RabbitTemplate 的 convertAndSend() 方 法 發 送 消 息 時 , 會 使 用MessageConvertor 進行消息的序列化,默認使用 SimpleMessageConverter。在某些情況下,我們需要選擇其他的高效的序列化工具。如果我們不想在每次發送消息時自己處理消息,就可以直接定義一個 MessageConvertor。如何 自定義 MessageConverter ?例如:我們要使用 Gson 格式化消息:創建一個類,實現 MessageConverter 接口,重寫 toMessage()和 fromMessage()方法。

1.在導入Spring 的相關依賴之外,導入 RabbitMQ的依賴:

<!--rabbitmq依賴 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

2. applicationContext.xml :

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <import resource="classpath*:rabbitMQ.xml" />

    <!-- 掃描指定package下所有帶有如 @Controller,@Service,@Resource 並把所注釋的注冊為Spring Beans -->
    <context:component-scan base-package="com.wuzz.*" />

    <!-- 激活annotation功能 -->
    <context:annotation-config />

    <!-- 激活annotation功能 -->
    <context:spring-configured />
</beans>

3.配置rabbitMQ.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">

    <!--配置connection-factory,指定連接rabbit server參數 -->
    <rabbit:connection-factory id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory" virtual-host="/" username="guest" password="guest" host="127.0.0.1" port="5672" />

    <!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 -->
    <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />

    <!--######分隔線######-->
    <!--定義queue -->
    <rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!--定義direct exchange,綁定MY_FIRST_QUEUE -->
    <rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定義rabbit template用於數據的接收和發送 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="MY_DIRECT_EXCHANGE" />

    <!--消息接收者 -->
    <bean id="messageReceiver" class="com.gupaoedu.consumer.FirstConsumer"></bean>

    <!--queue listener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />
    </rabbit:listener-container>

    <!--定義queue -->
    <rabbit:queue name="MY_SECOND_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!-- 將已經定義的Exchange綁定到MY_SECOND_QUEUE,注意關鍵詞是key -->
    <rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="MY_SECOND_QUEUE" key="SecondKey"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息接收者 -->
    <bean id="receiverSecond" class="com.gupaoedu.consumer.SecondConsumer"></bean>

    <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="MY_SECOND_QUEUE" ref="receiverSecond" />
    </rabbit:listener-container>

    <!--######分隔線######-->
    <!--定義queue -->
    <rabbit:queue name="MY_THIRD_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!-- 定義topic exchange,綁定MY_THIRD_QUEUE,注意關鍵詞是pattern -->
    <rabbit:topic-exchange name="MY_TOPIC_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="MY_THIRD_QUEUE" pattern="#.Third.#"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定義rabbit template用於數據的接收和發送 -->
    <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="MY_TOPIC_EXCHANGE" />

    <!-- 消息接收者 -->
    <bean id="receiverThird" class="com.gupaoedu.consumer.ThirdConsumer"></bean>

    <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="MY_THIRD_QUEUE" ref="receiverThird" />
    </rabbit:listener-container>

    <!--######分隔線######-->
    <!--定義queue -->
    <rabbit:queue name="MY_FOURTH_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!-- 定義fanout exchange,綁定MY_FIRST_QUEUE 和 MY_FOURTH_QUEUE -->
    <rabbit:fanout-exchange name="MY_FANOUT_EXCHANGE" auto-delete="false" durable="true" declared-by="connectAdmin" >
        <rabbit:bindings>
            <rabbit:binding queue="MY_FIRST_QUEUE"></rabbit:binding>
            <rabbit:binding queue="MY_FOURTH_QUEUE"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- 消息接收者 -->
    <bean id="receiverFourth" class="com.gupaoedu.consumer.FourthConsumer"></bean>

    <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="MY_FOURTH_QUEUE" ref="receiverFourth" />
    </rabbit:listener-container>
</beans>

 4.生產者:

@Service
public class MessageProducer {
    private Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    @Qualifier("amqpTemplate")
    private AmqpTemplate amqpTemplate;

    @Autowired
    @Qualifier("amqpTemplate2")
    private AmqpTemplate amqpTemplate2;

    /**
     * 演示三種交換機的使用
     *
     * @param message
     */
    public void sendMessage(Object message) {
        logger.info("Send message:" + message);

        // amqpTemplate 默認交換機 MY_DIRECT_EXCHANGE
        // amqpTemplate2 默認交換機 MY_TOPIC_EXCHANGE

        // Exchange 為 direct 模式,直接指定routingKey
        amqpTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] "+message);
        amqpTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] "+message);

        // Exchange模式為topic,通過topic匹配關心該主題的隊列
        amqpTemplate2.convertAndSend("msg.Third.send","[Topic,msg.Third.send] "+message);

        // 廣播消息,與Exchange綁定的所有隊列都會收到消息,routingKey為空
        amqpTemplate2.convertAndSend("MY_FANOUT_EXCHANGE",null,"[Fanout] "+message);
    }
}

5.消費者,需要實現 MessageListener 接口:

public class FirstConsumer implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(FirstConsumer.class);

    public void onMessage(Message message) {
        logger.info("The first consumer received message : " + message.getBody());
    }
}

SpringBoot 集成 RabbitMQ:

  基於下面這個結構圖去集成:

   消費者的配置:

1.構建Springboot項目導入依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.Rabbit配置類,生成交換機,隊列,以及匹配規則:

@Configuration
public class RabbitConfig {

    //1.定義三個交換機
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("DIRECT_EXCHANGE");
    }

    @Bean
    public TopicExchange topicExchange(){

        return new TopicExchange("TOPIC_EXCHANGE");
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("FANOUT_EXCHANGE");
    }
    //2.定義四個隊列
    @Bean
    public Queue firstQueue(){
        return new Queue("FIRST_QUEUE");
    }

    @Bean
    public Queue secondQueue(){
        return new Queue("SECOND_DQUEUE");
    }

    @Bean
    public Queue thirdQueue(){
        return new Queue("THIRD_DQUEUE");
    }

    @Bean
    public Queue fourthQueue(){
        return new Queue("FOURTH_DQUEUE");
    }
    //3.定義四個綁定關系
    @Bean
    public Binding bindFirst(@Qualifier("firstQueue") Queue queue,
                             @Qualifier("directExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("wuzz.test");
    }

    @Bean
    public Binding bindSecond(@Qualifier("secondQueue") Queue queue,
                              @Qualifier("topicExchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("*.wuzz.*");
    }

    @Bean
    public Binding bindThird(@Qualifier("thirdQueue") Queue queue,
                             @Qualifier("fanoutExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding bindFourth(@Qualifier("fourthQueue") Queue queue,
                              @Qualifier("fanoutExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
}

3.定義4個消費者:

@Configuration
@RabbitListener(queues = "FIRST_QUEUE")
public class FirstConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println("First Queue received msg : " + msg);
    }
}
@Configuration
@RabbitListener(queues = "SECOND_DQUEUE")
public class SecondConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println("second Queue received msg : " + msg);
    }
}
@Configuration
@RabbitListener(queues = "THIRD_DQUEUE")
public class ThirdConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println("third Queue received msg : " + msg);
    }
}
@Configuration
@RabbitListener(queues = "FOURTH_DQUEUE")
public class FourthConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println("fourth Queue received msg : " + msg);
    }
}

4.配置application.properties,定義鏈接地址等信息:

spring.application.name=spirng-boot-rabbitmq-consumer
spring.rabbitmq.host=192.168.254.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

  啟動消費者,可以通過管理頁面看到有4個鏈接通道及交換機隊列信息。

  生產者:

1.構建SpringBoot項目導入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.通過 RabbitTemplate 構建自己的生產者類,發送4條消息:

@Component
public class MyProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public  void send(){
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE","wuzz.test","DIRECT_EXCHANGE message");
        rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","hangzhou.wuzz.test","TOPIC_EXCHANGE hangzhou message");
        rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","wenzhou.wuzz.test","TOPIC_EXCHANGE wenzhou message");
        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE","","FANOUT_EXCHANGE message");

    }
}

3.編寫測試類測試,並且啟動:

@RunWith(SpringRunner.class)
@SpringBootTest
public class AppTest {

    @Autowired
    private MyProducer myProducer;

    @Test
    public  void  send(){
        myProducer.send();
    }
}

  同樣需要配置application.properties,順利的話消費者方會收到消息如下,這樣就集成完了:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM