RabbitMQ 入門指南(Java)


RabbitMQ是一個受歡迎的消息代理,通常用於應用程序之間或者程序的不同組件之間通過消息來進行集成。本文簡單介紹了如何使用 RabbitMQ,假定你已經配置好了rabbitmq服務器。

RabbitMQ是用Erlang,對於主要的編程語言都有驅動或者客戶端。我們這里要用的是Java,所以先要獲得Java客戶端。。下面是Java客戶端的maven依賴的配置。

<dependency>

        <groupId>com.rabbitmq</groupId>

        <artifactId>amqp-client</artifactId>

        <version>3.0.4</version>

</dependency>

RabbitMQ這樣的消息代理可用來模擬不同的場景,例如點對點的消息分發或者訂閱/推送。我們的程序足夠簡單,有兩個基本的組件,一個生產者用於產生消息,還有一個消費者用來使用產生的消息。

在這個例子里,生產者會產生大量的消息,每個消息帶有一個序列號,另一個線程中的消費者會使用這些消息。

抽象類EndPoint:

我們首先寫一個類,將產生產者和消費者統一為 EndPoint類型的隊列。不管是生產者還是消費者, 連接隊列的代碼都是一樣的,這樣可以通用一些

package co.syntx.examples.rabbitmq;

 

import java.io.IOException;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

 

/**

 * Represents a connection with a queue

 * @author syntx

 *

 */

public abstract class EndPoint{

     

    protected Channel channel;

    protected Connection connection;

    protected String endPointName;

     

    public EndPoint(String endpointName) throws IOException{

         this.endPointName = endpointName;

         

         //Create a connection factory

         ConnectionFactory factory = new ConnectionFactory();

         

         //hostname of your rabbitmq server

         factory.setHost("localhost");

         

         //getting a connection

         connection = factory.newConnection();

         

         //creating a channel

         channel = connection.createChannel();

         

         //declaring a queue for this channel. If queue does not exist,

         //it will be created on the server.

         channel.queueDeclare(endpointName, false, false, false, null);

    }

     

     

    /**

     * 關閉channelconnection。並非必須,因為隱含是自動調用的。

     * @throws IOException

     */

     public void close() throws IOException{

         this.channel.close();

         this.connection.close();

     }

}

 

生產者:

生產者類的任務是向隊列里寫一條消息。我們使用Apache Commons Lang把可序列化的Java對象轉換成 byte 數組。commons langmaven依賴如下:

<dependency>

<groupId>commons-lang</groupId>

<artifactId>commons-lang</artifactId>

<version>2.6</version>

</dependency>

package co.syntx.examples.rabbitmq;

 

import java.io.IOException;

import java.io.Serializable;

 

import org.apache.commons.lang.SerializationUtils;

 

 

/**

 * The producer endpoint that writes to the queue.

 * @author syntx

 *

 */

public class Producer extends EndPoint{

     

    public Producer(String endPointName) throws IOException{

        super(endPointName);

    }

 

    public void sendMessage(Serializable object) throws IOException {

        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));

    }  

}

消費者:

消費者可以以線程方式運行,對於不同的事件有不同的回調函數,其中最主要的是處理新消息到來的事件。

package co.syntx.examples.rabbitmq;

 

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

 

import org.apache.commons.lang.SerializationUtils;

 

import com.rabbitmq.client.AMQP.BasicProperties;

import com.rabbitmq.client.Consumer;

import com.rabbitmq.client.Envelope;

import com.rabbitmq.client.ShutdownSignalException;

 

 

/**

 * 讀取隊列的程序端,實現了Runnable接口。

 * @author syntx

 *

 */

public class QueueConsumer extends EndPoint implements Runnable, Consumer{

     

    public QueueConsumer(String endPointName) throws IOException{

        super(endPointName);       

    }

     

    public void run() {

        try {

            //start consuming messages. Auto acknowledge messages.

            channel.basicConsume(endPointName, true,this);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

 

    /**

     * Called when consumer is registered.

     */

    public void handleConsumeOk(String consumerTag) {

        System.out.println("Consumer "+consumerTag +" registered");    

    }

 

    /**

     * Called when new message is available.

     */

    public void handleDelivery(String consumerTag, Envelope env,

            BasicProperties props, byte[] body) throws IOException {

        Map map = (HashMap)SerializationUtils.deserialize(body);

        System.out.println("Message Number "+ map.get("message number") + " received.");

         

    }

 

    public void handleCancel(String consumerTag) {}

    public void handleCancelOk(String consumerTag) {}

    public void handleRecoverOk(String consumerTag) {}

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}

}

Putting it together:

在下面的測試類中,先運行一個消費者線程,然后開始產生大量的消息,這些消息會被消費者取走。

 

 

package co.syntx.examples.rabbitmq;

 

import java.io.IOException;

import java.sql.SQLException;

import java.util.HashMap;

 

public class Main {

    public Main() throws Exception{

         

        QueueConsumer consumer = new QueueConsumer("queue");

        Thread consumerThread = new Thread(consumer);

        consumerThread.start();

         

        Producer producer = new Producer("queue");

         

        for (int i = 0; i < 100000; i++) {

            HashMap message = new HashMap();

            message.put("message number", i);

            producer.sendMessage(message);

            System.out.println("Message Number "+ i +" sent.");

        }

    }

     

    /**

     * @param args

     * @throws SQLException

     * @throws IOException

     */

    public static void main(String[] args) throws Exception{

      new Main();

    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM