技術交流群:233513714
1 @Slf4j 2 @RestController 3 @Component 4 public class VouchersReceiverController implements MessageListener { 5 6 @Autowired 7 private VouchersService vouchersService; 8 9 String MerchantCode = PropertyReader.getValue("MerchantCode"); 10 11 /** 12 * 啟動監聽 13 */ 14 public void ReceiverVouchersStart() { 15 new ClassPathXmlApplicationContext("spring/rabbitmq-consumer-resources.xml");17 } 18 19 /** 20 * 監聽MQ消息隊列 21 * @param message 22 */ 23 public void onMessage(Message message) { 24 String phoneNo = ""; 25 String orderNo = ""; 26 String faceValue = ""; 27 String voucherNo = ""; 28 try { 29 String str = new String(message.getBody()); 30 log.info("監聽MQ中的信息{}" + str); 31 VouchersResponse vouchers = JsonUtil.fromJson(str, VouchersResponse.class); 32 if ("000000".equals(vouchers.getRESPONSECODE())) { 33 List<VouchersResponseResult> result = vouchers.getRESULTMAP().getRESULT(); 34 for (VouchersResponseResult list : result) { 35 phoneNo = list.getLoginId(); 36 orderNo = list.getAcceptTransSeqNo().substring(0, 20); 37 faceValue = list.getRebateAmt(); 38 voucherNo = list.getVoucherNo(); 39 } 40 } 41 } catch (Exception e) { 42 log.info("監聽出現異常{}" + e); 43 } 44 VoucherReqInfoObj obj = new VoucherReqInfoObj(); 45 obj.setProductNo(phoneNo); 46 obj.setOrderTotalAmount(faceValue); 47 obj.setMerchantCode(MerchantCode); 48 obj.setOrderChannel("08"); 49 TInsuranceUserTicket tInsuranceUserTicket = new TInsuranceUserTicket(); 50 tInsuranceUserTicket.setOrderNo(orderNo); 51 tInsuranceUserTicket.setVoucherUseStatus("1"); 52 log.info("狀態修改參數:{}" + tInsuranceUserTicket); 53 vouchersService.updateUserTicketStatus(tInsuranceUserTicket); 54 log.info("狀態修改完成"); 55 } 56 }
以上代碼就是我們要展開討論的部分。當代碼發到服務器上我們向MQ中推一條消息后就會收到一個回調,消費這個回調信息的地方就在onMessage方法中,這就是這個MQ監聽的工作流程。正常情況下當監聽到消息后會消費下一條消息,如果沒有消息則不會再進行消費。然而上面的代碼會出現一個很奇怪的問題,當一條消息進來之后會出現重復消費的現象,查看服務日志之后發現程序在走到53行時不再向下執行而是又返回到28行執行,起初考慮是不是因為For循環造成了死循環,但是這個疑問很快被打消,如果是在For循環出現了死循環程序會打不到第30行的日志。
通過在網上谷歌,我將catch的位置進行了調整,代碼如下:
1 /** 2 * 監聽MQ消息隊列 3 * @param message 4 */ 5 public void onMessage(Message message) { 6 String phoneNo = ""; 7 String orderNo = ""; 8 String faceValue = ""; 9 String voucherNo = ""; 10 try { 11 String str = new String(message.getBody()); 12 log.info("監聽MQ中的信息{}" + str); 13 VouchersResponse vouchers = JsonUtil.fromJson(str, VouchersResponse.class); 14 if ("000000".equals(vouchers.getRESPONSECODE())) { 15 List<VouchersResponseResult> result = vouchers.getRESULTMAP().getRESULT(); 16 for (VouchersResponseResult list : result) { 17 phoneNo = list.getLoginId(); 18 orderNo = list.getAcceptTransSeqNo().substring(0, 20); 19 faceValue = list.getRebateAmt(); 20 voucherNo = list.getVoucherNo(); 21 } 22 } 23 VoucherReqInfoObj obj = new VoucherReqInfoObj(); 24 obj.setProductNo(phoneNo); 25 obj.setOrderTotalAmount(faceValue); 26 obj.setMerchantCode(MerchantCode); 27 obj.setOrderChannel("08"); 28 TInsuranceUserTicket tInsuranceUserTicket = new TInsuranceUserTicket(); 29 tInsuranceUserTicket.setOrderNo(orderNo); 30 tInsuranceUserTicket.setVoucherUseStatus("1"); 31 log.info("狀態修改參數:{}" + tInsuranceUserTicket); 32 vouchersService.updateUserTicketStatus(tInsuranceUserTicket); 33 log.info("狀態修改完成"); 34 } catch (Exception e) { 35 log.info("監聽出現異常{}" + e); 36 } 37 }
以上就是調整后的代碼,死循環的問題終於被解決了。但是新的問題出現了,程序在走到32行時就會報出NullPointerException,沒猜錯這個是因為沒有注入VouchersService造成的,然后在選擇注入方式的時候發現RabbitMQ已經整合到spring中了。所以只能通過spring注入VouchersService注入了,經過試用發現構造方法的方式注入會出問題,所以最后選擇了屬性注入的方式。Controller層、Service層、配置文件依次如下:
Controller層:
1 @Slf4j 2 @RestController 3 @Component 4 public class VouchersReceiverController implements MessageListener { 5 6 private VouchersService vouchersService; 7 8 public VouchersService getVouchersService() { 9 return vouchersService; 10 } 11 12 public void setVouchersService(VouchersService vouchersService) { 13 this.vouchersService = vouchersService; 14 } 15 16 String MerchantCode = PropertyReader.getValue("MerchantCode"); 17 18 /** 19 * 啟動監聽 20 */ 21 public void ReceiverVouchersStart() { 22 new ClassPathXmlApplicationContext("spring/rabbitmq-consumer-resources.xml"); 23 log.info("保險前置開啟對代金券營銷平台的MQ監聽"); 24 } 25 26 /** 27 * 監聽MQ消息隊列 28 * @param message 29 */ 30 public void onMessage(Message message) { 31 String phoneNo = ""; 32 String orderNo = ""; 33 String faceValue = ""; 34 String voucherNo = ""; 35 try { 36 String str = new String(message.getBody()); 37 log.info("監聽MQ中的信息{}" + str); 38 VouchersResponse vouchers = JsonUtil.fromJson(str, VouchersResponse.class); 39 if ("000000".equals(vouchers.getRESPONSECODE())) { 40 List<VouchersResponseResult> result = vouchers.getRESULTMAP().getRESULT(); 41 for (VouchersResponseResult list : result) { 42 phoneNo = list.getLoginId(); 43 orderNo = list.getAcceptTransSeqNo().substring(0, 20); 44 faceValue = list.getRebateAmt(); 45 voucherNo = list.getVoucherNo(); 46 } 47 } 48 VoucherReqInfoObj obj = new VoucherReqInfoObj(); 49 obj.setProductNo(phoneNo); 50 obj.setOrderTotalAmount(faceValue); 51 obj.setMerchantCode(MerchantCode); 52 obj.setOrderChannel("08"); 53 TInsuranceUserTicket tInsuranceUserTicket = new TInsuranceUserTicket(); 54 tInsuranceUserTicket.setOrderNo(orderNo); 55 tInsuranceUserTicket.setVoucherUseStatus("1"); 56 log.info("狀態修改參數:{}" + tInsuranceUserTicket); 57 getVouchersService().updateUserTicketStatus(tInsuranceUserTicket); 58 log.info("狀態修改完成"); 59 } catch (Exception e) { 60 log.info("監聽出現異常{}" + e); 61 } 62 } 63 }
Service層:
1 @Service 2 @Slf4j 3 @Component 4 public class VouchersService { 5 @Autowired 6 private TInsuranceUserTicketMapper tInsuranceUserTicketMapper; 7 8 public void updateUserTicketStatus(TInsuranceUserTicket tInsuranceUserTicket) { 9 tInsuranceUserTicket.setUpdatedTime(new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date())); 10 log.info("代金券使用狀態修改參數{}" + tInsuranceUserTicket); 11 try { 12 tInsuranceUserTicketMapper.updateUserTicketStatus(tInsuranceUserTicket); 13 log.info("代金券使用狀態已經修改{}" + tInsuranceUserTicket); 14 } catch (Exception e) { 15 log.info("代金券使用狀態修改出現異常{}" + e); 16 } 17 } 18 }
配置文件(rabbitmq-consumer-resources.xml):
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 5 xmlns:context="http://www.springframework.org/schema/context" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans.xsd 8 http://www.springframework.org/schema/rabbit 9 http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd 10 http://www.springframework.org/schema/context 11 http://www.springframework.org/schema/context/spring-context.xsd"> 12 13 <context:property-placeholder location="classpath:/properties/test.properties"/> 14 15 <!-- 配置連接工廠 --> 16 <rabbit:connection-factory id="connectionFactory" 17 host="${rabbit.host}" 18 port="${rabbit.port}" 19 username="${rabbit.consumer.username}" 20 password="${rabbit.consumer.password}" 21 virtual-host="${rabbit.vhost}" 22 connection-factory="refConnectionFactory"/> 23 24 <!-- 配置心跳、超時、自動恢復--> 25 <bean id="refConnectionFactory" class="com.rabbitmq.client.ConnectionFactory"> 26 <property name="requestedHeartbeat" value="240"></property> 27 <property name="connectionTimeout" value="10000"></property> 28 <property name="automaticRecoveryEnabled" value="true"></property> 29 </bean> 30 31 <!-- 配置消費者監聽器,指定隊列名及監聽類 --> 32 <rabbit:listener-container connection-factory="connectionFactory" prefetch="1" concurrency="10" acknowledge="auto"> 33 <rabbit:listener queue-names="${rabbit.queueName.reply}" ref="vouchersReceiverController"/> 34 </rabbit:listener-container> 35 36 <!-- 監聽類 --> 37 <bean id="vouchersReceiverController" class="com.bestpay.insurance.dal.controller.vouchers.VouchersReceiverController"> 38 <!--注入Service--> 39 <property name="vouchersService" ref="vouchersService"/> 40 </bean> 41 42 <bean id="vouchersService" class="com.bestpay.insurance.service.vouchers.VouchersService"/> 43 </beans>
經過了一番這樣配置之后項目再一次啟動起來,心中竊喜應該不會再有什么問題了吧。果然Service層可以注入進來了,並且程序順利的走入到了Service層的updateUserTicketStatus方法中。問題又一次出現了Service層中第10行的日志可以被打出來,但是之后又會報出NullPointerException的異常,這回問題顯而易見,應該是TInsuranceUserTicketMapper接口沒有被注入進來,考慮到mapper接口的注入方式是和一般的bean注入是不一樣的,所以經過了一番谷歌發現的確是不一樣的。以下給出調整后的Service層(Service注入mapper的方式依然采用屬性注入)、Mapper接口、調整后的配置文件(rabbitmq-consumer-resources.xml):
Service層:
1 @Service 2 @Slf4j 3 @Component 4 public class VouchersService { 5 6 private TInsuranceVouchersMapper tInsuranceVouchersMapper; 7 8 private TInsuranceUserTicketMapper tInsuranceUserTicketMapper; 9 10 public TInsuranceVouchersMapper getTInsuranceVouchersMapper() { 11 return tInsuranceVouchersMapper; 12 } 13 14 public void setTInsuranceVouchersMapper(TInsuranceVouchersMapper tInsuranceVouchersMapper) { 15 this.tInsuranceVouchersMapper = tInsuranceVouchersMapper; 16 } 17 18 public TInsuranceUserTicketMapper getTInsuranceUserTicketMapper() { 19 return tInsuranceUserTicketMapper; 20 } 21 22 public void setTInsuranceUserTicketMapper(TInsuranceUserTicketMapper tInsuranceUserTicketMapper) { 23 this.tInsuranceUserTicketMapper = tInsuranceUserTicketMapper; 24 } 25 26 public void updateUserTicketStatus(TInsuranceUserTicket tInsuranceUserTicket) { 27 tInsuranceUserTicket.setUpdatedTime(new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date())); 28 log.info("代金券使用狀態修改參數{}" + tInsuranceUserTicket); 29 try { 30 getTInsuranceUserTicketMapper().updateUserTicketStatus(tInsuranceUserTicket); 31 log.info("代金券使用狀態已經修改{}" + tInsuranceUserTicket); 32 } catch (Exception e) { 33 log.info("代金券使用狀態修改出現異常{}" + e); 34 } 35 } 36 }
Mapper接口:
1 @Component 2 public interface TInsuranceUserTicketMapper { 3 int updateUserTicketStatus(TInsuranceUserTicket record); 4 }
配置文件(rabbitmq-consumer-resources.xml):
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 5 xmlns:context="http://www.springframework.org/schema/context" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans.xsd 8 http://www.springframework.org/schema/rabbit 9 http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd 10 http://www.springframework.org/schema/context 11 http://www.springframework.org/schema/context/spring-context.xsd"> 12 13 <context:property-placeholder location="classpath:/properties/test.properties"/> 14 15 <import resource="classpath:spring/spring-datasource.xml"/> 16 17 <!-- 配置連接工廠 --> 18 <rabbit:connection-factory id="connectionFactory" 19 host="${rabbit.host}" 20 port="${rabbit.port}" 21 username="${rabbit.consumer.username}" 22 password="${rabbit.consumer.password}" 23 virtual-host="${rabbit.vhost}" 24 connection-factory="refConnectionFactory"/> 25 26 <!-- 配置心跳、超時、自動恢復--> 27 <bean id="refConnectionFactory" class="com.rabbitmq.client.ConnectionFactory"> 28 <property name="requestedHeartbeat" value="240"></property> 29 <property name="connectionTimeout" value="10000"></property> 30 <property name="automaticRecoveryEnabled" value="true"></property> 31 </bean> 32 33 <!-- 配置消費者監聽器,指定隊列名及監聽類 --> 34 <rabbit:listener-container connection-factory="connectionFactory" prefetch="1" concurrency="10" acknowledge="auto"> 35 <rabbit:listener queue-names="${rabbit.queueName.reply}" ref="vouchersReceiverController"/> 36 </rabbit:listener-container> 37 38 <!-- 監聽類 --> 39 <bean id="vouchersReceiverController" class="com.bestpay.insurance.dal.controller.vouchers.VouchersReceiverController"> 40 <!--注入Service--> 41 <property name="vouchersService" ref="vouchersService"/> 42 </bean> 43 44 <bean id="vouchersService" class="com.bestpay.insurance.service.vouchers.VouchersService"> 45 <!--注入Mapper--> 46 <property name="TInsuranceUserTicketMapper" ref="tInsuranceUserTicketMapper"/> 47 </bean> 48 49 <bean id="tInsuranceUserTicketMapper" class="org.mybatis.spring.mapper.MapperFactoryBean"> 50 <property name="mapperInterface" value="com.bestpay.insurance.dal.mapper.TInsuranceUserTicketMapper"/> 51 <property name="sqlSessionFactory" ref="sqlSessionFactory"/> 52 </bean> 53 54 <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> 55 <property name="dataSource" ref="anteaterDs"/> 56 <property name="configLocation" value="classpath:spring/mybatis.xml"/> 57 <property name="mapperLocations"> 58 <list> 59 <value>classpath:mapper/*Mapper.xml</value> 60 </list> 61 </property> 62 </bean> 63 64 <bean id="anteaterDs" class="org.apache.commons.dbcp.BasicDataSource"> 65 <property name="driverClassName" value="${jdbc.driverClassName}"/> 66 <property name="url" value="${jdbc.url}"/> 67 <property name="username" value="${jdbc.username}"/> 68 <property name="password" value="${jdbc.password}"/> 69 <property name="initialSize" value="${jdbc.initialSize}"/> 70 <property name="minIdle" value="${jdbc.minIdle}"/> 71 <property name="maxIdle" value="${jdbc.maxIdle}"/> 72 <property name="maxActive" value="${jdbc.maxActive}"/> 73 <property name="maxWait" value="${jdbc.maxWait}"/> 74 <property name="testOnBorrow" value="${jdbc.testOnBorrow}"/> 75 <property name="testWhileIdle" value="${jdbc.testWhileIdle}"/> 76 <property name="timeBetweenEvictionRunsMillis" value="${jdbc.timeBetweenEvictionRunsMillis}"/> 77 <property name="numTestsPerEvictionRun" value="${jdbc.numTestsPerEvictionRun}"/> 78 <property name="minEvictableIdleTimeMillis" value="${jdbc.minEvictableIdleTimeMillis}"/> 79 <property name="validationQuery" value="SELECT 1 FROM DUAL"/> 80 <property name="removeAbandonedTimeout" value="60"/> 81 <property name="removeAbandoned" value="true"/> 82 </bean> 83 </beans>
通過以上修改之后,再啟動項目之后發現問題已經解決。這個問題解決之后回過頭來細細想了一下為什么監聽的方法中不能注入Service,從而導致了一個死循環的假象,這個問題需要后續不斷深入的學習去探索問題的原因,但是還有一個問題可以道出問題的原委,就是監聽后不能注入。因為RabbitMQ被整合到了spring中,所以項目在啟動的時候會自動的注入RabbitMQ相關的東西,而如果采用@Autowired方式調用Service就會導致注入不進來,所以要在項目啟動的同時注入Service和Mapper,這樣就不會出現空指針的問題了。
