rabbitMQ的簡單實例——amqp協議帶數據回寫機制


rabbitMQ是一種高性能的消息隊列,支持或者說它實現了AMQP協議(advanced message queue protocol高級消息隊列協議)。

下面簡單講一講一個小例子。我們首先要部署好rabbitMQ,然后實現一個生產者—消費者,生產者向rabbit中發布一個消息,消費者去rabbit取這個消息,在正確收到這個消息后,消費者會通過返回隊列回寫通知生產者自己收到了消息。

windows下部署rabbit非常簡單,先安裝erlang運行時,然后安裝rabbitMQ安裝文件即可,都是exe的,很簡單。然后找到rabbit的sbin目錄里的bat即可啟動rabbitMQ。

下面是producer—consumer代碼:

package com.hzfi.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;

public class Producer {
    private final static String QUEUE_NAME = "myQueue"; //上送隊列
    
    public static void main(String[] args) throws IOException, TimeoutException{
        String replyQueueName = null;   //返回隊列名
        
        ConnectionFactory connFactory = null;
        Connection conn = null;
        Channel channel = null;
        try{
        connFactory = new ConnectionFactory();
        connFactory.setHost("localhost");
        conn = connFactory.newConnection();
        channel = conn.createChannel();
        //返回隊列
        replyQueueName = channel.queueDeclare().getQueue();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);

        String corrId = java.util.UUID.randomUUID().toString(); //用來表示返回隊列結果的id,唯一
        BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
        String msg = "linyang@hzfi.cn";
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicPublish("", QUEUE_NAME, props, msg.getBytes());
        System.out.println("producer has published: \"" + msg + "\"");
        
        while(true){
            Thread.sleep(1000);
            Delivery delivery = consumer.nextDelivery();
            System.out.println("from server reply:" + new String(delivery.getBody()));
        }
        }catch(IOException ioe){
            ioe.printStackTrace();
        }catch(TimeoutException toe){
            toe.printStackTrace();
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            if(channel!=null)   channel.close();
            if(conn!=null)  conn.close();
        }
    }
}
package com.hzfi.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {
    private final static String QUEUE_NAME = "myQueue";
    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connFactory = null;
        Connection conn = null;
        Channel channel = null;
        try{
        connFactory = new ConnectionFactory();
        connFactory.setHost("localhost");
        conn = connFactory.newConnection();
        channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("listening for event message...");
        
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while(true){
            Thread.sleep(1000);
            Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties reply_props = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
            String msg = new String(delivery.getBody(),"utf-8");
            System.out.println("receive msg:" + msg);
            String retMsg = "ok, give you reply:" + new String(msg.getBytes(),"utf-8");
            System.out.println("Consumer中的返回隊列名" + props.getReplyTo());
            channel.basicPublish( "", props.getReplyTo(), reply_props, retMsg.getBytes());
        }
        }catch(IOException ioe){
            ioe.printStackTrace();
        }catch(TimeoutException toe){
            toe.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            if(channel!=null)   channel.close();
            if(conn!=null)  conn.close();
        }
    }
}

 開啟RabbitMQ的后台管理服務(是個web頁面)

     \sbin>rabbitmq-plugins enable rabbitmq_management

     訪問地址 http://localhost:15672/     id/psw: guest/guest

     可以對隊列,用戶,權限等進行管理,例如,默認情況下密碼是任意,如上代碼所示,ConnectionFactory僅僅設置了主機名,並未設置用戶名和密碼。

     我們可以新建或修改一個用戶名和密碼,如下圖:

這樣,我們上面的代碼也要做相應的調整:

ConnectionFactory connFactory = new ConnectionFactory();
connFactory.setHost("localhost");
connFactory.setUsername("guest");
connFactory.setPassword("123");    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM