spring項目在啟動的時候執行方法初始化


說明:老項目,使用的是spring 3項目,需要對接RocketMQ,配置完之后,在消費者監聽方法中,發現業務處理service注入不進來,最后檢查發現是因為消費者監聽工具類沒有被正確的初始化,所以它里邊的業務service注入之后是個null,於是各種折騰,特此記錄一下

方式一:

解決:對需要初始化的類實現InitializingBean接口,重寫afterPropertiesSet()方法,在afterPropertiesSet方法中調用需要被初始化的方法

代碼如下:

import xx.xxx.component.BaseServiceMqConsumer;
import xx.xxx.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;


@DependsOn("RocketMqConfig")
@Component
public class RocketMqConsumerUtil implements InitializingBean {     

    private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

    @Autowired
    private VideoConsumerService videoConsumerService;

    /**
     * 接收消息
     */
    public void listener(){

        // 獲取消息生產者
        DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

        // 訂閱主體
        try {
            consumer.subscribe(RocketMqUtil.topic, "*");

            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * * 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息
                 */
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt messageExt = msgs.get(0);
                    String msg = null;
                    try {
                        msg = new String(messageExt.getBody(),"utf-8");
                    } catch (UnsupportedEncodingException e) {
                        log.error("消息編碼失敗,MsgBody:{}",new String(messageExt.getBody()));
                        e.printStackTrace();
                    }
                      log.info("消費開始-MsgBody:{}",msg);
//                    String msg = new String(messageExt.getBody());
//                    log.info("MsgBody:{}",new String(messageExt.getBody()));

                    if (messageExt.getTopic().equals(RocketMqUtil.topic)) {
                        // topic的消費邏輯
                        if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {
                            // 根據Tag消費消息,具體消費消息的業務方法
                            videoConsumerService.dealVideoMsg(msg);
                        }
                    } else if (messageExt.getTopic().equals("TopicTest2")) {
                        // 執行TopicTest2的消費邏輯
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            /**
             * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br>
             */
            consumer.start();
            log.info("rocketmq-consumer 啟動成功---------------------------------------");
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        listener();//調用需要被初始化的方法
    }
}

方式二:

使用注解@PostContruct 指定需要被初始化執行的方法

package net.greatsoft.xxx.utils;

import xxx.xxx.component.BaseServiceMqConsumer;
import net.greatsoft.xxx.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;


@DependsOn("RocketMqConfig")
@Component
public class RocketMqConsumerUtil  {

    private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

    @Autowired
    private VideoConsumerService videoConsumerService;

    /**
     * 接收消息8
     */
    @PostConstruct
    public void listener(){

        // 獲取消息生產者
        DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

        // 訂閱主體
        try {
            consumer.subscribe(RocketMqUtil.topic, "*");

            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * * 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息
                 */
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt messageExt = msgs.get(0);
                    String msg = null;
                    try {
                        msg = new String(messageExt.getBody(),"utf-8");
                    } catch (UnsupportedEncodingException e) {
                        log.error("消息編碼失敗,MsgBody:{}",new String(messageExt.getBody()));
                        e.printStackTrace();
                    }
                    log.info("消費開始-MsgBody:{}",msg);
                    if (messageExt.getTopic().equals(RocketMqUtil.topic)) {
                        // topic的消費邏輯
                        if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {
                            // 根據Tag消費消息,具體消費消息的業務方法
                            videoConsumerService.dealVideoMsg(msg);
                        }
                    } else if (messageExt.getTopic().equals("TopicTest2")) {
                        // 執行TopicTest2的消費邏輯
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            /**
             * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br>
             */
            consumer.start();
            log.info("rocketmq-consumer 啟動成功---------------------------------------");
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }


}

方式三:

在spring的xml配置文件中使用 <Bean>的init 屬性來執行初始化的Bean

    <bean id="rocketMqConsumerUtil" class="xx.xxx.utils.RocketMqConsumerUtil"
    scope="singleton" init-method="listener"/>
package net.greatsoft.jinNanHealth.utils;

import net.greatsoft.jinNanHealth.component.BaseServiceMqConsumer;
import net.greatsoft.jinNanHealth.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author xc
 * @date 2020-07-23
 */
@DependsOn("RocketMqUtil")
@Component
public class RocketMqConsumerUtil  {

    private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

    @Autowired
    private VideoConsumerService videoConsumerService;

    /**
     * 接收消息8
     */
    public void listener(){

        // 獲取消息生產者
        DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

        // 訂閱主體
        try {
            consumer.subscribe(RocketMqUtil.topic, "*");

            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * * 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息
                 */
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt messageExt = msgs.get(0);
                    String msg = null;
                    try {
                        msg = new String(messageExt.getBody(),"utf-8");
                    } catch (UnsupportedEncodingException e) {
                        log.error("消息編碼失敗,MsgBody:{}",new String(messageExt.getBody()));
                        e.printStackTrace();
                    }
             log.info("消費開始-MsgBody:{}",msg);
                    if (messageExt.getTopic().equals(RocketMqUtil.topic)) {
                        // topic的消費邏輯
                        if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {
                            // 根據Tag消費消息,具體消費消息的業務方法
                            videoConsumerService.dealVideoMsg(msg);
                        }
                    } else if (messageExt.getTopic().equals("TopicTest2")) {
                        // 執行TopicTest2的消費邏輯
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            /**
             * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br>
             */
            consumer.start();
            log.info("rocketmq-consumer 啟動成功---------------------------------------");
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }


}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM