消息中间件实现不同系统之间通信的一个系统,就rabbitMQ来讲,消息的发出方将消息送入某个交换机,并且制定一个路由关键字,该交换机根据路由关键字将消息放入对应的队列中,然后一直监听着队列的程序便可以接收道相应的消息,并且根据预定的程序执行相应的逻辑。
下面通过一个例子来实现程序间的通信:
消息发出方:

package cn.ly; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 测试rabbitmq客户端,制造者端测试 * * 生产者发送信息的过程: * 1.创建连接 * 2.设置虚拟机 * 3.创建会话通道 * 4.声明队列 * 5.声明交换机 * 6.绑定交换机和队列,创建路由关键字 * 7.发送消息 * 8.关闭资源 * 注明:4-6这三步不是必须的,但是如果直接发送消息而没有队列的话程序会出错,所以,在发送之前先声明, * 同理,消费者端也是这样,需要先声明,没有的话就会创建,有的话就不发生什么; * * 对于工作模式: * 1.work queues:不使用交换机,只有一个队列,可以有多个消费端,队列通知采用轮询的方法给监听的多个消费端 * 发送消息; * 2.publish/subscribe:通过交换机进行消息转发,有多个队列,每个队列均可有一个或者多个消费者进行监听, * 每次生产者发送消息,则均由交换机转发至各个队列,由各个队列自行通知监听的消费者; * 3.Routing:模式同2,区别,为每个队列配备一个,或多个路由关键字,发送消息时指定路由关键字,由交换机根据路由 * 关键字匹配进行转发; * 4.Topics:模式同3,区别在于,配备的路由关键字可以为通配符的形式,通配符有:#和*,区别:#匹配任意个单词,而* * 只能匹配单个单词,其中路由关键字指定规则:多个单词使用.隔开; * 5.Header:模式同3,区别,匹配的是键值对; * 6.RPC:远程异步调用,mq的一个应用,有客户端和服务端,客户端向mq发送一个调用服务端的信息,服务端获得信息,调用 * 相应服务,将返回结果作为消息发送到另一个队列,客户端监听该队列获取返回信息; * * 对于消费者,每个消费者监听队列时指定的参数中,有队列名,是否自动回复,以及一个的map属性和一个回调方法; * 消费者,单独的消费者无法实现同时监听两个队列的操作; * 针对队列:可以给它增加路由key,也可以给一系列的键值对,作为头信息,这些东西都是对队列的标识,当生产者发送消息时, * 使用这些标识来决定将消息发送到那个队列,有路由key的话,优先匹配路由key; * 消费者的行为很单纯,就是监听一个队列,然后发现消息就回收; */ public class ProducerTest { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_EMAIL="inform.#.email.#"; private static final String ROUTINGKEY_SMS="inform.#.sms.#"; // rabbitMq @Test public void run() { // 通过连接工厂创建新的连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟机 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 创建连接 connection = connectionFactory.newConnection(); // 创建会话通道 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); // 绑定交换机和队列 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS); for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send email inform message to user"; channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes()); System.out.println("send to mq "+message); } for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send sms inform message to user"; channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes()); System.out.println("send to mq "+message); } for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send sms and email inform message to user"; channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes()); System.out.println("send to mq "+message); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } }
做的事情主要有:创建链接,设置虚拟机,创建会话通道,声明队列,声明交换机,通过关键字绑定交换机,发送消息,关闭链接;一个链接可以有多个会话通道;其中,声明交换机,声明队列和绑定这三件事不是必须做的,只是,初次运行发送消息时如果没有对应的交换机则会报错,另外,如果在创建交换机时报错,可以访问localhost:15672中,查看是否有同名的交换机,有的话删除即可;
消息接收方:

package cn.ly.cn.ly; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerTest { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_EMAIL="inform.#.email.#"; private static final String ROUTINGKEY_SMS="inform.#.sms.#"; public static void main(String[] args) throws IOException, TimeoutException { // 通过连接工厂创建新的连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟机 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; // 创建连接 connection = connectionFactory.newConnection(); // 创建会话通道 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); // 绑定交换机和队列 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }
消息接收方主要是创建链接,声明回调方法,监听队列,同样,创建队列的操作不是必须的,但是,如果队列不存在,监听时会报错,所以提前创建;
需要注意的一点:同一个队列可以由多个进程同时监听,但是,同一条消息只能被一个进程接收,即监听同一队列的多个程序在一次消息发送中,只会有一个接收消息并处理;