RabbitMQ安裝以及java使用(二)


上一篇記錄了rabbitmq的安裝,這一篇記錄一下rabbitmq的java客戶端的簡單使用,當然在項目中我們有更為復雜的應用場景,這里只有最簡單的點對點生產者與消費者模式。

1、建立工程

首先建立一個簡單的maven工程,我這邊使用了平時使用的demo工程

pom.xml配置,本次案例中只需要兩個包即可,是用commons包的序列化,amqp則是rabbitmq的java包。

 


 

 

2、新建點對點抽象類

因為這個例子只講述非常簡單的點對點生產者與消費者關系,在某種程度上兩者有很多共性,所以這里干脆抽象成一個類了。具體代碼如下:

package ucs_test.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/** 
* @author  作者 ucs_fuqing
* @date 創建時間:2017年8月11日 下午2:21:27 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public abstract class PointToPoint {
    protected Channel channel;
    protected Connection connection;
    protected String pointName;
    
    /**
     * 獲取一個隊列的連接
     * @param pointName
     * @throws IOException
     */
    public PointToPoint(String pointName) throws IOException{
        this.pointName = pointName;
        
        //創建連接工廠
        ConnectionFactory cf = new ConnectionFactory();
        
        //設置rabbitmq服務器地址
        cf.setHost("192.168.149.133");
        
        //設置rabbitmq服務器用戶名
        cf.setUsername("hxb");
        
        //設置rabbitmq服務器密碼
        cf.setPassword("hxb");
        
        //獲取一個新的連接
        connection = cf.newConnection();
        
        //創建一個通道
        channel = connection.createChannel();
        
        //申明一個隊列,如果這個隊列不存在,將會被創建
        channel.queueDeclare(pointName, false, false, false, null);
    }
    
    
    /**
     * 
    * @Title: close 
    * @Description: 其實在程序完成時一般會自動關閉連接,但是這里提供手動操作的入口, 
    * @param @throws IOException    設定文件 
    * @return void    返回類型 
    * @throws
     */
    public void close() throws IOException{
        this.channel.close();
        this.connection.close();
    }
}

在上面代碼中,實現的是創建一個隊列或者關閉它,在默認的情況下channel和connection會自動關閉,但是我覺得還是提供手動關閉的入口更好一些。

 


 

 

3、生產者

這個例子中的生產者其實非常簡單,我們創建了一個連接,並且獲取了通道,接下來就可以直接往我們指定的隊列(queue)中發送消息了,如果這個隊列不存在,則會被程序自動創建。

package ucs_test.rabbitmq;

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;

import com.mchange.io.SerializableUtils;

/** 
* @author  作者 ucs_fuqing
* @date 創建時間:2017年8月11日 下午2:33:13 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public class Producer extends PointToPoint{

    public Producer(String pointName) throws IOException {
        super(pointName);
        // TODO Auto-generated constructor stub
    }
    
    /**
     * 
    * @Title: sendMessage 
    * @Description: 生產消息
    * @param @param Object
    * @param @throws IOException    設定文件 
    * @return void    返回類型 
    * @throws
     */
    public void sendMessage(Serializable Object) throws IOException{
        channel.basicPublish("", pointName, null, SerializationUtils.serialize(Object));
    }
}

上面代碼看到,我們只是簡單的向pointName的隊列發送了一個對象。

 


 

 

4、消費者

我們這里的消費者也非常簡單,僅僅只是拿到並打印出消息即可

package ucs_test.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

/** 
* @author  作者 ucs_fuqing
* @date 創建時間:2017年8月11日 下午2:39:51 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public class QueueConsumer extends PointToPoint implements Runnable,Consumer{

    public QueueConsumer(String pointName) throws IOException {
        super(pointName);
        // TODO Auto-generated constructor stub
    }
    
    public void run(){
        try {
            channel.basicConsume(pointName,true,this);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void handleConsumeOk(String consumerTag) {
        // TODO Auto-generated method stub
        System.out.println("Consumer "+consumerTag +" registered");
        
    }

    @Override
    public void handleCancelOk(String consumerTag) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void handleCancel(String consumerTag) throws IOException {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException {
        // TODO Auto-generated method stub
        Map map = (HashMap)SerializationUtils.deserialize(body);
        System.out.println("Message Number "+ map.get("tagId") + " received.");
        //channel.basicAck(env.getDeliveryTag(), false);
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void handleRecoverOk(String consumerTag) {
        // TODO Auto-generated method stub
        
    }

}

以上代碼中,我們指定了消費的隊列,並從中拿到消息,打印出來。

 


 

 

5、測試類

至此我們的生產者與消費者都寫完了,接着寫個測試類來驗證一下

package ucs_test.rabbitmq;

import java.io.IOException;
import java.util.HashMap;

/** 
* @author  作者 ucs_fuqing
* @date 創建時間:2017年8月11日 下午2:44:59 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public class MainTest {
    public MainTest() throws IOException{
        QueueConsumer consumer = new QueueConsumer("testqueue");
        Thread cuThread = new Thread(consumer);
        
        cuThread.start();
        
        Producer producer = new Producer("testqueue");
        int i = 0;
        while (i<10000) {
            HashMap<String, Object> hm = new HashMap<>();
            hm.put("tagId", i);
            producer.sendMessage(hm);
            System.out.println("發送第"+i+"消息");
            i++;
        }
    }
    
    public static void main(String[] args) throws IOException {
        new MainTest();
    }
}

在這里我們的生產者生產10000條消息,消費者拿到並打印出來。看看運行結果:

可以看到雖然有點亂序,但是10000條消息全部被消費完畢。

 


 

 

6、消息應答

在上面的例子中,我們的生產者只管發送消息,消費者只管消費消息,而RabbitMQ在上面的例子中,將消息交付給消費者之后,會從內存中移除掉這個消息。在正式的項目中,消費消息可能需要那么幾秒鍾,那么問題來了:如果我們拿到消息后需要進行更為復雜的業務處理,而這個業務處理失敗或者中斷了,那么意味着這條消息代表的工作並未完成,但是消息已經不存在了,我們會丟失掉正在處理的信息,也會丟失掉發給消費者但是並未被消費的消息。
現在我們使用兩個消費者來接受同一個隊列的消息,測試類如下:

package ucs_test.rabbitmq;

import java.io.IOException;
import java.util.HashMap;

/** 
* @author  作者 ucs_fuqing
* @date 創建時間:2017年8月11日 下午2:44:59 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public class MainTest {
    public MainTest() throws IOException{
        QueueConsumer consumer = new QueueConsumer("testqueue");
        Thread cuThread = new Thread(consumer);
        QueueConsumer consumer2 = new QueueConsumer("testqueue");
        Thread cuThread2 = new Thread(consumer2);
        cuThread.start();
        cuThread2.start();
        Producer producer = new Producer("testqueue");
        int i = 0;
        while (i<10000) {
            HashMap<String, Object> hm = new HashMap<>();
            hm.put("tagId", i);
            producer.sendMessage(hm);
            //System.out.println("發送第"+i+"消息");
            i++;
        }
    }
    
    public static void main(String[] args) throws IOException {
        new MainTest();
    }
}

在這種情況下,MQ將會均勻的將消息發送給兩個消費者消費,但是如果consumer2半路終止或者異常,那么將會導致我們的測試結果顯示接受到的消息少於10000條,消失的消息被異常的消費者吃掉了,而我們沒有任何辦法。。。

為了保證消息不會丟失,或者說肯定被消費,RabbitMQ支持消息應答模式,簡單的只需要修改兩個位置:

消費者類QueueConsumer中

設置basicConsume方法參數為false,打開消息應答

消費完成之后,向mq返回應答消息。

這樣,當消費者異常時,MQ沒有收到消費者消息應答,將會把消息發送給其他消費者,保證這條消息被消費掉。

 

OK,簡單的RabbitMQ服務器Java端例子就這樣了。下一篇會在此基礎上增加一些高級的應用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM