因 mq server 就只有一台,而測試環境又是n套,並不像線上環境一樣,任意消費都是成立的。所以,需要進行定向消費功能開發!
如果讓自己來做mq的定向消費,應該怎么做?
因為rabbitmq 是用 erlang 寫的,而它目前是沒有提供這種功能的,這種功能也多半只是在特殊的測試環境用得上!
所以,想要改動 rabbitmq 的源碼支持,是不可能的了!
所以,只能在消費端,spring 與 rabbitmq 集成的地方做動作了!
主要思路:
1. 自己定制 mq-container, 定制化自己的參數;
2. 設置消息的自動確認功能為關閉,即必須手動確認;
3. 在消息消費的時候,針對規則做消息處理,如果有權限則進行處理,如果沒有權限,則直接丟棄該條消息,使mq中心重新投遞該消息即可;
4. 定制 sping.handlers, 進行xml標簽解析;
5. 定制 xxx.xsd xml 標簽定義配置文件,引入后可配置功能;
6. 定義 container, 容納 listener ;
7. 定義 listener, 動態生成處理消息類, 在 basicConsume({}) 進行處理;
8. 在 basicConsume() 中進行權限管理, 在 beanClass 中進行業務處理; ()
public class SimpleMsgListenerContainer implements InitializingBean { private Logger logger = LoggerFactory.getLogger(SimpleMsgListenerContainer.class); private volatile Object messageListener; private volatile Queue queue; private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter(); private List<Integer> rejectMeassges = new ArrayList<>(REJECTMSG_LIST_SIZE); private Map<Integer, Integer> rejectedMsgMap = new WeakHashMap<>(REJECTMSG_LIST_SIZE); private int threadNum = 1; private int fetchCount = 1; private boolean autoAck = true; private static int REJECTMSG_LIST_SIZE = 100; private static int REJECT_WAIT_MS = 100; private static int ALLOWED_REJECTED_COUNT = 5; private static int REQUEUE_COUNT_LIMIT = 20; private static final String REQUEUE_COUNT = "Requeue-Count"; @Autowired private MQConnection mqConnection; @Autowired private DecisionHandle decisionHandle; @Autowired private SubscribeCache subscribeCache; @Autowired private SystemConfigCache systemConfigCache; public interface ContainerDelegate { void invokeListener(Object object) throws Exception; } private void initializeConsumers() throws Exception { // 本地配置文件和管理平台元數據進行校驗 ZkSubscribe zkSubscribe = subscribeCache.getZkSubscribe(queue.getName()); checkMetadata(zkSubscribe); Connection connection = mqConnection.getConnection(); // 創建queue、綁定queue MQBuilder.buildQueue(queue, connection); for (int i = 0; i < threadNum; i++) { final Channel channel = connection.createChannel(); channel.basicQos(fetchCount); String consumerTag = CommonUtils.getConsumerTagPrefix() + "_" + channel.getChannelNumber(); channel.basicConsume(queue.getName(), autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); MessageProperties messageProperties = messagePropertiesConverter.toMessageProperties(properties, envelope, "UTF-8"); messageProperties.setMessageCount(0); messageProperties.setConsumerTag(consumerTag); messageProperties.setConsumerQueue(queue.getName()); Message payload = new Message(body, messageProperties); String message = new String(body, "UTF-8"); LogbeanBuilder logbeanBuilder = LogbeanBuilder.build().addChannelNumber(channel.getChannelNumber()) .addQueue(queue).addLogType(Constants.LOGTYPE_RECEIVE); if (!decisionHandle.isExecuteReceive(queue, String.valueOf(channel.getChannelNumber()), messageProperties, message)) {// 沒有權限則拒絕 int hashCode = message.hashCode(); if (rejectedMsgMap.containsKey(hashCode)) { int rejectedCount = rejectedMsgMap.get(hashCode); rejectedMsgMap.put(hashCode, ++rejectedCount); } else { rejectedMsgMap.put(hashCode, 1); } if (!autoAck) { int rejectWaitMs = 0; int requeueCountLimit = 0; try { rejectWaitMs = systemConfigCache.getZkSystemConfig().getRejectWaitMs(); rejectWaitMs = rejectWaitMs == 0 ? REJECT_WAIT_MS : rejectWaitMs; requeueCountLimit = systemConfigCache.getZkSystemConfig().getRequeueCountLimit(); requeueCountLimit = requeueCountLimit == 0 ? REQUEUE_COUNT_LIMIT : requeueCountLimit; Thread.sleep(rejectWaitMs); } catch (InterruptedException e) { e.printStackTrace(); } finally { Object requeueCountStr = messageProperties.getHeaders().get(REQUEUE_COUNT); int requeueCount = requeueCountStr == null ? 0 : Integer.parseInt(String.valueOf(requeueCountStr)); /** * 如果重新入隊次數大於等於RE_QUEUE_COUNT,則直接丟棄該條消息 */ if (requeueCount >= requeueCountLimit) { channel.basicAck(deliveryTag, false); } else { if (rejectedMsgMap.get(hashCode) != null && rejectedMsgMap.get(hashCode) >= ALLOWED_REJECTED_COUNT) { try { if (channel.isOpen()) { if (requeueCount > 0) { messageProperties.getHeaders().put(REQUEUE_COUNT, ++requeueCount); } else { messageProperties.getHeaders().put(REQUEUE_COUNT, 1); } resetRoutePath(); AMQP.BasicProperties newProperties = messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8"); channel.basicPublish("", queue.getName(), newProperties, body); rejectedMsgMap.remove(hashCode); } } catch (Exception e) { e.printStackTrace(); } finally { channel.basicAck(deliveryTag, false); } } else { channel.basicReject(deliveryTag, true); } } } } } else {// 有權限才處理消息 rejectedMsgMap.clear(); MQResponse mqResponse = null; try { Object listener = getMessageListener(); if (listener instanceof MessageListener) { // 調用業務實現類的包裝類, 再反射調用業務服務 Object result = ((MessageListener) listener).onMessage(payload); if (result != null) { if (result instanceof MQResponse) { mqResponse = (MQResponse) result; } } else { String errorMsg = "業務方法沒有返回數據"; mqResponse = MQResponseBuilder.build().addAction(MQAction.SUCCESS) .addErrMsg(errorMsg); } } } catch (Exception e) { mqResponse = MQResponseBuilder.build().addAction(MQAction.FAILURE); logbeanBuilder.addMessage("receive:" + message); logger.error(logbeanBuilder.toString(), e); } finally { logbeanBuilder.addMessage("receive: [" + message + "] response:[" + mqResponse == null ? "" : mqResponse.toString() + "]"); MQAction action = mqResponse.getAction() == null ? MQAction.SUCCESS : mqResponse.getAction(); if (!autoAck) { if (action == MQAction.SUCCESS) { channel.basicAck(deliveryTag, false); } else if (action == MQAction.RETRY) { channel.basicNack(deliveryTag, false, true); } else if (action == MQAction.FAILURE) { channel.basicNack(deliveryTag, false, false); } } } } } }); } } public void checkMetadata(ZkSubscribe zkSubscribe) { LogbeanBuilder logbeanBuilder = LogbeanBuilder.build().addQueue(queue).addLogType(Constants.LOGTYPE_RECEIVE); ZkSystemConfig zkSystemConfig = systemConfigCache.getZkSystemConfig(); boolean masterSwitch = false; if (zkSystemConfig != null) { masterSwitch = zkSystemConfig.getMasterSwitch().equalsIgnoreCase("true"); } String message = "該產線應用沒有在治理平台做消費關系配置,請檢查!"; if (masterSwitch) {// 總開關開啟 if (zkSubscribe == null) { logger.error(message + logbeanBuilder.toString()); throw new RuntimeException(message + logbeanBuilder.toString()); } // 覆蓋本地配置 this.autoAck = zkSubscribe.getAutoAck().equalsIgnoreCase("true") ? true : false; } if (zkSubscribe == null) { logbeanBuilder.addMessage(message); logger.warn(logbeanBuilder.toString()); } if (zkSubscribe != null && StringUtils.isNotBlank(zkSubscribe.getFetchCount())) { this.fetchCount = Integer.parseInt(zkSubscribe.getFetchCount()); } if (zkSubscribe != null && StringUtils.isNotBlank(zkSubscribe.getThreadNum())) { this.threadNum = Integer.parseInt(zkSubscribe.getThreadNum()); } if (queue == null) { logger.error("消費者配置文件中mqItem不能為空"); throw new RuntimeException("消費者配置文件中mqItem不能為空"); } if (queue.getBindExchangeType() == null) { logger.error("消費者配置文件中exchangeType不能為空"); throw new RuntimeException("消費者配置文件中exchangeType不能為空"); } switch (queue.getBindExchangeType()) { case "fanout": checkExchang(logbeanBuilder, zkSubscribe, masterSwitch); checkQueue(logbeanBuilder, zkSubscribe, masterSwitch); break; case "direct": if (StringUtils.isBlank(queue.getBindExchangeName())) { // defaultExchange } else { checkExchang(logbeanBuilder, zkSubscribe, masterSwitch); checkRoutingKey(logbeanBuilder, zkSubscribe, masterSwitch); } checkQueue(logbeanBuilder, zkSubscribe, masterSwitch); break; case "topic": checkExchang(logbeanBuilder, zkSubscribe, masterSwitch); checkQueue(logbeanBuilder, zkSubscribe, masterSwitch); checkRoutingKey(logbeanBuilder, zkSubscribe, masterSwitch); break; case "headers": logger.error("暫不支持headers類型"); throw new RuntimeException("暫不支持headers類型"); default: logger.error("exchangeType格式配置錯誤"); throw new RuntimeException("exchangeType格式配置錯誤"); } } public void checkExchang(LogbeanBuilder logbeanBuilder, ZkSubscribe zkSubscribe, boolean masterSwitch) { if (StringUtils.isBlank(queue.getBindExchangeName())) { String message = "本地配置文件中exchange為空,請檢查!"; logbeanBuilder.addMessage(message); logger.warn(logbeanBuilder.toString()); throw new RuntimeException(message + logbeanBuilder.toString()); } else { if (zkSubscribe != null && !queue.getBindExchangeName().equalsIgnoreCase(zkSubscribe.getExchangename())) { String message = "本地配置文件中exchange和管理系統不一致,請檢查!"; logbeanBuilder.addMessage(message); logger.warn(logbeanBuilder.toString()); if (masterSwitch) { throw new RuntimeException(message + logbeanBuilder.toString()); } } } } public void checkQueue(LogbeanBuilder logbeanBuilder, ZkSubscribe zkSubscribe, boolean masterSwitch) { if (StringUtils.isBlank(queue.getName())) { String message = "本地配置文件中queue為空,請檢查!"; logbeanBuilder.addMessage(message); throw new RuntimeException(message + logbeanBuilder.toString()); } else { if (zkSubscribe != null && !queue.getName().equalsIgnoreCase(zkSubscribe.getQueue())) { String message = "本地配置文件中queue和管理系統不一致,請檢查!"; logbeanBuilder.addMessage(message); logger.warn(logbeanBuilder.toString()); if (masterSwitch) { throw new RuntimeException(message + logbeanBuilder.toString()); } } } } public void checkRoutingKey(LogbeanBuilder logbeanBuilder, ZkSubscribe zkSubscribe, boolean masterSwitch) { if (StringUtils.isBlank(queue.getBindingKey())) { String message = "本地配置文件中binding-key不能為空,請檢查!"; logbeanBuilder.addMessage(message); logger.warn(logbeanBuilder.toString()); throw new RuntimeException(message + logbeanBuilder.toString()); } else { if (zkSubscribe != null && !queue.getBindingKey().equalsIgnoreCase(zkSubscribe.getRoutingkey())) { String message = "本地配置文件中routingKey和管理系統不一致,請檢查!"; logbeanBuilder.addMessage(message); logger.warn(logbeanBuilder.toString()); if (masterSwitch) { throw new RuntimeException(message + logbeanBuilder.toString()); } } } } private void resetRoutePath() { MyContextBody contextBody = MyContextManager.get(); contextBody = contextBody.resetRoutePath(contextBody); MyContextManager.set(contextBody); } @Override public void afterPropertiesSet() throws Exception { initializeConsumers(); } public void setMessageListener(Object messageListener) { this.messageListener = messageListener; } public Object getMessageListener() { return this.messageListener; } public Queue getQueue() { return queue; } public void setQueue(Queue queue) { this.queue = queue; } public void setThreadNum(int threadNum) { this.threadNum = threadNum; } public void setFetchCount(int fetchCount) { this.fetchCount = fetchCount; } public void setAutoAck(boolean autoAck) { this.autoAck = autoAck; } }
// 自定義 sping.handlers 時的標簽解析
public class MyRabbitNamespaceHandler extends NamespaceHandlerSupport { public void init() { registerBeanDefinitionParser("queue", new QueueParser()); registerBeanDefinitionParser("direct-exchange", new DirectExchangeParser()); registerBeanDefinitionParser("topic-exchange", new TopicExchangeParser()); registerBeanDefinitionParser("fanout-exchange", new FanoutExchangeParser()); // registerBeanDefinitionParser("headers-exchange", new HeadersExchangeParser()); registerBeanDefinitionParser("listener-container", new ListenerContainerParser()); // registerBeanDefinitionParser("admin", new AdminParser()); // registerBeanDefinitionParser("connection-factory", new ConnectionFactoryParser()); registerBeanDefinitionParser("template", new TemplateParser()); // registerBeanDefinitionParser("queue-arguments", new QueueArgumentsParser()); // registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenParser()); } }
// 解析 container, 解析 listener, 至關重要
// listener.parse() 時做的動作, 解析出要處理的bean, 並構造出新的bean, 調用 basicConsume({}), 進行關聯
class ListenerContainerParser implements BeanDefinitionParser { private static final String LISTENER_ELEMENT = "listener"; private static final String ID_ATTRIBUTE = "id"; private static final String GROUP_ATTRIBUTE = "group"; private static final String QUEUE_NAMES_ATTRIBUTE = "queue-names"; private static final String QUEUES_ATTRIBUTE = "queue-ref"; private static final String REF_ATTRIBUTE = "business-ref"; private static final String METHOD_ATTRIBUTE = "method"; private static final String MESSAGE_CONVERTER_ATTRIBUTE = "message-converter"; private static final String RESPONSE_EXCHANGE_ATTRIBUTE = "response-exchange"; private static final String RESPONSE_ROUTING_KEY_ATTRIBUTE = "response-routing-key"; private static final String EXCLUSIVE = "exclusive"; private static final String THREAD_NUM = "thread-num"; private static final String FETCH_COUNT = "fetch-count"; private static final String AUTO_ACK = "auto-ack"; @SuppressWarnings("unchecked") @Override public BeanDefinition parse(Element element, ParserContext parserContext) { CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), parserContext.extractSource(element)); parserContext.pushContainingComponent(compositeDef); String group = element.getAttribute(GROUP_ATTRIBUTE); ManagedList<RuntimeBeanReference> containerList = null; if (StringUtils.hasText(group)) { BeanDefinition groupDef; if (parserContext.getRegistry().containsBeanDefinition(group)) { groupDef = parserContext.getRegistry().getBeanDefinition(group); } else { BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ArrayList.class); builder.addConstructorArgValue(new ManagedList<RuntimeBeanReference>()); groupDef = builder.getBeanDefinition(); BeanDefinitionHolder holder = new BeanDefinitionHolder(groupDef, group); BeanDefinitionReaderUtils.registerBeanDefinition(holder, parserContext.getRegistry()); } ConstructorArgumentValues constructorArgumentValues = groupDef.getConstructorArgumentValues(); if (!ArrayList.class.getName().equals(groupDef.getBeanClassName()) || constructorArgumentValues.getArgumentCount() != 1 || constructorArgumentValues.getIndexedArgumentValue(0, ManagedList.class) == null) { parserContext.getReaderContext().error("Unexpected configuration for bean " + group, element); } containerList = (ManagedList<RuntimeBeanReference>) constructorArgumentValues .getIndexedArgumentValue(0, ManagedList.class).getValue(); } // 循環解析 listener 標簽, 注入每個 queue 的處理邏輯 List<Element> childElements = DomUtils.getChildElementsByTagName(element, LISTENER_ELEMENT); for (int i = 0; i < childElements.size(); i++) { parseListener(childElements.get(i), element, parserContext, containerList); } parserContext.popAndRegisterContainingComponent(); return null; } private void parseListener(Element listenerEle, Element containerEle, ParserContext parserContext, ManagedList<RuntimeBeanReference> containerList) { RootBeanDefinition listenerDef = new RootBeanDefinition(); listenerDef.setSource(parserContext.extractSource(listenerEle)); // 獲取 business-ref 配置的 bean, 作為一個最終調用時的依據 String ref = listenerEle.getAttribute(REF_ATTRIBUTE); if (!StringUtils.hasText(ref)) { parserContext.getReaderContext().error("Listener 'ref' attribute contains empty value.", listenerEle); } else { listenerDef.getPropertyValues().add("delegate", new RuntimeBeanReference(ref)); } String method = null; if (listenerEle.hasAttribute(METHOD_ATTRIBUTE)) { method = listenerEle.getAttribute(METHOD_ATTRIBUTE); if (!StringUtils.hasText(method)) { parserContext.getReaderContext() .error("Listener 'method' attribute contains empty value.", listenerEle); } } listenerDef.getPropertyValues().add("defaultListenerMethod", method); if (containerEle.hasAttribute(MESSAGE_CONVERTER_ATTRIBUTE)) { String messageConverter = containerEle.getAttribute(MESSAGE_CONVERTER_ATTRIBUTE); if (!StringUtils.hasText(messageConverter)) { parserContext.getReaderContext().error( "Listener container 'message-converter' attribute contains empty value.", containerEle); } else { listenerDef.getPropertyValues().add("messageConverter", new RuntimeBeanReference(messageConverter)); } } //containerDef:構造SimpleMsgListenerContainer對象 BeanDefinition containerDef = RabbitNamespaceUtils.parseContainer(containerEle, parserContext); // 都使用一個 MessageListenerAdapter 進行實現消費邏輯, 具體業務通過調用 properties 的 delegate 屬性, 進行調用業務; listenerDef.setBeanClass(MessageListenerAdapter.class); //將listener放入SimpleMsgListenerContainer對象 containerDef.getPropertyValues().add("messageListener", listenerDef); String exclusive = listenerEle.getAttribute(EXCLUSIVE); if (StringUtils.hasText(exclusive)) { containerDef.getPropertyValues().add("exclusive", exclusive); } String childElementId = listenerEle.getAttribute(ID_ATTRIBUTE); String containerBeanName = StringUtils.hasText(childElementId) ? childElementId : BeanDefinitionReaderUtils.generateBeanName(containerDef, parserContext.getRegistry()); String threadNum = listenerEle.getAttribute(THREAD_NUM); if (StringUtils.hasText(threadNum)) { containerDef.getPropertyValues().add("threadNum", threadNum); } String fetchCount = listenerEle.getAttribute(FETCH_COUNT); if (StringUtils.hasText(fetchCount)) { containerDef.getPropertyValues().add("fetchCount", fetchCount); } String autoAck = listenerEle.getAttribute(AUTO_ACK); if (StringUtils.hasText(autoAck)) { containerDef.getPropertyValues().add("autoAck", autoAck); } String queues = listenerEle.getAttribute(QUEUES_ATTRIBUTE); if (StringUtils.hasText(queues)) { // String[] names = StringUtils.commaDelimitedListToStringArray(queues); // List<RuntimeBeanReference> values = new ManagedList<RuntimeBeanReference>(); // for (int i = 0; i < names.length; i++) { // values.add(new RuntimeBeanReference(names[i].trim())); // } containerDef.getPropertyValues().add("queue", new RuntimeBeanReference(queues)); } // Register the listener and fire event parserContext.registerBeanComponent(new BeanComponentDefinition(containerDef, containerBeanName)); if (containerList != null) { containerList.add(new RuntimeBeanReference(containerBeanName)); } } }
// 拒絕策略,自行實現邏輯即可
// 拒絕策略,自行實現邏輯即可 public boolean isExecuteReceive(Queue queue, String channelNumber, MessageProperties messageProperties, String message) { boolean result = false; boolean masterSwitchFlag = false; if (systemConfigCache.getZkSystemConfig() != null) { masterSwitchFlag = systemConfigCache.getZkSystemConfig().getMasterSwitch().equalsIgnoreCase("true"); } if (!masterSwitchFlag) { // 總開關不開啟 result = true; } else { // 總開關開啟 // 審核標識 boolean verifyFlag = false; // 斷開/連接標識 boolean connectionFlag = false; // 訂閱組標識 boolean subscribeGroupFlag = false; // 判斷本地xml和平台中配置是否一致 verifyFlag = isVerifyFlag(queue); if (!verifyFlag) { logger.warn("消費者配置不正確,請檢查!{}", queue.toString()); return false; } // // 判斷channel是否是連接狀態 // connectionFlag = isConnectionFlag(channelNumber); // if (!connectionFlag) { // return false; // } // // 校驗容器信息和訂閱組信息 // subscribeGroupFlag = isSubscribeGroupFlag(messageProperties, message); // if (!subscribeGroupFlag) { // return false; // } // result = masterSwitchFlag && verifyFlag && connectionFlag && subscribeGroupFlag; result = !systemConfigCache.isProdEnvironment() && verifyFlag && checkMessageRoute(messageProperties); } return result; }
使用樣例如下:
<!-- 監聽類自由實現即可,將被反射調用 --> <rmq:queue id="test_mq" name="test_mq_name" bind-exchange-type="fanout" bind-exchange-name="test_mq_bind_exchange"/> <bean id="orderProcesser" class="com.xxx.rmq.consumer.OrderConsumer" /> <!--消費者監聽容器,監聽到方法級別--> <rmq:listener-container id="myListener" message-converter="simpleMessageConverter"> <!-- 必填項 queue-ref:依賴的queue business-ref:業務處理類 method:業務方法 --> <!-- 可選項 auto-ack:自動回執,默認為true fetch-count:批處理數,默認為1 thread-num:並發數,默認為1 --> <rmq:listener queue-ref="test_mq" business-ref="testMqHandler" method="processMessage" /> </rmq:listener-container>
一句話總結: 曲線救國!