RabbitMQ學習第一記:用java連接RabbitMQ


1、什么是RabbitMQ

  MQ(Message Queue):消息隊列,是服務端設計的一個可以存儲大量消息的隊列,並提供客戶端操作隊列的方法:生產隊列(向隊列中添加數據)、消費隊列(從隊列中取數據)。RabbitMQ就是基於消息隊列的一個典型應用。RabbitMQ除了普通的生產消費功能,還有一些高級功能:公平分發 ,輪詢分發,路由模式,通配符模式,發布訂閱,隊列持久化。

2、java實現RabbitMQ的連接

2.1、RabbitMQ客戶端jar包

 

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.2</version>
</dependency>

 

2.2、java連接RabbitMQ工具類

public class ConnectionUtil
{
    private static Logger logger = Logger.getLogger(ConnectionUtil.class);
    
    public static Connection getConnection()
    {
        try
        {
            Connection connection = null;
            //定義一個連接工廠
            ConnectionFactory factory = new ConnectionFactory();
            //設置服務端地址(域名地址/ip)
            factory.setHost("127.0.0.1");
            //設置服務器端口號
            factory.setPort(5672);
            //設置虛擬主機(相當於數據庫中的庫)
            factory.setVirtualHost("/");
            //設置用戶名
            factory.setUsername("admin");
            //設置密碼
            factory.setPassword("888888");
            connection = factory.newConnection();
            return connection;
        }
        catch (Exception e)
        {
            return null;
        }
    }
}

2.3、簡單的生產者-消費者模式

  下圖取自於官方網站(RabbitMQ)的生產消費模式的工作圖

P:消息的生產者

C:消息的消費者

紅色:隊列 

生產者將消息發送到隊列,消費者從隊列中獲取消息。

2.4、生產者(Send)

public class Send
{
    //隊列名稱
    private static final String QUEUE_NAME = "test_simple_queue";
    
    public static void main(String[] args)
    {
        try
        {
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            Channel channel = connection.createChannel();
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "This is simple queue";
            //發送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
            System.out.println("[send]:" + message);
            channel.close();
            connection.close();
        }
        catch (IOException | TimeoutException e)
        {
            e.printStackTrace();
        }
    }
}

運行結果:
[send]:This is simple queue

2.5、消費者(Receive)

public class Receive
{
    //隊列名稱
    private static final String QUEUE_NAME = "test_simple_queue";
    
    public static void main(String[] args)
    {
        try
        {
            //獲取連接
            Connection connection = ConnectionUtil.getConnection();
            //從連接中獲取一個通道
            Channel channel = connection.createChannel();
            //聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //定義消費者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //當消息到達時執行回調方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[Receive]:" + message);
                }
            };
            //監聽隊列
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
        catch (IOException | ShutdownSignalException | ConsumerCancelledException e)
        {
            e.printStackTrace();
        }
    }
}

運行結果:
[Receive]:This is simple queue

總結:簡單的生產者-消費者模式實現了生產者向隊列里生產數據,消費者啟動后可以一直監聽隊列,不斷的從隊列里取出數據。

注意:本文僅代表個人理解和看法喲!和本人所在公司和團體無任何關系!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM