hadoop Yarn 編程API


客戶端編程庫:
所在jar包: org.apache.hadoop.yarn.client.YarnClient

使用方法:
1 定義一個YarnClient實例:
   private YarnClient client;
2 構造一個Yarn客戶端句柄並初始化
this.client = YarnClient.createYarnClient();
client.ini(conf)
3 啟動Yarn
yarnClient.start()
4 獲取一個新的application id
YarnClientApplication app=yarnClient.createApplication(); 注解:application id 封裝在YarnCLientApplication里面了。
5 構造ApplicationSubmissionContext, 用以提交作業
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
###################注解:一下會有很多set 屬性的東東“程序名稱,優先級,所在的隊列啊,###################################
ApplicationId appId = appContext.getApplicationId();
appContext.setApplicationName(appName)
......
yarnClient.submitApplication(appContext);//將應用程序提交到ResouceManager上 這里通過步驟 2 中的conf 讀取yarn-site.xml

 
          
         


ApplicationMaster編程酷

設計思路:
為用戶暴露‘回調函數’用戶只需要實現這些回調函數,當某種事情發生時,會調用對應的(用戶實現的)回調函數
回調機制:
1 定義一個回調函數
2 提供函數實現的一方在初始化的時候,將回調函數的函數指針注冊給調用者
3 當特殊條件或事件發生的時候,調用者使用函數指針調用回調函數對事件進行處理
回調機制好處:
可以把調用者和被調用者分開,調用者不關心誰是調用者。它只需知道存在一個具有特定原型和限制條件的被調用函數。




 
         
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.client.api.async;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;

import com.google.common.annotations.VisibleForTesting;

/**
 * <code>AMRMClientAsync</code> handles communication with the ResourceManager
 * and provides asynchronous updates on events such as container allocations and
 * completions.  It contains a thread that sends periodic heartbeats to the
 * ResourceManager.
 * 
 * It should be used by implementing a CallbackHandler:
 * <pre>
 * {@code
 * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
 *   public void onContainersAllocated(List<Container> containers) {
 *     [run tasks on the containers]
 *   }
 *   
 *   public void onContainersCompleted(List<ContainerStatus> statuses) {
 *     [update progress, check whether app is done]
 *   }
 *   
 *   public void onNodesUpdated(List<NodeReport> updated) {}
 *   
 *   public void onReboot() {}
 * }
 * }
 * </pre>
 * 
 * The client's lifecycle should be managed similarly to the following:
 * 
 * <pre>
 * {@code
 * AMRMClientAsync asyncClient = 
 *     createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
 * asyncClient.init(conf);
 * asyncClient.start();
 * RegisterApplicationMasterResponse response = asyncClient
 *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
 *       appMasterTrackingUrl);
 * asyncClient.addContainerRequest(containerRequest);
 * [... wait for application to complete]
 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
 * asyncClient.stop();
 * }
 * </pre>
 */
@Public
@Stable
public abstract class AMRMClientAsync<T extends ContainerRequest> 
extends AbstractService {
  
  protected final AMRMClient<T> client;
  protected final CallbackHandler handler;
  protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();

  public static <T extends ContainerRequest> AMRMClientAsync<T>
      createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
    return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
  }
  
  public static <T extends ContainerRequest> AMRMClientAsync<T>
      createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
          CallbackHandler callbackHandler) {
    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
  }
  
  protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
  }
  
  @Private
  @VisibleForTesting
  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
      CallbackHandler callbackHandler) {
    super(AMRMClientAsync.class.getName());
    this.client = client;
    this.heartbeatIntervalMs.set(intervalMs);
    this.handler = callbackHandler;
  }
    
  public void setHeartbeatInterval(int interval) {
    heartbeatIntervalMs.set(interval);
  }
  
  public abstract List<? extends Collection<T>> getMatchingRequests(
                                                   Priority priority, 
                                                   String resourceName, 
                                                   Resource capability);
  
  /**
   * Registers this application master with the resource manager. On successful
   * registration, starts the heartbeating thread.
   * @throws YarnException
   * @throws IOException
   */
  public abstract RegisterApplicationMasterResponse registerApplicationMaster(
      String appHostName, int appHostPort, String appTrackingUrl)
      throws YarnException, IOException;

  /**
   * Unregister the application master. This must be called in the end.
   * @param appStatus Success/Failure status of the master
   * @param appMessage Diagnostics message on failure
   * @param appTrackingUrl New URL to get master info
   * @throws YarnException
   * @throws IOException
   */
  public abstract void unregisterApplicationMaster(
      FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 
  throws YarnException, IOException;

  /**
   * Request containers for resources before calling <code>allocate</code>
   * @param req Resource request
   */
  public abstract void addContainerRequest(T req);

  /**
   * Remove previous container request. The previous container request may have 
   * already been sent to the ResourceManager. So even after the remove request 
   * the app must be prepared to receive an allocation for the previous request 
   * even after the remove request
   * @param req Resource request
   */
  public abstract void removeContainerRequest(T req);

  /**
   * Release containers assigned by the Resource Manager. If the app cannot use
   * the container or wants to give up the container then it can release them.
   * The app needs to make new requests for the released resource capability if
   * it still needs it. eg. it released non-local resources
   * @param containerId
   */
  public abstract void releaseAssignedContainer(ContainerId containerId);

  /**
   * Get the currently available resources in the cluster.
   * A valid value is available after a call to allocate has been made
   * @return Currently available resources
   */
  public abstract Resource getAvailableResources();

  /**
   * Get the current number of nodes in the cluster.
   * A valid values is available after a call to allocate has been made
   * @return Current number of nodes in the cluster
   */
  public abstract int getClusterNodeCount();

  public interface CallbackHandler { 注視:這里有一系列的回調函數 /**
     * Called when the ResourceManager responds to a heartbeat with completed
     * containers. If the response contains both completed containers and
     * allocated containers, this will be called before containersAllocated.
     */
    public void onContainersCompleted(List<ContainerStatus> statuses);
    
    /**
     * Called when the ResourceManager responds to a heartbeat with allocated
     * containers. If the response containers both completed containers and
     * allocated containers, this will be called after containersCompleted.
     */
    public void onContainersAllocated(List<Container> containers);
    
    /**
     * Called when the ResourceManager wants the ApplicationMaster to shutdown
     * for being out of sync etc. The ApplicationMaster should not unregister
     * with the RM unless the ApplicationMaster wants to be the last attempt.
     */
    public void onShutdownRequest();
    
    /**
     * Called when nodes tracked by the ResourceManager have changed in health,
     * availability etc.
     */
    public void onNodesUpdated(List<NodeReport> updatedNodes);
    
    public float getProgress();
    
    /**
     * Called when error comes from RM communications as well as from errors in
     * the callback itself from the app. Calling
     * stop() is the recommended action.
     *
     * @param e
     */
    public void onError(Throwable e);
  }
}

用戶實現一個MyCallbackHandler,實現AMRMClientAsync.CallbackHandler接口:

class MyCallbackHandler implements AMRMClientAsync.CallbackHandler{

.....................

}

ApplicationMaster編程---AM 與 RM交互
引入jar包, org.apche.hadoop.yarn.client.api.async.AMRMClientAsync;
流程:
1 構造一個MyCallbackHandler對象
AMRMClientAsync.CallbackHandler allocListener = new MyCallbackHandler();
2 構造一個AMRMClientAsync句柄
asyncClient = AMRMClientAsync.createAMRMClientAsync(1000, allcoListener);
3 初始化並啟動AMRMClientAsync
asyncClient.init(conf);//通過傳入一個YarnConfiguration對象並進行初始化
asyncClient.start(); //啟動asyncClient


4 ApplicationMaster向ResourceManager注冊
RegisterApplicationMasterResponse reponse = asyncClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
5 添加Container請求
asyncClient.addContainerRequest(containerRequest)
6 等待應用程序運行結束
asyncClient.unregisterApplicationMaster(status, appMsg, null); [反注冊]
asyncCLient.stop()



/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.client.api.async;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.google.common.annotations.VisibleForTesting;

/**
 * <code>NMClientAsync</code> handles communication with all the NodeManagers
 * and provides asynchronous updates on getting responses from them. It
 * maintains a thread pool to communicate with individual NMs where a number of
 * worker threads process requests to NMs by using {@link NMClientImpl}. The max
 * size of the thread pool is configurable through
 * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
 *
 * It should be used in conjunction with a CallbackHandler. For example
 *
 * <pre>
 * {@code
 * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
 *   public void onContainerStarted(ContainerId containerId,
 *       Map<String, ByteBuffer> allServiceResponse) {
 *     [post process after the container is started, process the response]
 *   }
 *
 *   public void onContainerStatusReceived(ContainerId containerId,
 *       ContainerStatus containerStatus) {
 *     [make use of the status of the container]
 *   }
 *
 *   public void onContainerStopped(ContainerId containerId) {
 *     [post process after the container is stopped]
 *   }
 *
 *   public void onStartContainerError(
 *       ContainerId containerId, Throwable t) {
 *     [handle the raised exception]
 *   }
 *
 *   public void onGetContainerStatusError(
 *       ContainerId containerId, Throwable t) {
 *     [handle the raised exception]
 *   }
 *
 *   public void onStopContainerError(
 *       ContainerId containerId, Throwable t) {
 *     [handle the raised exception]
 *   }
 * }
 * }
 * </pre>
 *
 * The client's life-cycle should be managed like the following:
 *
 * <pre>
 * {@code
 * NMClientAsync asyncClient = 
 *     NMClientAsync.createNMClientAsync(new MyCallbackhandler());
 * asyncClient.init(conf);
 * asyncClient.start();
 * asyncClient.startContainer(container, containerLaunchContext);
 * [... wait for container being started]
 * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
 *     container.getContainerToken());
 * [... handle the status in the callback instance]
 * asyncClient.stopContainer(container.getId(), container.getNodeId(),
 *     container.getContainerToken());
 * [... wait for container being stopped]
 * asyncClient.stop();
 * }
 * </pre>
 */
@Public
@Stable
public abstract class NMClientAsync extends AbstractService {

  protected NMClient client;
  protected CallbackHandler callbackHandler;

  public static NMClientAsync createNMClientAsync(
      CallbackHandler callbackHandler) {
    return new NMClientAsyncImpl(callbackHandler);
  }
  
  protected NMClientAsync(CallbackHandler callbackHandler) {
    this (NMClientAsync.class.getName(), callbackHandler);
  }

  protected NMClientAsync(String name, CallbackHandler callbackHandler) {
    this (name, new NMClientImpl(), callbackHandler);
  }

  @Private
  @VisibleForTesting
  protected NMClientAsync(String name, NMClient client,
      CallbackHandler callbackHandler) {
    super(name);
    this.setClient(client);
    this.setCallbackHandler(callbackHandler);
  }

  public abstract void startContainerAsync(
      Container container, ContainerLaunchContext containerLaunchContext);

  public abstract void stopContainerAsync(
      ContainerId containerId, NodeId nodeId);

  public abstract void getContainerStatusAsync(
      ContainerId containerId, NodeId nodeId);
  
  public NMClient getClient() {
    return client;
  }

  public void setClient(NMClient client) {
    this.client = client;
  }

  public CallbackHandler getCallbackHandler() {
    return callbackHandler;
  }

  public void setCallbackHandler(CallbackHandler callbackHandler) {
    this.callbackHandler = callbackHandler;
  }

  /**
   * <p>
   * The callback interface needs to be implemented by {@link NMClientAsync}
   * users. The APIs are called when responses from <code>NodeManager</code> are
   * available.
   * </p>
   *
   * <p>
   * Once a callback happens, the users can chose to act on it in blocking or
   * non-blocking manner. If the action on callback is done in a blocking
   * manner, some of the threads performing requests on NodeManagers may get
   * blocked depending on how many threads in the pool are busy.
   * </p>
   *
   * <p>
   * The implementation of the callback function should not throw the
   * unexpected exception. Otherwise, {@link NMClientAsync} will just
   * catch, log and then ignore it.
   * </p>
   */
  public static interface CallbackHandler {
    /**
     * The API is called when <code>NodeManager</code> responds to indicate its
     * acceptance of the starting container request
     * @param containerId the Id of the container
     * @param allServiceResponse a Map between the auxiliary service names and
     *                           their outputs
     */
    void onContainerStarted(ContainerId containerId,
        Map<String, ByteBuffer> allServiceResponse);

    /**
     * The API is called when <code>NodeManager</code> responds with the status
     * of the container
     * @param containerId the Id of the container
     * @param containerStatus the status of the container
     */
    void onContainerStatusReceived(ContainerId containerId,
        ContainerStatus containerStatus);

    /**
     * The API is called when <code>NodeManager</code> responds to indicate the
     * container is stopped.
     * @param containerId the Id of the container
     */
    void onContainerStopped(ContainerId containerId);

    /**
     * The API is called when an exception is raised in the process of
     * starting a container
     *
     * @param containerId the Id of the container
     * @param t the raised exception
     */
    void onStartContainerError(ContainerId containerId, Throwable t);

    /**
     * The API is called when an exception is raised in the process of
     * querying the status of a container
     *
     * @param containerId the Id of the container
     * @param t the raised exception
     */
    void onGetContainerStatusError(ContainerId containerId, Throwable t);

    /**
     * The API is called when an exception is raised in the process of
     * stopping a container
     *
     * @param containerId the Id of the container
     * @param t the raised exception
     */
    void onStopContainerError(ContainerId containerId, Throwable t);

  }

}


ApplicationMaster編程酷,AM與NM交互酷
用戶實現一個MyCallbackHandler,實現NMClientAsync.CallbackHandler接口

class MyCallbackHandler implements NMClientAsync.CallbackHandler{
............
}


引入jar包:
org.apache.hadoop.yarn.client.api.async.NMClientAsync;
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl

流程:
1 構造一個NMClientAsync句柄
NMClientAsync asyncClient = new NMClientAsyncImpl(new MyCallbackhandler())
2 初始化並啟動 NMClientAsync
asyncClient.init(conf);//初始化ansyClient
asyncClient.start(); //啟動asyncClient
3 構造Container
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
...//設置ctx變量
4 啟動Container
asyncClient.startContainerAsync(container, ctx);
5 獲取container狀態
asyncClient.getContainerStatusAsync(container.getId(), container.getNodeId(), container.getContainerToken());
6 停止Container
asyncClient.stopContainerAsync(container.getId(), container.getNodeId(), container.getContainerToken());
asyncClient.stop()





 
          

 














 



 











 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM