RabbitMQ Java實例


引入RabbitMQ的jar包

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.1.2</version>
</dependency>

創建消息生產者

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 
 5 import java.io.IOException;
 6 import java.util.concurrent.TimeoutException;
 7 
 8 public class Producer {
 9     private static final String QUEUE_NAME ="queue.test";
10 
11     public static void main(String[] args) throws IOException, TimeoutException {
12         //創建連接工廠
13         ConnectionFactory connectionFactory = new ConnectionFactory();
14         connectionFactory.setHost("localhost");
15         //創建一個連接
16         Connection connection = connectionFactory.newConnection();
17         //創建一個通道
18         Channel channel = connection.createChannel();
19         //聲明隊列
20         //queueDeclare第一個參數表示隊列名稱、
21         // 第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、
22         // 第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、
23         // 第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
24         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
25         String msg = "hello rabbit";
26         //發送消息到隊列
27         //basicPublish第一個參數為交換機名稱、
28         // 第二個參數為隊列映射的路由key、
29         // 第三個參數為消息的其他屬性、
30         // 第四個參數為發送信息的主體
31         channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));
32         System.out.println("Producer Send +'" + msg + "'");
33         channel.close();
34         connection.close();
35     }
36 }

創建消費者

package com.ysl.rabbit;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {

    private static final String QUEUE_NAME ="queue.test";

    public static void main(String[] args) throws IOException, TimeoutException {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //創建一個連接
        Connection connection = connectionFactory.newConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //聲明隊列
        //queueDeclare第一個參數表示隊列名稱、
        // 第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、
        // 第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、
        // 第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("Customer Waiting Received messages");
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * envelope主要存放生產者相關信息(比如交換機、路由key等)body是消息實體。
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Customer Received '" + message + "'");
            }
        };
        //自動回復隊列應答 -- RabbitMQ中的消息確認機制
        channel.basicConsume(QUEUE_NAME,true, consumer);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM