如果消費者 運行時候 報錯了
package com.toov5.msg.SMS; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues="fanout_sms_queue") public class SMSConsumer { @RabbitHandler public void process(String mString) { System.out.println("短信消費者獲取生產者消息msg"+mString); int i = 1/0; } }
當生產者投遞消息后:
消費者會不停的進行打印:
消息一直沒有被消費
原因 Rabbitmq 默認情況下 如果消費者程序出現異常情況 會自動實現補償機制 也就是 重試機制
@RabbitListener底層使用AOP進行攔截,如果程序沒有拋出異常,自動提交事務。 如果Aop使用異常通知 攔截獲取異常信息的話 , 自動實現補償機制,該消息會一直緩存在Rabbitmq服務器端進行重放,一直重試到不拋出異常為准。
可以修改重試策略
一般來說默認5s重試一次,
消費者配置:
listener:
simple:
retry:
####開啟消費者重試
enabled: true
####最大重試次數(默認無數次)
max-attempts: 5
####重試間隔次數
initial-interval: 3000
效果: 充實5次 不行就放棄了
MQ重試機制機制 需要注意的問題
如何合適選擇重試機制
情況1: 消費者獲取到消息后,調用第三方接口,但接口暫時無法訪問,是否需要重試?
需要重試 別人的問題不是我自己的問題
情況2: 消費者獲取到消息后,拋出數據轉換異常,是否需要重試?
不需要重試 充實一億次也是如此 木有必要 需要發布版本解決
總結:
- 對於情況2,如果消費者代碼拋出異常是需要發布新版本才能解決的問題,那么不需要重試,重試也無濟於事。應該采用 日志記錄+定時任務job健康檢查+人工進行補償
- 把錯誤記錄在日志里面,通過定時Job去自動的補償,或通過人工去補償。
傳統的HTTP請求 如果失敗了沒法自動重試 ,當然自己可以寫個循環實現。MQ完全自己自帶的。
情況2的拓展延申:
將之前的案例改為 郵件消費者 調用郵件第三方接口
偽代碼:
在consumer 中 調用接口后 判斷返回值 由於RabbitMQ 在消費者異常時候 會進行重試機制 進行補償
所以可以拋出個異常 來實現
Consumer:
String result = template.Email();
if(result == null){
throw new Exception("調用第三方郵件服務器接口失敗!");
}
producer:
pom:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itmayiedu</groupId> <artifactId>rabbitmq_producer_springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <!-- springboot-web組件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies> </project>
config:
package com.itmayiedu.rabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; //Fanout 類型 發布訂閱模式 @Component public class FanoutConfig { // 郵件隊列 private String FANOUT_EMAIL_QUEUE = "fanout_email_queue"; // 短信隊列 private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; // fanout 交換機 private String EXCHANGE_NAME = "fanoutExchange"; // 1.定義郵件隊列 @Bean public Queue fanOutEamilQueue() { return new Queue(FANOUT_EMAIL_QUEUE); } // 2.定義短信隊列 @Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定義交換機 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.隊列與交換機綁定郵件隊列 @Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.隊列與交換機綁定短信隊列 @Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } }
Producer:
package com.itmayiedu.rabbitmq; import java.util.UUID; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; @Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void send(String queueName) { JSONObject jsonObject = new JSONObject(); jsonObject.put("email", "xx@163.com"); jsonObject.put("timestamp", System.currentTimeMillis()); String jsonString = jsonObject.toJSONString(); System.out.println("jsonString:" + jsonString); // 設置消息唯一id 保證每次重試消息id唯一 /*Message message = MessageBuilder.withBody(jsonString.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8") .setMessageId(UUID.randomUUID() + "").build();*/ amqpTemplate.convertAndSend(queueName, jsonString); } }
Controller:
package com.itmayiedu.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.itmayiedu.rabbitmq.FanoutProducer; @RestController public class ProducerController { @Autowired private FanoutProducer fanoutProducer; @RequestMapping("/sendFanout") public String sendFanout(String queueName) { fanoutProducer.send(queueName); return "success"; } }
yml:
spring: rabbitmq: ####連接地址 host: 192.168.91.6 ####端口號 port: 5672 ####賬號 username: admin ####密碼 password: admin ### 地址 virtual-host: /admin_toov5
啟動類:
package com.itmayiedu; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class AppProducer { public static void main(String[] args) { SpringApplication.run(AppProducer.class, args); } }
Consumer:
pom:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itmayiedu</groupId> <artifactId>rabbitmq_consumer_springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <!-- springboot-web組件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies> </project>
utils:
package com.itmayiedu.rabbitmq.utils; import com.alibaba.fastjson.JSONObject; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * HttpClient4.3工具類 * * @author hang.luo */ public class HttpClientUtils { private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日志記錄 private static RequestConfig requestConfig = null; static { // 設置請求和傳輸超時時間 requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build(); } /** * post請求傳輸json參數 * * @param url * url地址 * @param json * 參數 * @return */ public static JSONObject httpPost(String url, JSONObject jsonParam) { // post請求返回結果 CloseableHttpClient httpClient = HttpClients.createDefault(); JSONObject jsonResult = null; HttpPost httpPost = new HttpPost(url); // 設置請求和傳輸超時時間 httpPost.setConfig(requestConfig); try { if (null != jsonParam) { // 解決中文亂碼問題 StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8"); entity.setContentEncoding("UTF-8"); entity.setContentType("application/json"); httpPost.setEntity(entity); } CloseableHttpResponse result = httpClient.execute(httpPost); // 請求發送成功,並得到響應 if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { String str = ""; try { // 讀取服務器返回過來的json字符串數據 str = EntityUtils.toString(result.getEntity(), "utf-8"); // 把json字符串轉換成json對象 jsonResult = JSONObject.parseObject(str); } catch (Exception e) { logger.error("post請求提交失敗:" + url, e); } } } catch (IOException e) { logger.error("post請求提交失敗:" + url, e); } finally { httpPost.releaseConnection(); } return jsonResult; } /** * post請求傳輸String參數 例如:name=Jack&sex=1&type=2 * Content-type:application/x-www-form-urlencoded * * @param url * url地址 * @param strParam * 參數 * @return */ public static JSONObject httpPost(String url, String strParam) { // post請求返回結果 CloseableHttpClient httpClient = HttpClients.createDefault(); JSONObject jsonResult = null; HttpPost httpPost = new HttpPost(url); httpPost.setConfig(requestConfig); try { if (null != strParam) { // 解決中文亂碼問題 StringEntity entity = new StringEntity(strParam, "utf-8"); entity.setContentEncoding("UTF-8"); entity.setContentType("application/x-www-form-urlencoded"); httpPost.setEntity(entity); } CloseableHttpResponse result = httpClient.execute(httpPost); // 請求發送成功,並得到響應 if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { String str = ""; try { // 讀取服務器返回過來的json字符串數據 str = EntityUtils.toString(result.getEntity(), "utf-8"); // 把json字符串轉換成json對象 jsonResult = JSONObject.parseObject(str); } catch (Exception e) { logger.error("post請求提交失敗:" + url, e); } } } catch (IOException e) { logger.error("post請求提交失敗:" + url, e); } finally { httpPost.releaseConnection(); } return jsonResult; } /** * 發送get請求 * * @param url * 路徑 * @return */ public static JSONObject httpGet(String url) { // get請求返回結果 JSONObject jsonResult = null; CloseableHttpClient client = HttpClients.createDefault(); // 發送get請求 HttpGet request = new HttpGet(url); request.setConfig(requestConfig); try { CloseableHttpResponse response = client.execute(request); // 請求發送成功,並得到響應 if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { // 讀取服務器返回過來的json字符串數據 HttpEntity entity = response.getEntity(); String strResult = EntityUtils.toString(entity, "utf-8"); // 把json字符串轉換成json對象 jsonResult = JSONObject.parseObject(strResult); } else { logger.error("get請求提交失敗:" + url); } } catch (IOException e) { logger.error("get請求提交失敗:" + url, e); } finally { request.releaseConnection(); } return jsonResult; } }
consumer:
package com.itmayiedu.rabbitmq; import java.util.Map; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.itmayiedu.rabbitmq.utils.HttpClientUtils; import com.rabbitmq.client.Channel; //郵件隊列 @Component public class FanoutEamilConsumer { @RabbitListener(queues = "fanout_email_queue") public void process(String msg) throws Exception { System.out.println("郵件消費者獲取生產者消息msg:" + msg); JSONObject jsonObject = JSONObject.parseObject(msg); // 獲取email參數 String email = jsonObject.getString("email"); // 請求地址 String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; JSONObject result = HttpClientUtils.httpGet(emailUrl); if (result == null) { // 因為網絡原因,造成無法訪問,繼續重試 throw new Exception("調用接口失敗!"); } System.out.println("執行結束...."); } }
yml:
spring:
rabbitmq:
####連接地址
host: 192.168.91.6
####端口號
port: 5672
####賬號
username: admin
####密碼
password: admin
### 地址
virtual-host: /admin_toov5
listener:
simple:
retry:
####開啟消費者異常重試
enabled: true
####最大重試次數
max-attempts: 5
####重試間隔次數
initial-interval: 2000
server:
port: 8081
啟動類:
package com.itmayiedu.rabbitmq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class AppConsumer { public static void main(String[] args) { SpringApplication.run(AppConsumer.class, args); } }
郵件服務器:
package com.mayikt.controller; import java.util.HashMap; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @RestController public class MsgController { // 模擬第三方發送郵件 @RequestMapping("/sendEmail") public Map<String, Object> sendEmail(String email) { System.out.println("開始發送郵件:" + email); Map<String, Object> result = new HashMap<String, Object>(); result.put("code", "200"); result.put("msg", "發送郵件成功.."); System.out.println("發送郵件成功"); return result; } public static void main(String[] args) { SpringApplication.run(MsgController.class, args); } }
yml:
server:
port: 8083
pom:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mayikt</groupId>
<artifactId>mayikt_sms</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<!-- springboot-web組件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 添加springboot對amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
</dependencies>
</project>
在沒有啟動郵件服務器時候,消費者調用接口失敗會一直重試,重試五次。
在此期間,如果啟動成功,則重試成功,不再重試, 不再進行補償機制。
消費者如果保證消息冪等性,不被重復消費
背景:
網絡延遲傳輸中,或者消費出現異常或者是消費延遲,會造成進行MQ重試進行重試補償機制,在重試過程中,可能會造成重復消費。
解決辦法:
使用全局MessageID判斷消費方使用同一個,解決冪等性。
只要重試過程中,判斷如果已經走完了 不能再繼續走 繼續執行了
MQ消費者的冪等行的解決 一般使用全局ID 或者寫個唯一標識比如時間戳 或者UUID 或者訂單號
改進:
producer:
添加:
// 設置消息唯一id 保證每次重試消息id唯一 Message message = MessageBuilder.withBody(jsonString.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8") .setMessageId(UUID.randomUUID() + "").build(); //消息id設置在請求頭里面 用UUID做全局ID amqpTemplate.convertAndSend(queueName, message);
全部代碼:
package com.itmayiedu.rabbitmq; import java.util.UUID; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; @Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void send(String queueName) { JSONObject jsonObject = new JSONObject(); jsonObject.put("email", "xx@163.com"); jsonObject.put("timestamp", System.currentTimeMillis()); String jsonString = jsonObject.toJSONString(); System.out.println("jsonString:" + jsonString); // 設置消息唯一id 保證每次重試消息id唯一 Message message = MessageBuilder.withBody(jsonString.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8") .setMessageId(UUID.randomUUID() + "").build(); //消息id設置在請求頭里面 用UUID做全局ID amqpTemplate.convertAndSend(queueName, message); } }
同樣的 消費者也需要修改:
方法參數類型為 Message 然后可以獲取這個ID 然后可以進行業務邏輯操作
@RabbitListener(queues = "fanout_email_queue") public void process(Message message) throws Exception { // 獲取消息Id String messageId = message.getMessageProperties().getMessageId(); //id獲取之 String msg = new String(message.getBody(), "UTF-8"); //消息內容獲取之 System.out.println("-----郵件消費者獲取生產者消息-----------------" + "messageId:" + messageId + ",消息內容:" + msg); if (messageId == null) { return; } JSONObject jsonObject = JSONObject.parseObject(msg); // 獲取email參數 String email = jsonObject.getString("email"); // 請求地址 String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; JSONObject result = HttpClientUtils.httpGet(emailUrl); if (result == null) { // 因為網絡原因,造成無法訪問,繼續重試 throw new Exception("調用接口失敗!"); } System.out.println("執行結束...."); //messId 的情況寫入到redis 中 成功就修改為空 }
重試機制都是間隔性的 每次都是一個線程 單線程重試
關於應答模式:
Spring boot 中進行 AOP攔截 自動幫助做重試
手動應答的話 ,如果不告訴服務器已經消費成功,則服務器不會刪除 消息。告訴消費成功了才會刪除。
消費者的yml加入:
acknowledge-mode: manual
spring:
rabbitmq:
####連接地址
host: 192.168.91.6
####端口號
port: 5672
####賬號
username: admin
####密碼
password: admin
### 地址
virtual-host: /admin_toov5
listener:
simple:
retry:
####開啟消費者異常重試
enabled: true
####最大重試次數
max-attempts: 5
####重試間隔次數
initial-interval: 2000
####開啟手動ack
acknowledge-mode: manual
server:
port: 8081
開啟模式之后:
消費者參數需要加入: @Headers Map<String, Object> headers, Channel channel
代碼邏輯最后面加入:
// // 手動ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手動簽收 告訴RabbitMQ 消費成功了 消息可以刪除了
channel.basicAck(deliveryTag, false);
代碼如下:
@RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { // 獲取消息Id String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("郵件消費者獲取生產者消息" + "messageId:" + messageId + ",消息內容:" + msg); JSONObject jsonObject = JSONObject.parseObject(msg); // 獲取email參數 String email = jsonObject.getString("email"); // 請求地址 String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; JSONObject result = HttpClientUtils.httpGet(emailUrl); if (result == null) { // 因為網絡原因,造成無法訪問,繼續重試 throw new Exception("調用接口失敗!"); } // // 手動ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手動簽收 channel.basicAck(deliveryTag, false); System.out.println("執行結束...."); }