什么是ACL?
簡稱訪問控制列表,涉及到用戶,資源,權限,角色。
- 用戶
用戶是訪問控制的基礎要素,也不難理解,RocketMQ ACL必然也會引入用戶的概念,即支持用戶名、密碼。
- 資源
資源,需要保護的對象,在RocketMQ中,消息發送涉及的Topic、消息消費涉及的消費組,應該進行保護,故可以抽象成資源。
- 權限
針對資源,能進行的操作,
- 角色
RocketMQ中,只定義兩種角色:是否是管理員。
acl配置文件
空
表示不設置白名單,該條規則默認返回false。
“*”
表示全部匹配,該條規則直接返回true,將會阻斷其他規則的判斷,請慎重使用。
192.168.0.{100,101}
多地址配置模式,ip地址的最后一組,使用{},大括號中多個ip地址,用英文逗號(,)隔開。
192.168.1.100,192.168.2.100
直接使用,分隔,配置多個ip地址。
192.168.*.或192.168.100-200.10-20
每個IP段使用 "" 或"-"表示范圍。
globalWhiteRemoteAddresses: - 10.10.103.* - 192.168.0.* accounts: - accessKey: RocketMQ 登錄用戶名 secretKey: 12345678 登錄密碼 whiteRemoteAddress: 192.168.0.* 用戶級別的IP地址白名單 admin: false defaultTopicPerm: DENY DENY(拒絕)。 defaultGroupPerm: SUB 默認消費組權限,該值默認為DENY(拒絕),建議值為SUB。 topicPerms: - topicA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=SUB - groupC=SUB - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192.168.1.* # if it is admin, it could access all resources admin: true
管理員權限:
UPDATE_AND_CREATE_TOPIC
更新或創建主題。
UPDATE_BROKER_CONFIG
更新Broker配置。
DELETE_TOPIC_IN_BROKER
刪除主題。
UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
更新或創建訂閱組信息。
DELETE_SUBSCRIPTIONGROUP
刪除訂閱組信息。
源碼分析
broker端
ACL配置在broker端,在BrokerController的initialAcl方法中,如下:
private void initialAcl() { // 判斷有沒有開啟ACL權限控制 if (!this.brokerConfig.isAclEnable()) { log.info("The broker dose not enable acl"); return; } // 加載權限驗證器 采用SPI的機制,在文件 "META-INF/service/org.apache.rocketmq.acl.AccessValidator"中指定了具體的驗證實現 List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); if (accessValidators == null || accessValidators.isEmpty()) { log.info("The broker dose not load the AccessValidator"); return; } for (AccessValidator accessValidator: accessValidators) { final AccessValidator validator = accessValidator; accessValidatorMap.put(validator.getClass(),validator); // 根據每一個驗證器創建一個RPCHook,在收到請求的時候進行驗證 this.registerServerRPCHook(new RPCHook() { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { //Do not catch the exception validator.validate(validator.parse(request, remoteAddr)); } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { } }); } }
那么根據文件中的實現定義,使用類加載器去加載
/** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to * You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package org.apache.rocketmq.broker.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; public class ServiceProvider { private final static Logger LOG = LoggerFactory .getLogger(ServiceProvider.class); /** * A reference to the classloader that loaded this class. It's more efficient to compute it once and cache it here. */ private static ClassLoader thisClassLoader; /** * JDK1.3+ <a href= "http://java.sun.com/j2se/1.3/docs/guide/jar/jar.html#Service%20Provider" > 'Service Provider' specification</a>. */ public static final String TRANSACTION_SERVICE_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService"; public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener"; public static final String RPC_HOOK_ID = "META-INF/service/org.apache.rocketmq.remoting.RPCHook"; public static final String ACL_VALIDATOR_ID = "META-INF/service/org.apache.rocketmq.acl.AccessValidator"; static { thisClassLoader = getClassLoader(ServiceProvider.class); } /** * Returns a string that uniquely identifies the specified object, including its class. * <p> * The returned string is of form "classname@hashcode", ie is the same as the return value of the Object.toString() method, but works even when the specified object's class has overidden the toString method. * * @param o may be null. * @return a string of form classname@hashcode, or "null" if param o is null. */ protected static String objectId(Object o) { if (o == null) { return "null"; } else { return o.getClass().getName() + "@" + System.identityHashCode(o); } } protected static ClassLoader getClassLoader(Class<?> clazz) { try { return clazz.getClassLoader(); } catch (SecurityException e) { LOG.error("Unable to get classloader for class {} due to security restrictions !", clazz, e.getMessage()); throw e; } } protected static ClassLoader getContextClassLoader() { ClassLoader classLoader = null; try { classLoader = Thread.currentThread().getContextClassLoader(); } catch (SecurityException ex) { /** * The getContextClassLoader() method throws SecurityException when the context * class loader isn't an ancestor of the calling class's class * loader, or if security permissions are restricted. */ } return classLoader; } protected static InputStream getResourceAsStream(ClassLoader loader, String name) { if (loader != null) { return loader.getResourceAsStream(name); } else { return ClassLoader.getSystemResourceAsStream(name); } } public static <T> List<T> load(String name, Class<?> clazz) { LOG.info("Looking for a resource file of name [{}] ...", name); List<T> services = new ArrayList<T>(); try { ArrayList<String> names = new ArrayList<String>(); final InputStream is = getResourceAsStream(getContextClassLoader(), name); if (is != null) { BufferedReader reader; try { reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); } catch (java.io.UnsupportedEncodingException e) { reader = new BufferedReader(new InputStreamReader(is)); } String serviceName = reader.readLine(); while (serviceName != null && !"".equals(serviceName)) { LOG.info( "Creating an instance as specified by file {} which was present in the path of the context classloader.", name); if (!names.contains(serviceName)) { names.add(serviceName); } services.add((T)initService(getContextClassLoader(), serviceName, clazz)); serviceName = reader.readLine(); } reader.close(); } else { // is == null LOG.warn("No resource file with name [{}] found.", name); } } catch (Exception e) { LOG.error("Error occured when looking for resource file " + name, e); } return services; } public static <T> T loadClass(String name, Class<?> clazz) { final InputStream is = getResourceAsStream(getContextClassLoader(), name); if (is != null) { BufferedReader reader; try { try { reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); } catch (java.io.UnsupportedEncodingException e) { reader = new BufferedReader(new InputStreamReader(is)); } String serviceName = reader.readLine(); reader.close(); if (serviceName != null && !"".equals(serviceName)) { return initService(getContextClassLoader(), serviceName, clazz); } else { LOG.warn("ServiceName is empty!"); return null; } } catch (Exception e) { LOG.warn("Error occurred when looking for resource file " + name, e); } } return null; } protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) { Class<?> serviceClazz = null; try { if (classLoader != null) { try { // Warning: must typecast here & allow exception to be generated/caught & recast properly serviceClazz = classLoader.loadClass(serviceName); // clazz 是否是 serviceClazz的父類 if (clazz.isAssignableFrom(serviceClazz)) { LOG.info("Loaded class {} from classloader {}", serviceClazz.getName(), objectId(classLoader)); } else { // This indicates a problem with the ClassLoader tree. An incompatible ClassLoader was used to load the implementation. LOG.error( "Class {} loaded from classloader {} does not extend {} as loaded by this classloader.", new Object[] {serviceClazz.getName(), objectId(serviceClazz.getClassLoader()), clazz.getName()}); } // 實例化 return (T)serviceClazz.newInstance(); } catch (ClassNotFoundException ex) { if (classLoader == thisClassLoader) { // Nothing more to try, onwards. LOG.warn("Unable to locate any class {} via classloader", serviceName, objectId(classLoader)); throw ex; } // Ignore exception, continue } catch (NoClassDefFoundError e) { if (classLoader == thisClassLoader) { // Nothing more to try, onwards. LOG.warn( "Class {} cannot be loaded via classloader {}.it depends on some other class that cannot be found.", serviceClazz, objectId(classLoader)); throw e; } // Ignore exception, continue } } } catch (Exception e) { LOG.error("Unable to init service.", e); } return (T)serviceClazz; } }
接下來看一下org.apache.rocketmq.acl.plain.PlainAccessValidator的parse方法和validate方法。
在初始化PlainAccessValidator的時候 會初始化 aclPlugEngine = new PlainPermissionManager(); 會去加載broker上面的acl yaml配置,已經監聽配置變化。
@Override public AccessResource parse(RemotingCommand request, String remoteAddr) { // 封裝了當前請求需要哪些資源,哪些權限 PlainAccessResource accessResource = new PlainAccessResource(); if (remoteAddr != null && remoteAddr.contains(":")) { accessResource.setWhiteRemoteAddress(remoteAddr.substring(0, remoteAddr.lastIndexOf(':'))); } else { accessResource.setWhiteRemoteAddress(remoteAddr); } accessResource.setRequestCode(request.getCode()); if (request.getExtFields() == null) { // If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern) // The following logic codes depend on the request's extFields not to be null. return accessResource; } accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY)); accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE)); accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN)); try { switch (request.getCode()) { case RequestCode.SEND_MESSAGE: // 寫消息 需要對topic有PUB權限 accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); break; case RequestCode.SEND_MESSAGE_V2: accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB); break; case RequestCode.CONSUMER_SEND_MSG_BACK: accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); break; case RequestCode.PULL_MESSAGE: // 拉消息需要對topic有SUB權限 accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB); break; case RequestCode.QUERY_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); break; case RequestCode.HEART_BEAT: HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB); for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) { accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB); } } break; case RequestCode.UNREGISTER_CLIENT: final UnregisterClientRequestHeader unregisterClientRequestHeader = (UnregisterClientRequestHeader) request .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.GET_CONSUMER_LIST_BY_GROUP: final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.UPDATE_CONSUMER_OFFSET: final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB); accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB); break; default: break; } } catch (Throwable t) { throw new AclException(t.getMessage(), t); } // Content SortedMap<String, String> map = new TreeMap<String, String>(); for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) { if (!SessionCredentials.SIGNATURE.equals(entry.getKey()) && !MixAll.UNIQUE_MSG_QUERY_FLAG.equals(entry.getKey())) { map.put(entry.getKey(), entry.getValue()); } } // 將extField中的數據和body組合在一起 accessResource.setContent(AclUtils.combineRequestContent(request, map)); return accessResource; }
public void validate(PlainAccessResource plainAccessResource) { // Check the global white remote addr // 全局地址白名單檢驗 for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { if (remoteAddressStrategy.match(plainAccessResource)) { return; } } //accessKey檢驗 if (plainAccessResource.getAccessKey() == null) { throw new AclException(String.format("No accessKey is configured")); } if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); } // Check the white addr for accesskey //獲取配置的ACL規則 PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); // 匹配remote 地址 if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { return; } // Check the signature 簽名校驗 String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) { throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); } // Check perm of each resource // 校驗各個權限 checkPerm(plainAccessResource, ownedAccess); }
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) { if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); } Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap(); Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap(); if (needCheckedPermMap == null) { // If the needCheckedPermMap is null,then return return; } if (ownedPermMap == null && ownedAccess.isAdmin()) { // If the ownedPermMap is null and it is an admin user, then return return; } for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) { String resource = needCheckedEntry.getKey(); Byte neededPerm = needCheckedEntry.getValue(); boolean isGroup = PlainAccessResource.isRetryTopic(resource); if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) { // Check the default perm byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm(); if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } continue; } if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } } }
如果權限不通過,則拋出異常
