软件开发的根本就是降低软件开发的复杂性
采用可复用的软件设计模型,采用合适的软件架构搭建自己的系统。
消息队列提供了一个异步通信协议,消息的发送者不用一直等待知道消息被成功的处理。消息被暂存于队列中,对信息感兴趣的消费者会订阅消息,并处理他们。
使用消息队列不是 杀鸡用牛刀 ,而是一种未雨筹谋。随着系统不断升级,你将从中获益。
消息队列(MQ)使用消息将应用程序连接。通过 RabbitMQ 消息代理服务器在应用程序之间路由。
一种通用的软件“总线”,解决应用程序之间繁重的消息通信工作
只要消息按照这些规则发布出去,任何消费者应用都能订阅感兴趣的消息。现在信息的生产者和消费者可以完全解耦合。
从任何发布者 到任何感兴趣的消费者 之间的信息,通过一条软件总线 动态的连接
Erlang 面向并发的编程语言
RabbitMQ 在应用程序和服务器之间 扮演着路由角色。
producer 创建消息,然后发布到代理服务器(RabbitMQ)

其实原理很简单:
生产者创建消息,消费者接受消息。你的应用可以作为生产者,也可以作为消费者。
“信道”才能连接
重要的记者消费者和接受者是消息发送和接受概念的体现,而不是客户端和服务端。
从底层开始构造,队列
AMQP,即Advanced Message Queuing Protocol
高级消息队列协议。
AMQP 消息路由必须有3个部分,交换器、队列、绑定。
生产者把消息发布到交换器上,消息最终到达队列,并被消费者接受。
绑定决定了消息如何从路由器到达特定的队列。
通过AMQP 的 basic.consume 命令订阅,会将信道置为接收模式,直到取消对队列订阅为止。
向队列单条消息通过 AMQP 的 basic.get 命令实现。
消费者收到每一条消息都必须进行确认,消费者必须通过AMQP 的 basic.act 参数设置为 true 。
同时 RabbitMQ 才能安全的把消息从队列中删除。
如果应用程序有 bug 而忘记确认消息的话,Rabbit将不会给消费者发送更多的消息了。因为在上一条消息确认前,RabbitMQ 会认为这个消费者并没有做好接受下一条消息。
实战 rabbitMQ
一、MQ(消息队列)
RabbitMQ(兔子消息队列)
MQ 为 Message Queue ,消息队列是应用程序和应用程序之间的通信方法。
RabbitMQ 是一个开源的,在 AMQP 基础上完整的,可复用的企业消息系统。
AMQP
消息队列的一个协议,类似 tcp 、udp
RabbitMQ 是实现 AMQP 协议的具体一个东西。
多种开发语言的支持,java python .net
其实就是一种驱动。
rabbit.com 官网
MQ的其他产品还有很多:
ActiveMQ 、 Kafka (分布式消息订阅系统)等等
二、搭建 rabbitMQ 环境
首先要安装 Erlang 环境,然后在这个基础上才是
rabbitMQ 环境
用户名、计算机名最好都是中文,不然可能安装失败。
安装成功后 可以浏览器访问
端口号: 15672
默认用户 gust / gust
其实在 docker 上安装镜像就好了
进入界面后 ,注册一个新号。
Demo/demo
然后新建一个映射路径(virtual hosts)。 /demo
docker run -d --hostname localhost --name myrabbit -p 15672:15672 -p 5672:5672 rabbitmq
rabitmq 端口
5672 15672 25672
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 rabbitmq
端口号简绍
5672 AMQP协议的端口
25672 集群端口
15672 管理界面的端口
Docker 启动 rabbitMQ
1、启动在 docker容器里
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
使用 docker logs 查看
docker logs some-rabbit
2、
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
3、
docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 rabbitmq
docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
本地Mac 电脑启动 rabbitMQ
1、进入文件夹
/usr/local/Cellar/rabbitmq/
cd 3.7.8
2、启动服务
sbin/rabbitmq-server
3、浏览器访问
浏览器
http://localhost:15672可进入rabbitmq控制终端登录页面
默认用户名和密码为
guest/guest.
三、简单队列
生产者将消息发送到队列,消费者从队列中获取消息。
1、先创建 pom.xml 文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.rabbitmq</groupId>
<artifactId>itcast-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
</dependencies>
</project>
创建单独的消息通道
package cn.itcast.rabbitmq.util;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost”);
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/taotao");
factory.setUsername("taotao");
factory.setPassword("taotao”);
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
创建生产者代码
package cn.itcast.rabbitmq.simple;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
}
创建消费者的代码
package cn.itcast.rabbitmq.simple;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
四、work模式
交给两个对象,一个前台、一个后台。
创建生产者
package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
// 消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
创建消费者一:
package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
//休眠
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
创建消费者 2
package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 休眠1秒
Thread.sleep(1000);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
Work 模式的 能者多劳 模式
消息的确认模式
消息从队列中获取信息,服务端如何知道消息已经被消费呢?
看情况选择合适的 模式。
模式1-自动模式
只要消息从队列中获取,无论取到后是否为成功消息,都认为消息是成功消息。
// 监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,true, consumer);
模式2-手动模式
消费者从中获取到消息后,服务器就会标记其为不可用状态,等待消费者反馈,如果该消息一直没有反馈,那么一直不可用。
// 监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME, false, consumer);
如果为 true ,就是自动模式
如果为 false ,就是手动模式,同时还要反馈信息。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
订阅模式
一个生产者,多个消费者
每个消费者都有自己的一个队列
生产者没有将消息直接发送到队列,而是发送到了交换机
每个队列都要绑定交换机
生产者发送的消息,经过交换机。实现一个消息被多个消费者获取的目的
后面的模式都是由前面的创新而来的。
路由模式
选择性的接受数据、消息。