rabbitmq 自定義消費者,定向消費實現!


  因 mq server 就只有一台,而測試環境又是n套,並不像線上環境一樣,任意消費都是成立的。所以,需要進行定向消費功能開發!

如果讓自己來做mq的定向消費,應該怎麼做?

  因為rabbitmq 是用 erlang 寫的,而它目前是沒有提供這種功能的,這種功能也多半只是在特殊的測試環境用得上!

  所以,想要改動 rabbitmq 的源碼支持,是不可能的了!

  所以,只能在消費端,spring 與 rabbitmq 集成的地方做動作了!

主要思路:

  1. 自己定制 mq-container, 定制化自己的參數;
  2. 設置消息的自動確認功能為關閉,即必須手動確認;
  3. 在消息消費的時候,針對規則做消息處理:如果有權限則進行處理;如果沒有權限,則拒絕(reject 並 requeue)該條消息,使mq中心重新投遞該消息即可;
  4. 定制 spring.handlers, 進行xml標簽解析;
  5. 定制 xxx.xsd xml 標簽定義配置文件,引入後可配置功能;
  6. 定義 container, 容納 listener ;
  7. 定義 listener, 動態生成處理消息類, 在 basicConsume({}) 進行處理;
  8. 在 basicConsume() 中進行權限管理, 在 beanClass 中進行業務處理;

/**
 * Listener container that attaches {@code threadNum} consumers (one channel each) to a
 * single queue and applies "directed consumption": every delivery is first submitted to
 * {@link DecisionHandle#isExecuteReceive}; permitted deliveries are handed to the
 * configured message listener, denied deliveries are returned to the broker so that
 * another consumer can pick them up.
 *
 * <p>Settlement (ack / nack / reject) is only performed when {@code autoAck} is
 * {@code false}.
 *
 * <p>NOTE(review): deliveries on different channels are assumed to be dispatched
 * concurrently by the client library, which is why access to the rejection counters is
 * synchronized — confirm against the client's threading model.
 */
public class SimpleMsgListenerContainer implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(SimpleMsgListenerContainer.class);

    /** Initial capacity of the rejection-counter map. */
    private static final int REJECTMSG_LIST_SIZE = 100;
    /** Default pause (ms) before a denied delivery is settled, used when the system config yields 0. */
    private static final int REJECT_WAIT_MS = 100;
    /** Number of local rejections after which a message is re-published instead of rejected. */
    private static final int ALLOWED_REJECTED_COUNT = 5;
    /** Default maximum number of re-publishes, used when the system config yields 0. */
    private static final int REQUEUE_COUNT_LIMIT = 20;
    /** Message header carrying the number of times a message has been re-published. */
    private static final String REQUEUE_COUNT = "Requeue-Count";
    private static final String ENCODING = "UTF-8";

    private volatile Object messageListener;
    private volatile Queue queue;
    private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

    /**
     * Rejection counters keyed by {@code String.hashCode()} of the message body. All access
     * goes through {@code synchronized (rejectedMsgMap)} because several consumers share it.
     * NOTE(review): this is a WeakHashMap with boxed Integer keys, so entries may disappear
     * at any garbage collection; the counters are best-effort only.
     */
    private final Map<Integer, Integer> rejectedMsgMap = new WeakHashMap<>(REJECTMSG_LIST_SIZE);
    private int threadNum = 1;
    private int fetchCount = 1;
    private boolean autoAck = true;

    @Autowired
    private MQConnection mqConnection;
    @Autowired
    private DecisionHandle decisionHandle;
    @Autowired
    private SubscribeCache subscribeCache;
    @Autowired
    private SystemConfigCache systemConfigCache;

    public interface ContainerDelegate {
        void invokeListener(Object object) throws Exception;
    }

    /**
     * Validates the local configuration against the platform metadata, declares the queue
     * and registers {@code threadNum} consumers on it.
     *
     * @throws RuntimeException if no queue is configured or the metadata check fails
     * @throws Exception        if the connection, channel or consumer cannot be created
     */
    private void initializeConsumers() throws Exception {
        // Fail with the descriptive message instead of a NullPointerException on queue.getName().
        if (queue == null) {
            logger.error("消費者配置文件中mqItem不能為空");
            throw new RuntimeException("消費者配置文件中mqItem不能為空");
        }
        // Cross-check the local configuration with the metadata held by the management platform.
        ZkSubscribe zkSubscribe = subscribeCache.getZkSubscribe(queue.getName());
        checkMetadata(zkSubscribe);
        Connection connection = mqConnection.getConnection();
        // Declare and bind the queue.
        MQBuilder.buildQueue(queue, connection);

        for (int i = 0; i < threadNum; i++) {
            final Channel channel = connection.createChannel();
            channel.basicQos(fetchCount);
            String consumerTag = CommonUtils.getConsumerTagPrefix() + "_" + channel.getChannelNumber();
            channel.basicConsume(queue.getName(), autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    onDelivery(channel, consumerTag, envelope, properties, body);
                }
            });
        }
    }

    /**
     * Converts a raw delivery, asks the decision handle whether this consumer may process
     * it, and routes it to the matching branch.
     */
    private void onDelivery(Channel channel, String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        MessageProperties messageProperties = messagePropertiesConverter.toMessageProperties(properties,
                envelope, ENCODING);
        messageProperties.setMessageCount(0);
        messageProperties.setConsumerTag(consumerTag);
        messageProperties.setConsumerQueue(queue.getName());
        Message payload = new Message(body, messageProperties);
        String message = new String(body, ENCODING);

        boolean permitted = decisionHandle.isExecuteReceive(queue, String.valueOf(channel.getChannelNumber()),
                messageProperties, message);
        if (permitted) {
            processPermitted(channel, deliveryTag, payload, message);
        } else {
            processDenied(channel, deliveryTag, messageProperties, body, message);
        }
    }

    /**
     * Handles a delivery this consumer is not allowed to process.
     *
     * <p>The rejection is always counted. When manual acknowledgement is active the method
     * waits briefly and then either drops the message (re-publish limit reached), rejects it
     * with requeue (rejected fewer than {@code ALLOWED_REJECTED_COUNT} times here), or
     * re-publishes it to the queue with an incremented {@code Requeue-Count} header.
     */
    private void processDenied(Channel channel, long deliveryTag, MessageProperties messageProperties,
                               byte[] body, String message) throws IOException {
        int messageHash = message.hashCode();
        int rejectedCount = recordRejection(messageHash);
        if (autoAck) {
            // The broker already considers the message delivered; nothing to settle.
            return;
        }

        // Resolve the tunables up front so a missing system config falls back to the defaults
        // rather than leaving the limit at 0, which would drop every denied message.
        ZkSystemConfig systemConfig = systemConfigCache.getZkSystemConfig();
        int rejectWaitMs = systemConfig == null ? 0 : systemConfig.getRejectWaitMs();
        if (rejectWaitMs == 0) {
            rejectWaitMs = REJECT_WAIT_MS;
        }
        int requeueCountLimit = systemConfig == null ? 0 : systemConfig.getRequeueCountLimit();
        if (requeueCountLimit == 0) {
            requeueCountLimit = REQUEUE_COUNT_LIMIT;
        }

        try {
            // Pause before handing the message back to the broker.
            Thread.sleep(rejectWaitMs);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the owning thread, then settle the delivery anyway.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while delaying rejection of a message on queue " + queue.getName(), e);
        }

        Object requeueHeader = messageProperties.getHeaders().get(REQUEUE_COUNT);
        int requeueCount = requeueHeader == null ? 0 : Integer.parseInt(String.valueOf(requeueHeader));

        if (requeueCount >= requeueCountLimit) {
            // Re-published too many times: acknowledge, i.e. discard the message.
            channel.basicAck(deliveryTag, false);
            return;
        }
        if (rejectedCount < ALLOWED_REJECTED_COUNT) {
            channel.basicReject(deliveryTag, true);
            return;
        }
        if (republish(channel, messageProperties, body, requeueCount)) {
            forgetRejection(messageHash);
            channel.basicAck(deliveryTag, false);
        } else {
            // Acknowledging here would lose the message, so hand it back to the broker instead.
            channel.basicReject(deliveryTag, true);
        }
    }

    /**
     * Publishes a copy of the message to this container's queue through the default
     * exchange, with the {@code Requeue-Count} header incremented.
     *
     * @return {@code true} if the publish call completed, {@code false} if the channel was
     *         closed or the publish failed
     */
    private boolean republish(Channel channel, MessageProperties messageProperties, byte[] body,
                              int requeueCount) {
        if (!channel.isOpen()) {
            return false;
        }
        try {
            messageProperties.getHeaders().put(REQUEUE_COUNT, Math.max(requeueCount, 0) + 1);
            resetRoutePath();
            AMQP.BasicProperties newProperties = messagePropertiesConverter
                    .fromMessageProperties(messageProperties, ENCODING);
            channel.basicPublish("", queue.getName(), newProperties, body);
            return true;
        } catch (Exception e) {
            logger.error("Failed to re-publish a rejected message to queue " + queue.getName(), e);
            return false;
        }
    }

    /**
     * Handles a delivery this consumer is allowed to process: invokes the listener and,
     * under manual acknowledgement, settles the delivery according to the returned action.
     *
     * <p>A missing response or a missing action is treated as {@code MQAction.SUCCESS}; an
     * exception thrown by the listener is treated as {@code MQAction.FAILURE}.
     */
    private void processPermitted(Channel channel, long deliveryTag, Message payload, String message)
            throws IOException {
        clearRejections();
        LogbeanBuilder logbeanBuilder = LogbeanBuilder.build().addChannelNumber(channel.getChannelNumber())
                .addQueue(queue).addLogType(Constants.LOGTYPE_RECEIVE);
        MQResponse mqResponse = null;
        try {
            Object listener = getMessageListener();
            if (listener instanceof MessageListener) {
                // The listener is the wrapper around the business bean; it calls the business method reflectively.
                Object result = ((MessageListener) listener).onMessage(payload);
                if (result == null) {
                    String errorMsg = "業務方法沒有返回數據";
                    mqResponse = MQResponseBuilder.build().addAction(MQAction.SUCCESS)
                            .addErrMsg(errorMsg);
                } else if (result instanceof MQResponse) {
                    mqResponse = (MQResponse) result;
                }
            } else {
                logger.warn("No usable MessageListener configured for queue " + queue.getName()
                        + "; the message is settled without being processed");
            }
        } catch (Exception e) {
            mqResponse = MQResponseBuilder.build().addAction(MQAction.FAILURE);
            logbeanBuilder.addMessage("receive:" + message);
            logger.error(logbeanBuilder.toString(), e);
        }

        // Parenthesised on purpose: '+' binds tighter than '==', which previously turned the
        // null check into a comparison of the concatenated string.
        logbeanBuilder.addMessage("receive: [" + message + "] response:["
                + (mqResponse == null ? "" : mqResponse.toString()) + "]");

        MQAction action = (mqResponse == null || mqResponse.getAction() == null)
                ? MQAction.SUCCESS
                : mqResponse.getAction();
        if (autoAck) {
            return;
        }
        if (action == MQAction.SUCCESS) {
            channel.basicAck(deliveryTag, false);
        } else if (action == MQAction.RETRY) {
            channel.basicNack(deliveryTag, false, true);
        } else if (action == MQAction.FAILURE) {
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /** Increments and returns the rejection counter for the given message hash. */
    private int recordRejection(int messageHash) {
        synchronized (rejectedMsgMap) {
            Integer previous = rejectedMsgMap.get(messageHash);
            int updated = previous == null ? 1 : previous + 1;
            rejectedMsgMap.put(messageHash, updated);
            return updated;
        }
    }

    /** Drops the rejection counter of a message that has been re-published. */
    private void forgetRejection(int messageHash) {
        synchronized (rejectedMsgMap) {
            rejectedMsgMap.remove(messageHash);
        }
    }

    /** Resets all rejection counters; called whenever a delivery is accepted. */
    private void clearRejections() {
        synchronized (rejectedMsgMap) {
            rejectedMsgMap.clear();
        }
    }

    /**
     * Validates the local queue configuration against the subscription registered on the
     * management platform and applies platform overrides.
     *
     * <p>With the master switch on, a missing subscription is fatal and {@code autoAck} is
     * taken from the subscription. {@code fetchCount} and {@code threadNum} are taken from
     * the subscription whenever it provides them.
     *
     * @param zkSubscribe the platform subscription for this queue, may be {@code null}
     * @throws RuntimeException if the configuration is incomplete, unsupported, or (with the
     *                          master switch on) inconsistent with the platform
     */
    public void checkMetadata(ZkSubscribe zkSubscribe) {
        LogbeanBuilder logbeanBuilder = LogbeanBuilder.build().addQueue(queue).addLogType(Constants.LOGTYPE_RECEIVE);
        ZkSystemConfig zkSystemConfig = systemConfigCache.getZkSystemConfig();
        // Constant-first comparison: a missing switch value simply means "off".
        boolean masterSwitch = zkSystemConfig != null
                && "true".equalsIgnoreCase(zkSystemConfig.getMasterSwitch());
        String message = "該產線應用沒有在治理平台做消費關系配置,請檢查!";

        if (zkSubscribe == null) {
            if (masterSwitch) {
                logger.error(message + logbeanBuilder.toString());
                throw new RuntimeException(message + logbeanBuilder.toString());
            }
            logbeanBuilder.addMessage(message);
            logger.warn(logbeanBuilder.toString());
        } else {
            if (masterSwitch) {
                // The platform setting overrides the local one.
                this.autoAck = "true".equalsIgnoreCase(zkSubscribe.getAutoAck());
            }
            if (StringUtils.isNotBlank(zkSubscribe.getFetchCount())) {
                this.fetchCount = Integer.parseInt(zkSubscribe.getFetchCount());
            }
            if (StringUtils.isNotBlank(zkSubscribe.getThreadNum())) {
                this.threadNum = Integer.parseInt(zkSubscribe.getThreadNum());
            }
        }

        if (queue == null) {
            logger.error("消費者配置文件中mqItem不能為空");
            throw new RuntimeException("消費者配置文件中mqItem不能為空");
        }
        if (queue.getBindExchangeType() == null) {
            logger.error("消費者配置文件中exchangeType不能為空");
            throw new RuntimeException("消費者配置文件中exchangeType不能為空");
        }
        switch (queue.getBindExchangeType()) {
            case "fanout":
                checkExchang(logbeanBuilder, zkSubscribe, masterSwitch);
                checkQueue(logbeanBuilder, zkSubscribe, masterSwitch);
                break;
            case "direct":
                // A blank exchange name means the default exchange: only the queue is checked.
                if (StringUtils.isNotBlank(queue.getBindExchangeName())) {
                    checkExchang(logbeanBuilder, zkSubscribe, masterSwitch);
                    checkRoutingKey(logbeanBuilder, zkSubscribe, masterSwitch);
                }
                checkQueue(logbeanBuilder, zkSubscribe, masterSwitch);
                break;
            case "topic":
                checkExchang(logbeanBuilder, zkSubscribe, masterSwitch);
                checkQueue(logbeanBuilder, zkSubscribe, masterSwitch);
                checkRoutingKey(logbeanBuilder, zkSubscribe, masterSwitch);
                break;
            case "headers":
                logger.error("暫不支持headers類型");
                throw new RuntimeException("暫不支持headers類型");
            default:
                logger.error("exchangeType格式配置錯誤");
                throw new RuntimeException("exchangeType格式配置錯誤");
        }
    }

    /**
     * Checks the locally configured exchange name: it must not be blank, and it must match
     * the platform's value (case-insensitively) when a subscription is available. A mismatch
     * is only fatal when the master switch is on; otherwise it is logged as a warning.
     *
     * @throws RuntimeException if the name is blank, or mismatched with the master switch on
     */
    public void checkExchang(LogbeanBuilder logbeanBuilder, ZkSubscribe zkSubscribe, boolean masterSwitch) {
        if (StringUtils.isBlank(queue.getBindExchangeName())) {
            String message = "本地配置文件中exchange為空,請檢查!";
            logbeanBuilder.addMessage(message);
            logger.warn(logbeanBuilder.toString());
            throw new RuntimeException(message + logbeanBuilder.toString());
        }
        if (zkSubscribe == null || queue.getBindExchangeName().equalsIgnoreCase(zkSubscribe.getExchangename())) {
            return;
        }
        String message = "本地配置文件中exchange和管理系統不一致,請檢查!";
        logbeanBuilder.addMessage(message);
        logger.warn(logbeanBuilder.toString());
        if (masterSwitch) {
            throw new RuntimeException(message + logbeanBuilder.toString());
        }
    }

    /**
     * Checks the locally configured queue name: it must not be blank, and it must match the
     * platform's value (case-insensitively) when a subscription is available. A mismatch is
     * only fatal when the master switch is on; otherwise it is logged as a warning.
     *
     * @throws RuntimeException if the name is blank, or mismatched with the master switch on
     */
    public void checkQueue(LogbeanBuilder logbeanBuilder, ZkSubscribe zkSubscribe, boolean masterSwitch) {
        if (StringUtils.isBlank(queue.getName())) {
            String message = "本地配置文件中queue為空,請檢查!";
            logbeanBuilder.addMessage(message);
            throw new RuntimeException(message + logbeanBuilder.toString());
        }
        if (zkSubscribe == null || queue.getName().equalsIgnoreCase(zkSubscribe.getQueue())) {
            return;
        }
        String message = "本地配置文件中queue和管理系統不一致,請檢查!";
        logbeanBuilder.addMessage(message);
        logger.warn(logbeanBuilder.toString());
        if (masterSwitch) {
            throw new RuntimeException(message + logbeanBuilder.toString());
        }
    }

    /**
     * Checks the locally configured binding key: it must not be blank, and it must match the
     * platform's routing key (case-insensitively) when a subscription is available. A
     * mismatch is only fatal when the master switch is on; otherwise it is logged as a warning.
     *
     * @throws RuntimeException if the key is blank, or mismatched with the master switch on
     */
    public void checkRoutingKey(LogbeanBuilder logbeanBuilder, ZkSubscribe zkSubscribe, boolean masterSwitch) {
        if (StringUtils.isBlank(queue.getBindingKey())) {
            String message = "本地配置文件中binding-key不能為空,請檢查!";
            logbeanBuilder.addMessage(message);
            logger.warn(logbeanBuilder.toString());
            throw new RuntimeException(message + logbeanBuilder.toString());
        }
        if (zkSubscribe == null || queue.getBindingKey().equalsIgnoreCase(zkSubscribe.getRoutingkey())) {
            return;
        }
        String message = "本地配置文件中routingKey和管理系統不一致,請檢查!";
        logbeanBuilder.addMessage(message);
        logger.warn(logbeanBuilder.toString());
        if (masterSwitch) {
            throw new RuntimeException(message + logbeanBuilder.toString());
        }
    }

    /** Replaces the current context body with the one returned by its own {@code resetRoutePath}. */
    private void resetRoutePath() {
        MyContextBody contextBody = MyContextManager.get();
        contextBody = contextBody.resetRoutePath(contextBody);
        MyContextManager.set(contextBody);
    }

    /** Starts the consumers once all properties have been injected. */
    @Override
    public void afterPropertiesSet() throws Exception {
        initializeConsumers();
    }

    public void setMessageListener(Object messageListener) {
        this.messageListener = messageListener;
    }

    public Object getMessageListener() {
        return this.messageListener;
    }

    public Queue getQueue() {
        return queue;
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    public void setThreadNum(int threadNum) {
        this.threadNum = threadNum;
    }

    public void setFetchCount(int fetchCount) {
        this.fetchCount = fetchCount;
    }

    public void setAutoAck(boolean autoAck) {
        this.autoAck = autoAck;
    }

}

 

  // 自定義 spring.handlers 時的標簽解析

public class MyRabbitNamespaceHandler extends NamespaceHandlerSupport {

    public void init() {
        registerBeanDefinitionParser("queue", new QueueParser());
        registerBeanDefinitionParser("direct-exchange", new DirectExchangeParser());
        registerBeanDefinitionParser("topic-exchange", new TopicExchangeParser());
        registerBeanDefinitionParser("fanout-exchange", new FanoutExchangeParser());
//        registerBeanDefinitionParser("headers-exchange", new HeadersExchangeParser());
        registerBeanDefinitionParser("listener-container", new ListenerContainerParser());
//        registerBeanDefinitionParser("admin", new AdminParser());
//        registerBeanDefinitionParser("connection-factory", new ConnectionFactoryParser());
        registerBeanDefinitionParser("template", new TemplateParser());
//        registerBeanDefinitionParser("queue-arguments", new QueueArgumentsParser());
//        registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenParser());
    }

}


// 解析 container, 解析 listener, 至關重要

// listener.parse() 時做的動作, 解析出要處理的bean, 並構造出新的bean, 調用 basicConsume({}), 進行關聯

/**
 * Parser for the {@code listener-container} element. Every nested {@code listener} element
 * becomes one container bean definition whose {@code messageListener} is a
 * {@code MessageListenerAdapter} delegating to the bean named by {@code business-ref}.
 */
class ListenerContainerParser implements BeanDefinitionParser {

    private static final String LISTENER_ELEMENT = "listener";

    private static final String ID_ATTRIBUTE = "id";

    private static final String GROUP_ATTRIBUTE = "group";

    private static final String QUEUES_ATTRIBUTE = "queue-ref";

    private static final String REF_ATTRIBUTE = "business-ref";

    private static final String METHOD_ATTRIBUTE = "method";

    private static final String MESSAGE_CONVERTER_ATTRIBUTE = "message-converter";

    private static final String EXCLUSIVE = "exclusive";

    private static final String THREAD_NUM = "thread-num";
    private static final String FETCH_COUNT = "fetch-count";
    private static final String AUTO_ACK = "auto-ack";

    /**
     * Parses the container element and registers one container bean per nested listener.
     *
     * @return always {@code null}; the bean definitions are registered through the parser context
     */
    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(),
                parserContext.extractSource(element));
        parserContext.pushContainingComponent(compositeDef);

        ManagedList<RuntimeBeanReference> containerList = resolveGroupList(element, parserContext);

        // Parse every listener element and wire the processing logic for its queue.
        for (Element listenerEle : DomUtils.getChildElementsByTagName(element, LISTENER_ELEMENT)) {
            parseListener(listenerEle, element, parserContext, containerList);
        }

        parserContext.popAndRegisterContainingComponent();
        return null;
    }

    /**
     * Returns the list that collects the container references of the {@code group} bean,
     * registering that bean (an {@code ArrayList} built from a {@code ManagedList}) when it
     * does not exist yet.
     *
     * @return the group's list, or {@code null} when no group is configured or the existing
     *         group bean does not carry a {@code ManagedList} constructor argument
     */
    // Only the ManagedList type can be verified; its element type is unchecked by necessity.
    @SuppressWarnings("unchecked")
    private ManagedList<RuntimeBeanReference> resolveGroupList(Element element, ParserContext parserContext) {
        String group = element.getAttribute(GROUP_ATTRIBUTE);
        if (!StringUtils.hasText(group)) {
            return null;
        }

        BeanDefinition groupDef;
        if (parserContext.getRegistry().containsBeanDefinition(group)) {
            groupDef = parserContext.getRegistry().getBeanDefinition(group);
        } else {
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ArrayList.class);
            builder.addConstructorArgValue(new ManagedList<RuntimeBeanReference>());
            groupDef = builder.getBeanDefinition();
            BeanDefinitionHolder holder = new BeanDefinitionHolder(groupDef, group);
            BeanDefinitionReaderUtils.registerBeanDefinition(holder, parserContext.getRegistry());
        }

        ConstructorArgumentValues constructorArgumentValues = groupDef.getConstructorArgumentValues();
        ConstructorArgumentValues.ValueHolder listHolder =
                constructorArgumentValues.getIndexedArgumentValue(0, ManagedList.class);
        if (!ArrayList.class.getName().equals(groupDef.getBeanClassName())
                || constructorArgumentValues.getArgumentCount() != 1
                || listHolder == null) {
            parserContext.getReaderContext().error("Unexpected configuration for bean " + group, element);
        }
        // error() may return instead of throwing, so the holder can still be null here.
        return listHolder == null ? null : (ManagedList<RuntimeBeanReference>) listHolder.getValue();
    }

    /**
     * Builds the adapter and container definitions for one {@code listener} element and
     * registers the container under the listener's {@code id} (or a generated name).
     *
     * @param listenerEle   the {@code listener} element
     * @param containerEle  the enclosing {@code listener-container} element
     * @param parserContext the current parser context
     * @param containerList group list to append the container reference to, may be {@code null}
     */
    private void parseListener(Element listenerEle, Element containerEle, ParserContext parserContext,
                               ManagedList<RuntimeBeanReference> containerList) {
        RootBeanDefinition listenerDef = new RootBeanDefinition();
        listenerDef.setSource(parserContext.extractSource(listenerEle));

        // The bean named by business-ref is the target the adapter finally invokes.
        String ref = listenerEle.getAttribute(REF_ATTRIBUTE);
        if (!StringUtils.hasText(ref)) {
            parserContext.getReaderContext()
                    .error("Listener 'business-ref' attribute contains empty value.", listenerEle);
        } else {
            listenerDef.getPropertyValues().add("delegate", new RuntimeBeanReference(ref));
        }

        String method = null;
        if (listenerEle.hasAttribute(METHOD_ATTRIBUTE)) {
            method = listenerEle.getAttribute(METHOD_ATTRIBUTE);
            if (!StringUtils.hasText(method)) {
                parserContext.getReaderContext()
                        .error("Listener 'method' attribute contains empty value.", listenerEle);
            }
        }
        listenerDef.getPropertyValues().add("defaultListenerMethod", method);

        if (containerEle.hasAttribute(MESSAGE_CONVERTER_ATTRIBUTE)) {
            String messageConverter = containerEle.getAttribute(MESSAGE_CONVERTER_ATTRIBUTE);
            if (!StringUtils.hasText(messageConverter)) {
                parserContext.getReaderContext().error(
                        "Listener container 'message-converter' attribute contains empty value.", containerEle);
            } else {
                listenerDef.getPropertyValues().add("messageConverter", new RuntimeBeanReference(messageConverter));
            }
        }

        // containerDef describes the listener container bean (presumably SimpleMsgListenerContainer —
        // the concrete class is chosen inside RabbitNamespaceUtils.parseContainer).
        BeanDefinition containerDef = RabbitNamespaceUtils.parseContainer(containerEle, parserContext);

        // Every listener is a MessageListenerAdapter; the business bean is reached through its
        // 'delegate' property.
        listenerDef.setBeanClass(MessageListenerAdapter.class);
        containerDef.getPropertyValues().add("messageListener", listenerDef);

        // NOTE(review): the container class must expose an 'exclusive' property for this to bind;
        // SimpleMsgListenerContainer as shown in this file declares no such setter — TODO confirm.
        copyAttribute(listenerEle, EXCLUSIVE, containerDef, "exclusive");

        String childElementId = listenerEle.getAttribute(ID_ATTRIBUTE);
        String containerBeanName = StringUtils.hasText(childElementId) ? childElementId :
                BeanDefinitionReaderUtils.generateBeanName(containerDef, parserContext.getRegistry());

        copyAttribute(listenerEle, THREAD_NUM, containerDef, "threadNum");
        copyAttribute(listenerEle, FETCH_COUNT, containerDef, "fetchCount");
        copyAttribute(listenerEle, AUTO_ACK, containerDef, "autoAck");

        // queue-ref names a single queue bean; it is passed on as one bean reference.
        String queues = listenerEle.getAttribute(QUEUES_ATTRIBUTE);
        if (StringUtils.hasText(queues)) {
            containerDef.getPropertyValues().add("queue", new RuntimeBeanReference(queues));
        }

        // Register the listener and fire event
        parserContext.registerBeanComponent(new BeanComponentDefinition(containerDef, containerBeanName));

        if (containerList != null) {
            containerList.add(new RuntimeBeanReference(containerBeanName));
        }
    }

    /** Copies a non-blank attribute of the listener element onto the named container property. */
    private static void copyAttribute(Element listenerEle, String attribute, BeanDefinition containerDef,
                                      String property) {
        String value = listenerEle.getAttribute(attribute);
        if (StringUtils.hasText(value)) {
            containerDef.getPropertyValues().add(property, value);
        }
    }

}

 

// 拒絕策略,自行實現邏輯即可

    /**
     * Rejection policy: decides whether this consumer may process the given message.
     * Replace the logic with your own rules as needed.
     *
     * <p>With the master switch off (or no system config available) every message is
     * accepted. With it on, the message is accepted only if the local queue configuration
     * passes {@code isVerifyFlag}, the environment is not production, and
     * {@code checkMessageRoute} approves the message properties. The channel-connection and
     * subscribe-group checks are currently disabled, so {@code channelNumber} and
     * {@code message} are not consulted.
     *
     * @param queue             the queue the message was consumed from
     * @param channelNumber     number of the consuming channel (currently unused)
     * @param messageProperties properties of the delivered message
     * @param message           message body as text (currently unused)
     * @return {@code true} if the message should be processed by this consumer
     */
    public boolean isExecuteReceive(Queue queue, String channelNumber, MessageProperties messageProperties,
                                    String message) {
        // Constant-first comparison: a missing switch value simply means "off".
        boolean masterSwitchFlag = systemConfigCache.getZkSystemConfig() != null
                && "true".equalsIgnoreCase(systemConfigCache.getZkSystemConfig().getMasterSwitch());
        if (!masterSwitchFlag) {
            // Master switch off: no directed consumption, accept everything.
            return true;
        }

        // The local XML configuration must agree with the platform configuration.
        if (!isVerifyFlag(queue)) {
            logger.warn("消費者配置不正確,請檢查!{}", queue);
            return false;
        }

        return !systemConfigCache.isProdEnvironment() && checkMessageRoute(messageProperties);
    }

 

使用樣例如下:

    
    <!-- The listener class can be implemented freely; it is invoked via reflection -->
    <rmq:queue id="test_mq" name="test_mq_name" bind-exchange-type="fanout" bind-exchange-name="test_mq_bind_exchange"/>
    <bean id="orderProcesser" class="com.xxx.rmq.consumer.OrderConsumer" />
    
    <!-- Consumer listener container; listeners are bound at method level -->
    <rmq:listener-container id="myListener" message-converter="simpleMessageConverter">
        <!-- Required: queue-ref: the queue bean to consume from; business-ref: id of the business handler bean; method: business method to invoke -->
        <!-- Optional: auto-ack: automatic acknowledgement, default true (denied messages are only rejected/requeued when it is false); fetch-count: batch (prefetch) size, default 1; thread-num: number of concurrent consumers, default 1 -->
        <rmq:listener queue-ref="test_mq" business-ref="orderProcesser" method="processMessage" />
    </rmq:listener-container>

 

  一句話總結: 曲線救國!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM