[java学习笔记]rabbitMq的使用


  消息中间件实现不同系统之间通信的一个系统,就rabbitMQ来讲,消息的发出方将消息送入某个交换机,并且制定一个路由关键字,该交换机根据路由关键字将消息放入对应的队列中,然后一直监听着队列的程序便可以接收道相应的消息,并且根据预定的程序执行相应的逻辑。

  下面通过一个例子来实现程序间的通信:

消息发出方:

package cn.ly;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 测试rabbitmq客户端,制造者端测试
 *
 * 生产者发送信息的过程:
 * 1.创建连接
 * 2.设置虚拟机
 * 3.创建会话通道
 * 4.声明队列
 * 5.声明交换机
 * 6.绑定交换机和队列,创建路由关键字
 * 7.发送消息
 * 8.关闭资源
 * 注明:4-6这三步不是必须的,但是如果直接发送消息而没有队列的话程序会出错,所以,在发送之前先声明,
 * 同理,消费者端也是这样,需要先声明,没有的话就会创建,有的话就不发生什么;
 *
 * 对于工作模式:
 * 1.work queues:不使用交换机,只有一个队列,可以有多个消费端,队列通知采用轮询的方法给监听的多个消费端
 * 发送消息;
 * 2.publish/subscribe:通过交换机进行消息转发,有多个队列,每个队列均可有一个或者多个消费者进行监听,
 * 每次生产者发送消息,则均由交换机转发至各个队列,由各个队列自行通知监听的消费者;
 * 3.Routing:模式同2,区别,为每个队列配备一个,或多个路由关键字,发送消息时指定路由关键字,由交换机根据路由
 * 关键字匹配进行转发;
 * 4.Topics:模式同3,区别在于,配备的路由关键字可以为通配符的形式,通配符有:#和*,区别:#匹配任意个单词,而*
 * 只能匹配单个单词,其中路由关键字指定规则:多个单词使用.隔开;
 * 5.Header:模式同3,区别,匹配的是键值对;
 * 6.RPC:远程异步调用,mq的一个应用,有客户端和服务端,客户端向mq发送一个调用服务端的信息,服务端获得信息,调用
 * 相应服务,将返回结果作为消息发送到另一个队列,客户端监听该队列获取返回信息;
 *
 * 对于消费者,每个消费者监听队列时指定的参数中,有队列名,是否自动回复,以及一个的map属性和一个回调方法;
 * 消费者,单独的消费者无法实现同时监听两个队列的操作;
 * 针对队列:可以给它增加路由key,也可以给一系列的键值对,作为头信息,这些东西都是对队列的标识,当生产者发送消息时,
 * 使用这些标识来决定将消息发送到那个队列,有路由key的话,优先匹配路由key;
 * 消费者的行为很单纯,就是监听一个队列,然后发现消息就回收;
 */
public class ProducerTest {
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";
    // rabbitMq
    @Test
    public void run() {
        // 通过连接工厂创建新的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟机
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            // 创建连接
            connection = connectionFactory.newConnection();
            // 创建会话通道
            channel = connection.createChannel();
            //声明队列,如果队列在mq 中没有则要创建
            //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            //声明一个交换机
            //参数:String exchange, String type
            /**
             * 参数明细:
             * 1、交换机的名称
             * 2、交换机的类型
             * fanout:对应的rabbitmq的工作模式是 publish/subscribe
             * direct:对应的Routing    工作模式
             * topic:对应的Topics工作模式
             * headers: 对应的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
            // 绑定交换机和队列

            //参数:String queue, String exchange, String routingKey
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、exchange 交换机名称
             * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);


            for(int i=0;i<5;i++){
                //发送消息的时候指定routingKey
                String message = "send email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //发送消息的时候指定routingKey
                String message = "send sms inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //发送消息的时候指定routingKey
                String message = "send sms and email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}
View Code

做的事情主要有:创建链接,设置虚拟机,创建会话通道,声明队列,声明交换机,通过关键字绑定交换机,发送消息,关闭链接;一个链接可以有多个会话通道;其中,声明交换机,声明队列和绑定这三件事不是必须做的,只是,初次运行发送消息时如果没有对应的交换机则会报错,另外,如果在创建交换机时报错,可以访问localhost:15672中,查看是否有同名的交换机,有的话删除即可;

消息接收方:

package cn.ly.cn.ly;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 通过连接工厂创建新的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 设置虚拟机
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        // 创建连接
        connection = connectionFactory.newConnection();
        // 创建会话通道
        channel = connection.createChannel();
        //声明队列,如果队列在mq 中没有则要创建
        //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //声明一个交换机
        //参数:String exchange, String type
        /**
         * 参数明细:
         * 1、交换机的名称
         * 2、交换机的类型
         * fanout:对应的rabbitmq的工作模式是 publish/subscribe
         * direct:对应的Routing    工作模式
         * topic:对应的Topics工作模式
         * headers: 对应的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        // 绑定交换机和队列

        //参数:String queue, String exchange, String routingKey
        /**
         * 参数明细:
         * 1、queue 队列名称
         * 2、exchange 交换机名称
         * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);

        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 当接收到消息后此方法将被调用
             * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
             * @param envelope 信封,通过envelope
             * @param properties 消息属性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //监听队列
        //参数:String queue, boolean autoAck, Consumer callback
        /**
         * 参数明细:
         * 1、queue 队列名称
         * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
         * 3、callback,消费方法,当消费者接收到消息要执行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}
View Code

消息接收方主要是创建链接,声明回调方法,监听队列,同样,创建队列的操作不是必须的,但是,如果队列不存在,监听时会报错,所以提前创建;

需要注意的一点:同一个队列可以由多个进程同时监听,但是,同一条消息只能被一个进程接收,即监听同一队列的多个程序在一次消息发送中,只会有一个接收消息并处理;


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM