RabbitMQ學習筆記二:Java實現RabbitMQ


本地安裝好RabbitMQ Server后,就可以在Java語言中使用RabbitMQ了。

RabbitMQ是一個消息代理,從“生產者”接收消息並傳遞消息至“消費者”,期間可根據規則路由、緩存、持久化消息。“生產者”也即message發送者以下簡稱P,相對應的“消費者”乃message接收者以下簡稱C,message通過queue由P到C,queue存在於RabbitMQ,可存儲盡可能多的message,多個P可向同一queue發送message,多個C可從同一queue接收message。

幾個關鍵概念:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

由Exchange,Queue,RoutingKey三個才能決定一個從Exchange到Queue的唯一的線路。

消息隊列的使用過程大概如下:

(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。

現在,可以上代碼了。首先,是在項目中加入需要的jar包,我使用的是maven項目,直接配置maven及可:

<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.0.4</version>
</dependency>

后面還會用到的jar包,配置如下

<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>

先寫一個類,將產生產者和消費者統一為 EndPoint類型的隊列。不管是生產者還是消費者, 連接隊列的代碼都是一樣的,這樣可以通用一些。

package cn.com.shopec.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public abstract class EndPoint {

    protected Channel channel;

    protected Connection connection;

    protected String endPointName;

    public EndPoint(String endpointName) throws IOException
    {
        this.endPointName = endpointName;

        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();

        // 與RabbitMQ Server建立連接  
        // 連接到的broker在本機localhost上
        factory.setHost("localhost");

        // getting a connection
        connection = factory.newConnection();

        // creating a channel
        channel = connection.createChannel();

        // declaring a queue for this channel. If queue does not exist,
        // it will be created on the server.
        // queueDeclare的參數:queue 隊列名;durable true為持久化;exclusive 是否排外,true為隊列只可以在本次的連接中被訪問,
        // autoDelete true為connection斷開隊列自動刪除;arguments 用於拓展參數
        channel.queueDeclare(endpointName, false, false, false, null);
    }

    /**
     * 關閉channel和connection。並非必須,因為隱含是自動調用的。
     * @throws IOException
     */
    public void close() throws IOException
    {
        this.channel.close();
        this.connection.close();
    }
}

生產者類的任務是向隊列里寫一條消息

package cn.com.shopec.rabbitmq;

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;

public class Producer extends EndPoint {

    public Producer(String endPointName) throws IOException
    {
        super(endPointName);
    }

    public void sendMessage(Serializable object) throws IOException
    {
        channel.basicPublish("", endPointName, null, SerializationUtils.serialize(object));
    }
}

消費者可以以線程方式運行,對於不同的事件有不同的回調函數,其中最主要的是處理新消息到來的事件。

package cn.com.shopec.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

public class QueueConsumer extends EndPoint implements Runnable, Consumer {

    public QueueConsumer(String endPointName) throws IOException
    {
        super(endPointName);
    }

    public void run()
    {
        try
        {
            // start consuming messages. Auto acknowledge messages.
            channel.basicConsume(endPointName, true, this);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Called when consumer is registered.
     */
    public void handleConsumeOk(String consumerTag)
    {
        System.out.println("Consumer " + consumerTag + " registered");
    }

    /**
     * Called when new message is available.
     */
    public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException
    {
        Map map = (HashMap) SerializationUtils.deserialize(body);
        System.out.println("Message Number " + map.get("message number") + " received.");

    }

    public void handleCancel(String consumerTag)
    {
    }

    public void handleCancelOk(String consumerTag)
    {
    }

    public void handleRecoverOk(String consumerTag)
    {
    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1)
    {
    }
}

測試類中,先運行一個消費者線程,然后開始產生大量的消息,這些消息會被消費者取走。

package cn.com.shopec.rabbitmq;

import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;

public class Main {

    public Main() throws Exception
    {

        // 創建消費者,即消息接收者,並啟動線程
        QueueConsumer consumer = new QueueConsumer("queue");
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        // 創建生產者,即消息發送者
        Producer producer = new Producer("queue");

        // 循環發送消息
        for (int i = 0; i < 20; i++)
        {
            HashMap message = new HashMap();
            message.put("message number", i);
            producer.sendMessage(message);
            System.out.println("Message Number " + i + " sent.");
        }
    }

    /**
     * @param args
     * @throws SQLException
     * @throws IOException
     */
    public static void main(String[] args) throws Exception
    {
        new Main();
    }
}

運行結果:

Consumer amq.ctag-8TFduKUwrE1I8iT2L5DaZg registered
Message Number 0 sent.
Message Number 1 sent.
Message Number 2 sent.
Message Number 3 sent.
Message Number 4 sent.
Message Number 5 sent.
Message Number 6 sent.
Message Number 7 sent.
Message Number 8 sent.
Message Number 9 sent.
Message Number 10 sent.
Message Number 11 sent.
Message Number 12 sent.
Message Number 13 sent.
Message Number 14 sent.
Message Number 15 sent.
Message Number 16 sent.
Message Number 17 sent.
Message Number 18 sent.
Message Number 19 sent.
Message Number 0 received.
Message Number 1 received.
Message Number 2 received.
Message Number 3 received.
Message Number 4 received.
Message Number 5 received.
Message Number 6 received.
Message Number 7 received.
Message Number 8 received.
Message Number 9 received.
Message Number 10 received.
Message Number 11 received.
Message Number 12 received.
Message Number 13 received.
Message Number 14 received.
Message Number 15 received.
Message Number 16 received.
Message Number 17 received.
Message Number 18 received.
Message Number 19 received.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM