說明:老項目,使用的是spring 3項目,需要對接RocketMQ,配置完之后,在消費者監聽方法中,發現業務處理service注入不進來,最后檢查發現是因為消費者監聽工具類沒有被正確的初始化,所以它里邊的業務service注入之后是個null,於是各種折騰,特此記錄一下
方式一:
解決:對需要初始化的類實現InitializingBean接口,重寫afterPropertiesSet()方法,在afterPropertiesSet方法中調用需要被初始化的方法
代碼如下:
import xx.xxx.component.BaseServiceMqConsumer; import xx.xxx.service.VideoConsumerService; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @DependsOn("RocketMqConfig") @Component public class RocketMqConsumerUtil implements InitializingBean { private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class); @Autowired private VideoConsumerService videoConsumerService; /** * 接收消息 */ public void listener(){ // 獲取消息生產者 DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer(); // 訂閱主體 try { consumer.subscribe(RocketMqUtil.topic, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * * 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息 */ public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); String msg = null; try { msg = new String(messageExt.getBody(),"utf-8"); } catch (UnsupportedEncodingException e) { log.error("消息編碼失敗,MsgBody:{}",new String(messageExt.getBody())); e.printStackTrace(); } log.info("消費開始-MsgBody:{}",msg); // String msg = new String(messageExt.getBody()); // log.info("MsgBody:{}",new String(messageExt.getBody())); if (messageExt.getTopic().equals(RocketMqUtil.topic)) { // 
topic的消費邏輯 if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) { // 根據Tag消費消息,具體消費消息的業務方法 videoConsumerService.dealVideoMsg(msg); } } else if (messageExt.getTopic().equals("TopicTest2")) { // 執行TopicTest2的消費邏輯 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br> */ consumer.start(); log.info("rocketmq-consumer 啟動成功---------------------------------------"); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void afterPropertiesSet() throws Exception { listener();//調用需要被初始化的方法 } }
方式二:
使用注解@PostConstruct 指定需要被初始化執行的方法
package net.greatsoft.xxx.utils;

import xxx.xxx.component.BaseServiceMqConsumer;
import net.greatsoft.xxx.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Subscribes the shared push consumer to {@code RocketMqUtil.topic} and forwards matching
 * messages to {@link VideoConsumerService}.
 *
 * <p>Variant 2: {@link #listener()} is annotated with {@link PostConstruct}, so the container
 * calls it after the {@code @Autowired} service has been injected.
 */
@DependsOn("RocketMqConfig")
@Component
public class RocketMqConsumerUtil {

    private static final Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

    @Autowired
    private VideoConsumerService videoConsumerService;

    /**
     * Subscribes to the topic, registers the concurrent message listener and starts the consumer.
     *
     * <p>A start-up failure is logged and not rethrown, so bean initialization still completes.
     */
    @PostConstruct
    public void listener() {
        // Obtain the shared push consumer (not a producer).
        DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();
        try {
            // Subscribe to every tag of the topic; tag filtering happens in handleMessage.
            consumer.subscribe(RocketMqUtil.topic, "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                /**
                 * Handles every message of the delivered batch. Per the original note the batch
                 * holds one message by default and grows when consumeMessageBatchMaxSize is
                 * raised, so reading only the first element would drop the rest.
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt messageExt : msgs) {
                        handleMessage(messageExt);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // The consumer must be started exactly once before it can receive messages.
            consumer.start();
            log.info("rocketmq-consumer 啟動成功---------------------------------------");
        } catch (MQClientException e) {
            // Report through the logger so the failure is visible in the application log.
            log.error("rocketmq-consumer failed to start", e);
        }
    }

    /** Decodes one message as UTF-8 and dispatches it according to its topic and tag. */
    private void handleMessage(MessageExt messageExt) {
        // A Charset constant cannot fail with UnsupportedEncodingException.
        String msg = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        log.info("消費開始-MsgBody:{}", msg);

        String topic = messageExt.getTopic();
        if (topic.equals(RocketMqUtil.topic)) {
            // Only messages carrying the configured tag reach the business service.
            String tags = messageExt.getTags();
            if (tags != null && tags.equals(RocketMqUtil.tag)) {
                videoConsumerService.dealVideoMsg(msg);
            }
        } else if (topic.equals("TopicTest2")) {
            // Placeholder for the TopicTest2 consumption logic.
        }
    }
}
方式三:
在spring的xml配置文件中使用 <bean> 的 init-method 屬性來指定Bean初始化時要執行的方法
<bean id="rocketMqConsumerUtil" class="xx.xxx.utils.RocketMqConsumerUtil" scope="singleton" init-method="listener"/>
package net.greatsoft.jinNanHealth.utils;

import net.greatsoft.jinNanHealth.component.BaseServiceMqConsumer;
import net.greatsoft.jinNanHealth.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Subscribes the shared push consumer to {@code RocketMqUtil.topic} and forwards matching
 * messages to {@link VideoConsumerService}.
 *
 * <p>Variant 3: {@link #listener()} carries no lifecycle annotation; it is meant to be named
 * as the {@code init-method} of the bean's XML definition.
 *
 * @author xc
 * @date 2020-07-23
 */
@DependsOn("RocketMqUtil")
@Component
public class RocketMqConsumerUtil {

    private static final Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

    @Autowired
    private VideoConsumerService videoConsumerService;

    /**
     * Subscribes to the topic, registers the concurrent message listener and starts the consumer.
     *
     * <p>A start-up failure is logged and not rethrown, so bean initialization still completes.
     */
    public void listener() {
        // Obtain the shared push consumer (not a producer).
        DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();
        try {
            // Subscribe to every tag of the topic; tag filtering happens in handleMessage.
            consumer.subscribe(RocketMqUtil.topic, "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                /**
                 * Handles every message of the delivered batch. Per the original note the batch
                 * holds one message by default and grows when consumeMessageBatchMaxSize is
                 * raised, so reading only the first element would drop the rest.
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt messageExt : msgs) {
                        handleMessage(messageExt);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // The consumer must be started exactly once before it can receive messages.
            consumer.start();
            log.info("rocketmq-consumer 啟動成功---------------------------------------");
        } catch (MQClientException e) {
            // Report through the logger so the failure is visible in the application log.
            log.error("rocketmq-consumer failed to start", e);
        }
    }

    /** Decodes one message as UTF-8 and dispatches it according to its topic and tag. */
    private void handleMessage(MessageExt messageExt) {
        // A Charset constant cannot fail with UnsupportedEncodingException.
        String msg = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        log.info("消費開始-MsgBody:{}", msg);

        String topic = messageExt.getTopic();
        if (topic.equals(RocketMqUtil.topic)) {
            // Only messages carrying the configured tag reach the business service.
            String tags = messageExt.getTags();
            if (tags != null && tags.equals(RocketMqUtil.tag)) {
                videoConsumerService.dealVideoMsg(msg);
            }
        } else if (topic.equals("TopicTest2")) {
            // Placeholder for the TopicTest2 consumption logic.
        }
    }
}