springboot整合RabbitMq


基本概念,包括有rabbitMq相關的一些簡單理論介紹,provider消息推送實例,consumer消息消費實例,Direct、Topic、Fanout的使用,消息回調、手動確認等。 

(但是關於rabbitMq的安裝,就不介紹了,可以參考此文檔 https://blog.csdn.net/rexueqingchun/article/details/103250554)

 

 

 1.添加依賴

<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

2.配置文件
spring:
# rabbitmq的配置
rabbitmq:
addresses: 127.0.0.1:5672
# 自己使用guest賬號在服務器上添加用戶和密碼,然后設置
username: admin
password: admin
# 消息傳送虛擬根路徑
virtual-host: /
# 鏈接超時時間10s
connection-timeout: 10000
#spring.rabbitmq.publisher-confirms=true 已取消的配置
#correlated 關聯的 simple簡單的 none不開啟確認機制
publisher-confirm-type: correlated
#開啟return確認機制
publisher-returns: true
#設置為 true 后 消費者在消息沒有被路由到合適隊列情況下會被return監聽,而不會自動刪除
template:
mandatory: true

# ******************消費者端除了上邊的配置還需要增加的配置*******************************
# 設置消費端手動 ack 在接收消息的方法中必須有channel通道 參數
listener:
simple:
acknowledge-mode=manual: manual
# 消費端默認並發數
concurrency=10:
# 最大並發數
max-concurrency: 20
# 在單個請求中處理的消息個數,他應該大於等於事務數量(unack的最大數量)
prefetch: 1

3.RabbitConfig 主要是創建 消息隊列、交換機,並將消息隊列與交換機通過一定的路由規則綁定到一起。(如果在此不綁定,則可以在消費者方法上通過注釋綁定也可以)

package com.qh.chat.common.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqConfig {

//群主創建聊天用
@Bean
public Queue directQueue(){
return new Queue("queue-only");
}
//等待客服連接消息隊列
@Bean
public Queue waitQueue(){
return new Queue("wait.customer");
}

@Bean
public Queue beforeQueue(){return new Queue("before.message");}

//聊天消息隊列
@Bean
public Queue socketQueue(){
return new Queue("chat.socket");
}

//加好友的消息隊列
@Bean
public Queue friendQueue(){
return new Queue("add.friend");
}

//回復添加請求使用
@Bean
public Queue replyQueue(){
return new Queue("reply.apply");
}

//退出聊天室使用
@Bean
public Queue quitQueue(){
return new Queue("quit.room");
}


@Bean
public Queue testQueue(){
return new Queue("query.test");
}
//創建socket交換機.類型為topic

@Bean
public TopicExchange socketExchange(){
return new TopicExchange("exchange-socket");
}

//創建加好友的交換機 類型為topic
@Bean
public TopicExchange friendExchange(){
return new TopicExchange("exchange-friend");
}

/**
* 回復申請好友的消息隊列
* @return
*/
@Bean
public TopicExchange replyExchange(){
return new TopicExchange("exchange-reply");
}

/**
* 退出聊天室消息隊列
* @return
*/
@Bean
public TopicExchange quitRoomExchange(){
return new TopicExchange("exchange-quit");
}

@Bean
public TopicExchange beforeMsgExchange(){
return new TopicExchange("exchange-before");
}

//創建加好友的交換機 類型為topic
@Bean
public TopicExchange customExchange(){
return new TopicExchange("exchange-customer");
}
//創建交換機,類型為fanout
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("exchange-fan");
}

//創建headers類型的交換機

@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange("exchange-headers");
}

//可以在此處直接綁定交換機和消息隊列及路由規則 也可以在消費者方法那里再綁定
@Bean
public Binding bindExchangeQueue1(Queue socketQueue,TopicExchange socketExchange){
return BindingBuilder.bind(socketQueue).to(socketExchange).with("chat.socket.#");
}

//將退出聊天室的消息隊列綁定退出的消息queue
@Bean
public Binding bindExchangeQueue(Queue quitQueue, TopicExchange quitRoomExchange){
return BindingBuilder.bind(quitQueue).to(quitRoomExchange).with("quit.room.#");
}

//將回復消息隊列和回復queue綁定到一起
@Bean
public Binding bindExchangeQueue2(Queue replyQueue,TopicExchange replyExchange){
return BindingBuilder.bind(replyQueue).to(replyExchange).with("reply.apply.#");
}

//綁定(處理客服聊天前客戶發送的消息)消息隊列和交換機
@Bean
public Binding bindExchangeQueue3(Queue beforeQueue,TopicExchange beforeMsgExchange){
return BindingBuilder.bind(beforeQueue).to(beforeMsgExchange).with("before.message.#");
}


//綁定headers 匹配條件是跟map中的key和value全都一樣才可以
// @Bean
// public Binding bindingHeadersExchange(Queue friendQueue, HeadersExchange headersExchange) {
// Map<String,Object> headerValues = new HashMap<>();
// headerValues.put("type", "cash");
// headerValues.put("aging", "fast");
// return BindingBuilder.bind(friendQueue).to(headersExchange).whereAll(headerValues).match();
// }

//綁定headers 匹配條件是跟map中的key和value鍵值對有一對完全一樣即可
// @Bean
// public Binding bindingHeadersExchange1(Queue testQueue, HeadersExchange headersExchange) {
// Map<String,Object> headerValues = new HashMap<>();
// headerValues.put("type", "cash");
// headerValues.put("aging", "fast");
// return BindingBuilder.bind(testQueue).to(headersExchange).whereAny(headerValues).match();
// }
}
4.創建生產者
package com.qh.chat.common.produce.impl;

import com.qh.chat.common.entity.SocketMessage;
import com.qh.chat.common.produce.ProduceManage;
import com.qh.chat.system.entity.Users;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("produceService")
public class ProduceService implements ProduceManage {


@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 一對一通信,用戶群主創建聊天室
*/
public void directCreateRoom(SocketMessage socketMessage){
rabbitTemplate.convertAndSend("queue-only",socketMessage);
}
/**
* 聊天消息通信
* @param socketMessage
*/
@Override
public void chatMsg(SocketMessage socketMessage) {
rabbitTemplate.convertAndSend("exchange-socket","chat.socket.#",socketMessage);
}

/**
* 添加好友通信
* @param socketMessage
*/
@Override
public void addFriend(SocketMessage socketMessage) {
rabbitTemplate.convertAndSend("exchange-friend","add.friend.#",socketMessage);
}

/**
* 回復添加申請
* @param socketMessage
*/
@Override
public void replyAdd(SocketMessage socketMessage) {
rabbitTemplate.convertAndSend("exchange-reply","reply.apply.#",socketMessage);
}

/**
* 退出聊天室
* @param socketMessage
*/
public void quitRoom(SocketMessage socketMessage){
rabbitTemplate.convertAndSend("exchange-quit","quit.room.#",socketMessage);
}
/**
* 等待客服通信
* @param user
*/
@Override
public void waitCustomer(Users user) {
rabbitTemplate.convertAndSend("exchange-customer","wait.customer.#",user);
}

/**
* 客服通信前將客戶發送的消息先同步給客服
*/
public void sendMsgToCustomer(SocketMessage e) {
rabbitTemplate.convertAndSend("exchange-before","before.message.#",e);
}
}

5、在controller中調用生產者發布消息

 

 

 

 

6.消費者在方法上通過注釋獲取消息內容

第一個方法是消費者直接在方法上監聽消息,並且消息隊列通過特定的路由規則key
和交換機進行綁定
//添加好友消息同步方法
@RabbitListener(bindings = @QueueBinding(
value=@Queue(value="add.friend",durable = "true") ,
exchange=@Exchange(name="exchange-friend",durable = "true",type = "topic"),
key = "add.friend.#"
))
@RabbitHandler
public void addFriend(@Payload SocketMessage socketMessage, @Headers Map<String,Object> headers, Channel channel) throws IOException {
//獲取目標對象ID
String to=socketMessage.getReceiveId();
//獲取消息類型
String from =socketMessage.getSenderId();
//獲取聊天室房間號
String roomNo=socketMessage.getRoomNo();
String flag=socketMessage.getFlag();//通過flag判斷是添加好友還是聊天室
if(flag.equals(Const.ADD_FRIEND+"")){//申請添加好友
SocketIOClient targetClient = clients.get(to);
//根據消息類型確定發消息的事件
targetClient.sendEvent("notice",socketMessage);
}else if(flag.equals(Const.ADD_ROOM+"")){//說明是添加聊天室請求,
// roomNo不能為空,通過roomNo獲取群主Id
SocketIOClient targetClient = clients.get(to);
targetClient.sendEvent("notice",socketMessage);
}

Long deliveryTag=(Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工簽收消息后給消息隊列一個響應
channel.basicAck(deliveryTag,false);//false表示不支持批量簽收
}
第二個方法是之前已經將消息隊列通過具體的路由規則與交換機進行綁定,所以直接監聽特定的路由規則既可以獲取消息內容
/**
* 同步回復添加請求的消息
* @param socketMessage
* @param headers
* @param channel
*/
@RabbitListener(queues = "reply.apply")
@RabbitHandler
public void replyAdd(@Payload SocketMessage socketMessage,@Headers Map<String,Object> headers,Channel channel){
//處理接收到的消息
String receiveId=socketMessage.getReceiveId();//申請者ID
SocketIOClient targetClient = clients.get(receiveId);
targetClient.sendEvent("notice",socketMessage);
try {
Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工簽收消息后給消息隊列一個響應
channel.basicAck(deliveryTag,false);//false表示不支持批量簽收
} catch (IOException e) {
e.printStackTrace();
}
}


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM