基本概念,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct、Topic、Fanout的使用,消息回调、手动确认等。
(但是关于rabbitMq的安装,就不介绍了,可以参考此文档 https://blog.csdn.net/rexueqingchun/article/details/103250554)
1.添加依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2.配置文件
spring:
# rabbitmq的配置
rabbitmq:
addresses: 127.0.0.1:5672
# 自己使用guest账号在服务器上添加用户和密码,然后设置
username: admin
password: admin
# 消息传送虚拟根路径
virtual-host: /
# 链接超时时间10s
connection-timeout: 10000
#spring.rabbitmq.publisher-confirms=true 已取消的配置
#correlated 关联的 simple简单的 none不开启确认机制
publisher-confirm-type: correlated
#开启return确认机制
publisher-returns: true
#设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
template:
mandatory: true
# ******************消费者端除了上边的配置还需要增加的配置*******************************
# 设置消费端手动 ack 在接收消息的方法中必须有channel通道 参数
listener:
simple:
acknowledge-mode=manual: manual
# 消费端默认并发数
concurrency=10:
# 最大并发数
max-concurrency: 20
# 在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
prefetch: 1
3.RabbitConfig 主要是创建 消息队列、交换机,并将消息队列与交换机通过一定的路由规则绑定到一起。(如果在此不绑定,则可以在消费者方法上通过注释绑定也可以)
package com.qh.chat.common.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class RabbitMqConfig {
//群主创建聊天用
@Bean
public Queue directQueue(){
return new Queue("queue-only");
}
//等待客服连接消息队列
@Bean
public Queue waitQueue(){
return new Queue("wait.customer");
}
@Bean
public Queue beforeQueue(){return new Queue("before.message");}
//聊天消息队列
@Bean
public Queue socketQueue(){
return new Queue("chat.socket");
}
//加好友的消息队列
@Bean
public Queue friendQueue(){
return new Queue("add.friend");
}
//回复添加请求使用
@Bean
public Queue replyQueue(){
return new Queue("reply.apply");
}
//退出聊天室使用
@Bean
public Queue quitQueue(){
return new Queue("quit.room");
}
@Bean
public Queue testQueue(){
return new Queue("query.test");
}
//创建socket交换机.类型为topic
@Bean
public TopicExchange socketExchange(){
return new TopicExchange("exchange-socket");
}
//创建加好友的交换机 类型为topic
@Bean
public TopicExchange friendExchange(){
return new TopicExchange("exchange-friend");
}
/**
* 回复申请好友的消息队列
* @return
*/
@Bean
public TopicExchange replyExchange(){
return new TopicExchange("exchange-reply");
}
/**
* 退出聊天室消息队列
* @return
*/
@Bean
public TopicExchange quitRoomExchange(){
return new TopicExchange("exchange-quit");
}
@Bean
public TopicExchange beforeMsgExchange(){
return new TopicExchange("exchange-before");
}
//创建加好友的交换机 类型为topic
@Bean
public TopicExchange customExchange(){
return new TopicExchange("exchange-customer");
}
//创建交换机,类型为fanout
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("exchange-fan");
}
//创建headers类型的交换机
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange("exchange-headers");
}
//可以在此处直接绑定交换机和消息队列及路由规则 也可以在消费者方法那里再绑定
@Bean
public Binding bindExchangeQueue1(Queue socketQueue,TopicExchange socketExchange){
return BindingBuilder.bind(socketQueue).to(socketExchange).with("chat.socket.#");
}
//将退出聊天室的消息队列绑定退出的消息queue
@Bean
public Binding bindExchangeQueue(Queue quitQueue, TopicExchange quitRoomExchange){
return BindingBuilder.bind(quitQueue).to(quitRoomExchange).with("quit.room.#");
}
//将回复消息队列和回复queue绑定到一起
@Bean
public Binding bindExchangeQueue2(Queue replyQueue,TopicExchange replyExchange){
return BindingBuilder.bind(replyQueue).to(replyExchange).with("reply.apply.#");
}
//绑定(处理客服聊天前客户发送的消息)消息队列和交换机
@Bean
public Binding bindExchangeQueue3(Queue beforeQueue,TopicExchange beforeMsgExchange){
return BindingBuilder.bind(beforeQueue).to(beforeMsgExchange).with("before.message.#");
}
//绑定headers 匹配条件是跟map中的key和value全都一样才可以
// @Bean
// public Binding bindingHeadersExchange(Queue friendQueue, HeadersExchange headersExchange) {
// Map<String,Object> headerValues = new HashMap<>();
// headerValues.put("type", "cash");
// headerValues.put("aging", "fast");
// return BindingBuilder.bind(friendQueue).to(headersExchange).whereAll(headerValues).match();
// }
//绑定headers 匹配条件是跟map中的key和value键值对有一对完全一样即可
// @Bean
// public Binding bindingHeadersExchange1(Queue testQueue, HeadersExchange headersExchange) {
// Map<String,Object> headerValues = new HashMap<>();
// headerValues.put("type", "cash");
// headerValues.put("aging", "fast");
// return BindingBuilder.bind(testQueue).to(headersExchange).whereAny(headerValues).match();
// }
}
4.创建生产者
package com.qh.chat.common.produce.impl;
import com.qh.chat.common.entity.SocketMessage;
import com.qh.chat.common.produce.ProduceManage;
import com.qh.chat.system.entity.Users;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("produceService")
public class ProduceService implements ProduceManage {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 一对一通信,用户群主创建聊天室
*/
public void directCreateRoom(SocketMessage socketMessage){
rabbitTemplate.convertAndSend("queue-only",socketMessage);
}
/**
* 聊天消息通信
* @param socketMessage
*/
@Override
public void chatMsg(SocketMessage socketMessage) {
rabbitTemplate.convertAndSend("exchange-socket","chat.socket.#",socketMessage);
}
/**
* 添加好友通信
* @param socketMessage
*/
@Override
public void addFriend(SocketMessage socketMessage) {
rabbitTemplate.convertAndSend("exchange-friend","add.friend.#",socketMessage);
}
/**
* 回复添加申请
* @param socketMessage
*/
@Override
public void replyAdd(SocketMessage socketMessage) {
rabbitTemplate.convertAndSend("exchange-reply","reply.apply.#",socketMessage);
}
/**
* 退出聊天室
* @param socketMessage
*/
public void quitRoom(SocketMessage socketMessage){
rabbitTemplate.convertAndSend("exchange-quit","quit.room.#",socketMessage);
}
/**
* 等待客服通信
* @param user
*/
@Override
public void waitCustomer(Users user) {
rabbitTemplate.convertAndSend("exchange-customer","wait.customer.#",user);
}
/**
* 客服通信前将客户发送的消息先同步给客服
*/
public void sendMsgToCustomer(SocketMessage e) {
rabbitTemplate.convertAndSend("exchange-before","before.message.#",e);
}
}
5、在controller中调用生产者发布消息
6.消费者在方法上通过注释获取消息内容
第一个方法是消费者直接在方法上监听消息,并且消息队列通过特定的路由规则key
和交换机进行绑定
//添加好友消息同步方法
@RabbitListener(bindings = @QueueBinding(
value=@Queue(value="add.friend",durable = "true") ,
exchange=@Exchange(name="exchange-friend",durable = "true",type = "topic"),
key = "add.friend.#"
))
@RabbitHandler
public void addFriend(@Payload SocketMessage socketMessage, @Headers Map<String,Object> headers, Channel channel) throws IOException {
//获取目标对象ID
String to=socketMessage.getReceiveId();
//获取消息类型
String from =socketMessage.getSenderId();
//获取聊天室房间号
String roomNo=socketMessage.getRoomNo();
String flag=socketMessage.getFlag();//通过flag判断是添加好友还是聊天室
if(flag.equals(Const.ADD_FRIEND+"")){//申请添加好友
SocketIOClient targetClient = clients.get(to);
//根据消息类型确定发消息的事件
targetClient.sendEvent("notice",socketMessage);
}else if(flag.equals(Const.ADD_ROOM+"")){//说明是添加聊天室请求,
// roomNo不能为空,通过roomNo获取群主Id
SocketIOClient targetClient = clients.get(to);
targetClient.sendEvent("notice",socketMessage);
}
Long deliveryTag=(Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工签收消息后给消息队列一个响应
channel.basicAck(deliveryTag,false);//false表示不支持批量签收
}
第二个方法是之前已经将消息队列通过具体的路由规则与交换机进行绑定,所以直接监听特定的路由规则既可以获取消息内容
/**
* 同步回复添加请求的消息
* @param socketMessage
* @param headers
* @param channel
*/
@RabbitListener(queues = "reply.apply")
@RabbitHandler
public void replyAdd(@Payload SocketMessage socketMessage,@Headers Map<String,Object> headers,Channel channel){
//处理接收到的消息
String receiveId=socketMessage.getReceiveId();//申请者ID
SocketIOClient targetClient = clients.get(receiveId);
targetClient.sendEvent("notice",socketMessage);
try {
Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工签收消息后给消息队列一个响应
channel.basicAck(deliveryTag,false);//false表示不支持批量签收
} catch (IOException e) {
e.printStackTrace();
}
}