客戶端編程庫: 所在的類: org.apache.hadoop.yarn.client.api.YarnClient 使用方法: 1 定義一個YarnClient實例: private YarnClient yarnClient; 2 構造一個Yarn客戶端句柄並初始化
this.yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
3 啟動Yarn客戶端
yarnClient.start()
4 獲取一個新的application id
YarnClientApplication app=yarnClient.createApplication(); 注解:application id 封裝在YarnClientApplication里面了。
5 構造ApplicationSubmissionContext, 用以提交作業
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
###################注解:以下會有很多set 屬性的操作,例如“程序名稱、優先級、所在的隊列”等###################################
ApplicationId appId = appContext.getApplicationId();
appContext.setApplicationName(appName)
......
yarnClient.submitApplication(appContext);//將應用程序提交到ResourceManager上 這里通過步驟 2 中的conf 讀取yarn-site.xml



ApplicationMaster編程庫
設計思路:
為用戶暴露‘回調函數’用戶只需要實現這些回調函數,當某種事情發生時,會調用對應的(用戶實現的)回調函數
回調機制:
1 定義一個回調函數
2 提供函數實現的一方在初始化的時候,將回調函數的函數指針注冊給調用者
3 當特殊條件或事件發生的時候,調用者使用函數指針調用回調函數對事件進行處理
回調機制好處:
可以把調用者和被調用者分開,調用者不關心誰是被調用者。它只需知道存在一個具有特定原型和限制條件的被調用函數。

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.yarn.client.api.async; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import 
org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.annotations.VisibleForTesting; /** * <code>AMRMClientAsync</code> handles communication with the ResourceManager * and provides asynchronous updates on events such as container allocations and * completions. It contains a thread that sends periodic heartbeats to the * ResourceManager. * * It should be used by implementing a CallbackHandler: * <pre> * {@code * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler { * public void onContainersAllocated(List<Container> containers) { * [run tasks on the containers] * } * * public void onContainersCompleted(List<ContainerStatus> statuses) { * [update progress, check whether app is done] * } * * public void onNodesUpdated(List<NodeReport> updated) {} * * public void onReboot() {} * } * } * </pre> * * The client's lifecycle should be managed similarly to the following: * * <pre> * {@code * AMRMClientAsync asyncClient = * createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler()); * asyncClient.init(conf); * asyncClient.start(); * RegisterApplicationMasterResponse response = asyncClient * .registerApplicationMaster(appMasterHostname, appMasterRpcPort, * appMasterTrackingUrl); * asyncClient.addContainerRequest(containerRequest); * [... 
wait for application to complete] * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl); * asyncClient.stop(); * } * </pre> */ @Public @Stable public abstract class AMRMClientAsync<T extends ContainerRequest> extends AbstractService { protected final AMRMClient<T> client; protected final CallbackHandler handler; protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger(); public static <T extends ContainerRequest> AMRMClientAsync<T> createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler); } public static <T extends ContainerRequest> AMRMClientAsync<T> createAMRMClientAsync(AMRMClient<T> client, int intervalMs, CallbackHandler callbackHandler) { return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler); } protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { this(new AMRMClientImpl<T>(), intervalMs, callbackHandler); } @Private @VisibleForTesting protected AMRMClientAsync(AMRMClient<T> client, int intervalMs, CallbackHandler callbackHandler) { super(AMRMClientAsync.class.getName()); this.client = client; this.heartbeatIntervalMs.set(intervalMs); this.handler = callbackHandler; } public void setHeartbeatInterval(int interval) { heartbeatIntervalMs.set(interval); } public abstract List<? extends Collection<T>> getMatchingRequests( Priority priority, String resourceName, Resource capability); /** * Registers this application master with the resource manager. On successful * registration, starts the heartbeating thread. * @throws YarnException * @throws IOException */ public abstract RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException; /** * Unregister the application master. This must be called in the end. 
* @param appStatus Success/Failure status of the master * @param appMessage Diagnostics message on failure * @param appTrackingUrl New URL to get master info * @throws YarnException * @throws IOException */ public abstract void unregisterApplicationMaster( FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, IOException; /** * Request containers for resources before calling <code>allocate</code> * @param req Resource request */ public abstract void addContainerRequest(T req); /** * Remove previous container request. The previous container request may have * already been sent to the ResourceManager. So even after the remove request * the app must be prepared to receive an allocation for the previous request * even after the remove request * @param req Resource request */ public abstract void removeContainerRequest(T req); /** * Release containers assigned by the Resource Manager. If the app cannot use * the container or wants to give up the container then it can release them. * The app needs to make new requests for the released resource capability if * it still needs it. eg. it released non-local resources * @param containerId */ public abstract void releaseAssignedContainer(ContainerId containerId); /** * Get the currently available resources in the cluster. * A valid value is available after a call to allocate has been made * @return Currently available resources */ public abstract Resource getAvailableResources(); /** * Get the current number of nodes in the cluster. * A valid values is available after a call to allocate has been made * @return Current number of nodes in the cluster */ public abstract int getClusterNodeCount(); public interface CallbackHandler { 注視:這里有一系列的回調函數 /** * Called when the ResourceManager responds to a heartbeat with completed * containers. If the response contains both completed containers and * allocated containers, this will be called before containersAllocated. 
*/ public void onContainersCompleted(List<ContainerStatus> statuses); /** * Called when the ResourceManager responds to a heartbeat with allocated * containers. If the response containers both completed containers and * allocated containers, this will be called after containersCompleted. */ public void onContainersAllocated(List<Container> containers); /** * Called when the ResourceManager wants the ApplicationMaster to shutdown * for being out of sync etc. The ApplicationMaster should not unregister * with the RM unless the ApplicationMaster wants to be the last attempt. */ public void onShutdownRequest(); /** * Called when nodes tracked by the ResourceManager have changed in health, * availability etc. */ public void onNodesUpdated(List<NodeReport> updatedNodes); public float getProgress(); /** * Called when error comes from RM communications as well as from errors in * the callback itself from the app. Calling * stop() is the recommended action. * * @param e */ public void onError(Throwable e); } }
用戶實現一個MyCallbackHandler,實現AMRMClientAsync.CallbackHandler接口:
class MyCallbackHandler implements AMRMClientAsync.CallbackHandler{
.....................
}
ApplicationMaster編程---AM 與 RM交互
引入jar包, org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
流程:
1 構造一個MyCallbackHandler對象
AMRMClientAsync.CallbackHandler allocListener = new MyCallbackHandler();
2 構造一個AMRMClientAsync句柄
asyncClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
3 初始化並啟動AMRMClientAsync
asyncClient.init(conf);//通過傳入一個YarnConfiguration對象並進行初始化
asyncClient.start(); //啟動asyncClient
4 ApplicationMaster向ResourceManager注冊
RegisterApplicationMasterResponse response = asyncClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
5 添加Container請求
asyncClient.addContainerRequest(containerRequest)
6 等待應用程序運行結束
asyncClient.unregisterApplicationMaster(status, appMsg, null); [反注冊]
asyncClient.stop()

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.yarn.client.api.async; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.annotations.VisibleForTesting; /** * <code>NMClientAsync</code> handles communication with all the NodeManagers * and provides asynchronous updates on getting responses from them. 
It * maintains a thread pool to communicate with individual NMs where a number of * worker threads process requests to NMs by using {@link NMClientImpl}. The max * size of the thread pool is configurable through * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}. * * It should be used in conjunction with a CallbackHandler. For example * * <pre> * {@code * class MyCallbackHandler implements NMClientAsync.CallbackHandler { * public void onContainerStarted(ContainerId containerId, * Map<String, ByteBuffer> allServiceResponse) { * [post process after the container is started, process the response] * } * * public void onContainerStatusReceived(ContainerId containerId, * ContainerStatus containerStatus) { * [make use of the status of the container] * } * * public void onContainerStopped(ContainerId containerId) { * [post process after the container is stopped] * } * * public void onStartContainerError( * ContainerId containerId, Throwable t) { * [handle the raised exception] * } * * public void onGetContainerStatusError( * ContainerId containerId, Throwable t) { * [handle the raised exception] * } * * public void onStopContainerError( * ContainerId containerId, Throwable t) { * [handle the raised exception] * } * } * } * </pre> * * The client's life-cycle should be managed like the following: * * <pre> * {@code * NMClientAsync asyncClient = * NMClientAsync.createNMClientAsync(new MyCallbackhandler()); * asyncClient.init(conf); * asyncClient.start(); * asyncClient.startContainer(container, containerLaunchContext); * [... wait for container being started] * asyncClient.getContainerStatus(container.getId(), container.getNodeId(), * container.getContainerToken()); * [... handle the status in the callback instance] * asyncClient.stopContainer(container.getId(), container.getNodeId(), * container.getContainerToken()); * [... 
wait for container being stopped] * asyncClient.stop(); * } * </pre> */ @Public @Stable public abstract class NMClientAsync extends AbstractService { protected NMClient client; protected CallbackHandler callbackHandler; public static NMClientAsync createNMClientAsync( CallbackHandler callbackHandler) { return new NMClientAsyncImpl(callbackHandler); } protected NMClientAsync(CallbackHandler callbackHandler) { this (NMClientAsync.class.getName(), callbackHandler); } protected NMClientAsync(String name, CallbackHandler callbackHandler) { this (name, new NMClientImpl(), callbackHandler); } @Private @VisibleForTesting protected NMClientAsync(String name, NMClient client, CallbackHandler callbackHandler) { super(name); this.setClient(client); this.setCallbackHandler(callbackHandler); } public abstract void startContainerAsync( Container container, ContainerLaunchContext containerLaunchContext); public abstract void stopContainerAsync( ContainerId containerId, NodeId nodeId); public abstract void getContainerStatusAsync( ContainerId containerId, NodeId nodeId); public NMClient getClient() { return client; } public void setClient(NMClient client) { this.client = client; } public CallbackHandler getCallbackHandler() { return callbackHandler; } public void setCallbackHandler(CallbackHandler callbackHandler) { this.callbackHandler = callbackHandler; } /** * <p> * The callback interface needs to be implemented by {@link NMClientAsync} * users. The APIs are called when responses from <code>NodeManager</code> are * available. * </p> * * <p> * Once a callback happens, the users can chose to act on it in blocking or * non-blocking manner. If the action on callback is done in a blocking * manner, some of the threads performing requests on NodeManagers may get * blocked depending on how many threads in the pool are busy. * </p> * * <p> * The implementation of the callback function should not throw the * unexpected exception. 
Otherwise, {@link NMClientAsync} will just * catch, log and then ignore it. * </p> */ public static interface CallbackHandler { /** * The API is called when <code>NodeManager</code> responds to indicate its * acceptance of the starting container request * @param containerId the Id of the container * @param allServiceResponse a Map between the auxiliary service names and * their outputs */ void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse); /** * The API is called when <code>NodeManager</code> responds with the status * of the container * @param containerId the Id of the container * @param containerStatus the status of the container */ void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus); /** * The API is called when <code>NodeManager</code> responds to indicate the * container is stopped. * @param containerId the Id of the container */ void onContainerStopped(ContainerId containerId); /** * The API is called when an exception is raised in the process of * starting a container * * @param containerId the Id of the container * @param t the raised exception */ void onStartContainerError(ContainerId containerId, Throwable t); /** * The API is called when an exception is raised in the process of * querying the status of a container * * @param containerId the Id of the container * @param t the raised exception */ void onGetContainerStatusError(ContainerId containerId, Throwable t); /** * The API is called when an exception is raised in the process of * stopping a container * * @param containerId the Id of the container * @param t the raised exception */ void onStopContainerError(ContainerId containerId, Throwable t); } }
ApplicationMaster編程庫,AM與NM交互庫
用戶實現一個MyCallbackHandler,實現NMClientAsync.CallbackHandler接口
class MyCallbackHandler implements NMClientAsync.CallbackHandler{
............
}
引入jar包:
org.apache.hadoop.yarn.client.api.async.NMClientAsync;
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
流程:
1 構造一個NMClientAsync句柄
NMClientAsync asyncClient = new NMClientAsyncImpl(new MyCallbackHandler());
2 初始化並啟動 NMClientAsync
asyncClient.init(conf);//初始化asyncClient
asyncClient.start(); //啟動asyncClient
3 構造Container
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
...//設置ctx變量
4 啟動Container
asyncClient.startContainerAsync(container, ctx);
5 獲取container狀態
asyncClient.getContainerStatusAsync(container.getId(), container.getNodeId());
6 停止Container
asyncClient.stopContainerAsync(container.getId(), container.getNodeId());
asyncClient.stop()