介紹
Aapche Ranger是以插件的形式集成到HDFS中,由Ranger Admin管理訪問策略,Ranger插件定期輪詢Admin更新策略到本地,並根據策略信息進行用戶訪問權限的判定。其中提供管理員管理策略、插件的Ranger web和Ranger Plugin,與Admin之間的通信是基於HTTP的RESTful架構。Ranger集成HDFS的架構圖如下:
Ranger對HDFS訪問控制的實現原理
HDFS本身是有訪問控制機制的,即在身份認證機制之后通過查詢ACLs來對用戶的權限檢查,該權限檢查的實現代碼是INodeAttributeProvider抽象類中接口AccessControlEnforcer的checkPermission方法。Ranger將HDFS NameNode 的Inode attribute的提供類即INodeAttributeProvider抽象類修改為Ranger自己寫的類,該類繼承了INodeAttributeProvider抽象類。即在hdfs-site.xml文件中修改如下配置項。
<name>dfs.namenode.inode.attributes.provider.class</name> <value>org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer</value>
Ranger 插件的初始化過程
動態加載類
在Namenode啟動過程中編譯RangerHdfsAuthorizerorizer類(包名為ranger-hdfs-plugin-shim),將RangerHdfs的相關類動態加載進虛擬機,並實例化具體實現類RangerHdfsAuthorizerorizer(包名為ranger-hdfs-plugin)。
1 rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass()); 2 @SuppressWarnings("unchecked") 3 Class<INodeAttributeProvider> cls = (Class<INodeAttributeProvider>) Class.forName(RANGER_HDFS_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader); 4 activatePluginClassLoader(); 5 rangerHdfsAuthorizerImpl = cls.newInstance();
插件初始化
初始化RangerPlugin,如上面的類圖可知,RangerHdfsPlugin是RangerBasePlugin類的子類,其具體的初始化是由父類的初始化方法來實現的。該方法主要完成了以下幾個功能:
(1)調用cleanup()方法,主要完成清空了refresher、serviceName、policyEngine這三個變量的值。
(2)讀取配置文件,並設置以下變量的初始值。
- serviceType:Ranger提供訪問控制服務的類型。
- serviceName:Ranger提供訪問控制服務的名稱。
- appId:由Ranger提供服務的組件ID。
- propertyPrefix:Ranger插件的屬性前綴。
- pollingIntervalMs:刷新器定期更新策略的輪詢間隔時間。Ranger 插件會定期從Ranger Admin拉取新的策略信息,並保存在Hdfs緩存中。
- cacheDir:從Ranger Admin拉取策略到Hdfs插件的臨時存放目錄。
(3)設置PangerPolicyEngineOptions類的成員變量值。
- evaluatorType:評估器的類型。在Ranger對Hdfs的訪問權限的鑒權階段需要策略評估器根據策略判斷是否具有訪問權限。
- cacheAuditResults:是否緩存審計。
- disableContextEnrichers:是否使用上下文增強器。
- disableCustomConditions:是否使用自定義條件。在Ranger0.5版本之后加入上下文增強器和用戶自定義條件這樣的“鈎子”函數以增加授權策略的可擴展性。
- disableTagPolicyEvaluation:是否使用基於標簽的策略評估。在Ranger0.6版本以后,Ranger不僅僅支持基於資源的策略,還支持基於標簽的策略,該策略的優點是資源分類與訪問授權的分離,標記的單個授權策略可用於授權跨各種Hadoop組件訪問資源。
(4)調用createAdminClient(),創建RangerAdmin與RangerPlugin通信的客戶端。這里使用的基於RESTful的通信風格,所以創建RangerAdminClient類的實例對象。
(5)創建PolicyRefresher類的對象,調用startRefresher()開啟策略刷新器,根據輪詢間隔時間定期從Ranger Admin 拉取更新的策略。
策略更新
Ranger插件更新策略的流程圖如下:
這部分主要講Ranger插件如何通過調用Ranger定義好的API獲取策略。這里使用了jersey來構建RESTful服務。
(1)Jersey客戶端的實現
RangerAdminRESTClient # getServicePoliciesIfUpdated()
1 if (isSecureMode) { 2 PrivilegedAction<ClientResponse> action = new PrivilegedAction<ClientResponse>() { 3 public ClientResponse run() { 4 // 創建WebResource實例,並根據URI和查詢參數(lastKnownVersion、pluginId)構建URL 5 WebResource secureWebResource = createWebResource(RangerRESTUtils.REST_URL_POLICY_GET_FOR_SECURE_SERVICE_IF_UPDATED + serviceName) 6 .queryParam(RangerRESTUtils.REST_PARAM_LAST_KNOWN_POLICY_VERSION, Long.toString(lastKnownVersion)) 7 .queryParam(RangerRESTUtils.REST_PARAM_PLUGIN_ID, pluginId); 8 // 發送GET請求,並且mime類型為Json 9 return secureWebResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class); 10 }; 11 }; 12 response = user.doAs(action); 13 }else{ 14 WebResource webResource = createWebResource(RangerRESTUtils.REST_URL_POLICY_GET_FOR_SERVICE_IF_UPDATED + serviceName) 15 .queryParam(RangerRESTUtils.REST_PARAM_LAST_KNOWN_POLICY_VERSION, Long.toString(lastKnownVersion)) 16 .queryParam(RangerRESTUtils.REST_PARAM_PLUGIN_ID, pluginId); 17 response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class); 18 } 19 if(response != null && response.getStatus() == 200) { //200:請求成功 20 ret = response.getEntity(ServicePolicies.class); 21 } else if(response != null && response.getStatus() == 304) { // 304:未修正 22 // no change 23 }
在RangerRESTUtils類中定義了參數的值:
public static final String REST_URL_POLICY_GET_FOR_SERVICE_IF_UPDATED = "/service/plugins/policies/download/"; public static final String SERVICE_NAME_PARAM = "serviceName"; public static final String LAST_KNOWN_TAG_VERSION_PARAM = "lastKnownVersion"; public static final String REST_MIME_TYPE_JSON = "application/json" ;
(2)服務端的實現
ServiceREST # getServicePolicesIfUpdate()
1 @GET 2 @Path("/policies/download/{serviceName}") 3 @Produces({ "application/json", "application/xml" }) 4 public ServicePolicies getServicePoliciesIfUpdated(@PathParam("serviceName") String serviceName, @QueryParam("lastKnownVersion") Long lastKnownVersion, @QueryParam("pluginId") String pluginId, @Context HttpServletRequest request) throws Exception { 5 ServicePolicies ret = null; 6 int httpCode = HttpServletResponse.SC_OK; 7 String logMsg = null; 8 RangerPerfTracer perf = null; 9 10 if (serviceUtil.isValidateHttpsAuthentication(serviceName, request)) { 11 if(lastKnownVersion == null) { 12 lastKnownVersion = Long.valueOf(-1); 13 } 14 try { 15 ServicePolicies servicePolicies = svcStore.getServicePoliciesIfUpdated(serviceName, lastKnownVersion); // 從數據庫獲取更新策略的過程 16 if(servicePolicies == null) { 17 httpCode = HttpServletResponse.SC_NOT_MODIFIED; 18 logMsg = "No change since last update"; 19 } else { 20 ret = filterServicePolicies(servicePolicies); 21 httpCode = HttpServletResponse.SC_OK; 22 logMsg = "Returning " + (ret.getPolicies() != null ? ret.getPolicies().size() : 0) + " policies. Policy version=" + ret.getPolicyVersion(); 23 } 24 } 25 } 26 return ret; 27 }