軟件開發的根本就是降低軟件開發的復雜性
采用可復用的軟件設計模型,采用合適的軟件架構搭建自己的系統。
消息隊列提供了一個異步通信協議,消息的發送者不用一直等待知道消息被成功的處理。消息被暫存於隊列中,對信息感興趣的消費者會訂閱消息,並處理他們。
使用消息隊列不是 殺雞用牛刀 ,而是一種未雨籌謀。隨着系統不斷升級,你將從中獲益。
消息隊列(MQ)使用消息將應用程序連接。通過 RabbitMQ 消息代理服務器在應用程序之間路由。
一種通用的軟件“總線”,解決應用程序之間繁重的消息通信工作
只要消息按照這些規則發布出去,任何消費者應用都能訂閱感興趣的消息。現在信息的生產者和消費者可以完全解耦合。
從任何發布者 到任何感興趣的消費者 之間的信息,通過一條軟件總線 動態的連接
Erlang 面向並發的編程語言
RabbitMQ 在應用程序和服務器之間 扮演着路由角色。
producer 創建消息,然后發布到代理服務器(RabbitMQ)

其實原理很簡單:
生產者創建消息,消費者接受消息。你的應用可以作為生產者,也可以作為消費者。
“信道”才能連接
重要的記者消費者和接受者是消息發送和接受概念的體現,而不是客戶端和服務端。
從底層開始構造,隊列
AMQP,即Advanced Message Queuing Protocol
高級消息隊列協議。
AMQP 消息路由必須有3個部分,交換器、隊列、綁定。
生產者把消息發布到交換器上,消息最終到達隊列,並被消費者接受。
綁定決定了消息如何從路由器到達特定的隊列。
通過AMQP 的 basic.consume 命令訂閱,會將信道置為接收模式,直到取消對隊列訂閱為止。
向隊列單條消息通過 AMQP 的 basic.get 命令實現。
消費者收到每一條消息都必須進行確認,消費者必須通過AMQP 的 basic.act 參數設置為 true 。
同時 RabbitMQ 才能安全的把消息從隊列中刪除。
如果應用程序有 bug 而忘記確認消息的話,Rabbit將不會給消費者發送更多的消息了。因為在上一條消息確認前,RabbitMQ 會認為這個消費者並沒有做好接受下一條消息。
實戰 rabbitMQ
一、MQ(消息隊列)
RabbitMQ(兔子消息隊列)
MQ 為 Message Queue ,消息隊列是應用程序和應用程序之間的通信方法。
RabbitMQ 是一個開源的,在 AMQP 基礎上完整的,可復用的企業消息系統。
AMQP
消息隊列的一個協議,類似 tcp 、udp
RabbitMQ 是實現 AMQP 協議的具體一個東西。
多種開發語言的支持,java python .net
其實就是一種驅動。
rabbit.com 官網
MQ的其他產品還有很多:
ActiveMQ 、 Kafka (分布式消息訂閱系統)等等
二、搭建 rabbitMQ 環境
首先要安裝 Erlang 環境,然后在這個基礎上才是
rabbitMQ 環境
用戶名、計算機名最好都是中文,不然可能安裝失敗。
安裝成功后 可以瀏覽器訪問
端口號: 15672
默認用戶 gust / gust
其實在 docker 上安裝鏡像就好了
進入界面后 ,注冊一個新號。
Demo/demo
然后新建一個映射路徑(virtual hosts)。 /demo
docker run -d --hostname localhost --name myrabbit -p 15672:15672 -p 5672:5672 rabbitmq
rabitmq 端口
5672 15672 25672
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 rabbitmq
端口號簡紹
5672 AMQP協議的端口
25672 集群端口
15672 管理界面的端口
Docker 啟動 rabbitMQ
1、啟動在 docker容器里
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
使用 docker logs 查看
docker logs some-rabbit
2、
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
3、
docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 rabbitmq
docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
本地Mac 電腦啟動 rabbitMQ
1、進入文件夾
/usr/local/Cellar/rabbitmq/
cd 3.7.8
2、啟動服務
sbin/rabbitmq-server
3、瀏覽器訪問
瀏覽器
http://localhost:15672可進入rabbitmq控制終端登錄頁面
默認用戶名和密碼為
guest/guest.
三、簡單隊列
生產者將消息發送到隊列,消費者從隊列中獲取消息。
1、先創建 pom.xml 文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.rabbitmq</groupId>
<artifactId>itcast-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
</dependencies>
</project>
創建單獨的消息通道
package cn.itcast.rabbitmq.util;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置服務地址
factory.setHost("localhost”);
//端口
factory.setPort(5672);
//設置賬號信息,用戶名、密碼、vhost
factory.setVirtualHost("/taotao");
factory.setUsername("taotao");
factory.setPassword("taotao”);
// 通過工程獲取連接
Connection connection = factory.newConnection();
return connection;
}
}
創建生產者代碼
package cn.itcast.rabbitmq.simple;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 從連接中創建通道
Channel channel = connection.createChannel();
// 聲明(創建)隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息內容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//關閉通道和連接
channel.close();
connection.close();
}
}
創建消費者的代碼
package cn.itcast.rabbitmq.simple;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
四、work模式
交給兩個對象,一個前台、一個后台。
創建生產者
package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
// 消息內容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
創建消費者一:
package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
//休眠
Thread.sleep(10);
// 返回確認狀態
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
創建消費者 2
package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成狀態
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 休眠1秒
Thread.sleep(1000);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
Work 模式的 能者多勞 模式
消息的確認模式
消息從隊列中獲取信息,服務端如何知道消息已經被消費呢?
看情況選擇合適的 模式。
模式1-自動模式
只要消息從隊列中獲取,無論取到后是否為成功消息,都認為消息是成功消息。
// 監聽隊列,手動返回完成狀態
channel.basicConsume(QUEUE_NAME,true, consumer);
模式2-手動模式
消費者從中獲取到消息后,服務器就會標記其為不可用狀態,等待消費者反饋,如果該消息一直沒有反饋,那么一直不可用。
// 監聽隊列,手動返回完成狀態
channel.basicConsume(QUEUE_NAME, false, consumer);
如果為 true ,就是自動模式
如果為 false ,就是手動模式,同時還要反饋信息。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
訂閱模式
一個生產者,多個消費者
每個消費者都有自己的一個隊列
生產者沒有將消息直接發送到隊列,而是發送到了交換機
每個隊列都要綁定交換機
生產者發送的消息,經過交換機。實現一個消息被多個消費者獲取的目的
后面的模式都是由前面的創新而來的。
路由模式
選擇性的接受數據、消息。