The previous post walked through the basic MapReduce execution flow using WordCount, but did not examine it from the framework's point of view; that part will be filled in gradually in later posts. In this post, let's dissect the job submission process.
Before diving in, let's think about the problem roughly: if we had to design a distributed computing system ourselves, how would we do it? Suppose 100 tasks need to run concurrently, each over its own chunk of data, and that data is spread across many machines. What are the main problems?
1. How the data is distributed is the first problem, and probably the one that most affects the performance of distributed computing. Given a huge file, how should it be cut up and scattered across machines? Hadoop's answer is fixed-size chunks of 64 MB or 128 MB: cut them too small and there is too much metadata to track; cut them too large and load balancing suffers. Once the file is cut, how do the pieces get assigned to machines? The simplest scheme I can think of is to put each piece on whichever machine has the most free space, which keeps storage usage roughly even and avoids hot spots. We will revisit this when analyzing HDFS.
2. Assuming the data is already distributed, computation should run on the machines that hold the data: whichever machine owns a block is responsible for processing it. Task division therefore follows data division; for example, if a machine holds five 64 MB blocks, it gets five tasks, one per block. This intuitive approach is exactly what Hadoop takes. Of course, it is not quite that simple: some machine may simply hold more data and become the straggler, so the whole job's completion time depends on the slowest machine. To mitigate this, Hadoop may run the same task on two machines and, whichever finishes first, kill the other copy. In any case, as soon as tasks must be assigned, at least one machine has to be in charge of the assignment, which is why MapReduce has a master node. In theory even that is not strictly necessary: if every node could compute, with the same deterministic algorithm, which task belongs on which machine, the whole cluster could be peer-to-peer.
3. The key question: what if there are dependencies in the data? For example, one task cannot start until another has finished. This kind of synchronization barrier is the bane of parallel computing; most of the time may be spent waiting, possibly making it even slower than a serial run. Hadoop assumes the tasks can run completely in parallel, and if results need to be combined, a second wave of tasks is started afterwards to do the combining; those later tasks are the Reduce phase. Not every problem decomposes this neatly, so in my view Hadoop has an inherent limitation here: the model is a bit too simple.
Now let's start the analysis.
A MapReduce cluster consists of one JobTracker and multiple TaskTrackers. YARN is out of scope here; the analysis is based on the 1.x code base.
A MapReduce job is called a Job in Hadoop, and the JobTracker, as the name suggests, is the node that manages Jobs. A Job consists of multiple Map Tasks and Reduce Tasks: in Hadoop the Map and Reduce units of work are called Tasks, while Job refers to the whole Map-Reduce flow. Before reading the job submission code, a few basic points are worth knowing:
1. The input data and resources a Job needs (data distribution information, configuration parameters, and so on) all live in HDFS. The client has to upload the resource information to HDFS first; it is not transferred to the JobTracker directly, because the JobTracker can read HDFS itself and fetches the relevant information from there before assigning Map and Reduce Tasks.
2. JobClient and JobTracker can be viewed as a client/server pair. After the JobClient has stored the resources in HDFS, it submits the job to the JobTracker. What actually gets transferred to the JobTracker is just basic information such as the Job ID and the job's HDFS directory. Note that no computation data or resource data is passed between them directly: both are HDFS clients and can access HDFS themselves.
3. The JobTracker's main duty is to assign the job's work: split a Job into Map and Reduce tasks and decide which machines run them; the machines that run the tasks are the TaskTrackers. In traditional MPI programming, how many tasks a job is split into, and which machines run them, are both specified by the programmer. MapReduce differs in that both the splitting and the placement are handled by Hadoop itself. The programmer only has to put the input data on HDFS, write one copy of the Map and Reduce code, and launch the job (a minimal driver sketch follows). Where the data physically lives does not concern the programmer (it can be looked up through the HDFS admin interfaces), and where the code runs (it is automatically copied to multiple machines) does not either, although both can be inspected if you really want to.
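To make point 3 concrete, here is a minimal, self-contained driver sketch using the new org.apache.hadoop.mapreduce API and the built-in identity Mapper/Reducer (class and job names here are illustrative, not taken from this code base). Nothing in it names a machine; the framework decides placement.

```java
// Minimal driver sketch: the user code only describes the job, never where it runs.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IdentityDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "identity-job");      // Hadoop 1.x style constructor
    job.setJarByClass(IdentityDriver.class);      // which jar to ship to the cluster
    job.setMapperClass(Mapper.class);             // identity map (pass-through)
    job.setReducerClass(Reducer.class);           // identity reduce (pass-through)
    job.setOutputKeyClass(LongWritable.class);    // key type produced by the identity map
    job.setOutputValueClass(Text.class);          // value type produced by the identity map
    FileInputFormat.addInputPath(job, new Path(args[0]));    // data already in HDFS
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // must not exist yet
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```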
4. After the JobTracker has assigned tasks, it does not push them to the TaskTrackers; it waits for the TaskTrackers to come and fetch them. This design probably reflects the fact that MapReduce jobs usually run for a long time (tens of minutes or more) and that the JobTracker should not be overloaded: task information is piggy-backed on the heartbeat, otherwise the single master easily becomes a bottleneck. There is a configurable heartbeat between JobTracker and TaskTracker, say every few seconds (too-frequent heartbeats again create a single-point bottleneck that limits scalability, since every TaskTracker heartbeats to the same JobTracker). So after the JobClient uploads the resources to HDFS and submits the job, the Job enters the job queue; the JobTracker takes it off the queue and works out the Map/Reduce tasks, and only several seconds later do the TaskTrackers learn that they have tasks to run. Job startup therefore takes on the order of seconds, which is too much latency for real-time processing — something Spark likes to point out. But doesn't Spark also face a single-point bottleneck once the cluster grows? Any cluster in which a single node hands out tasks will run into this, unless the task-assigning role itself can be scaled out.
5. Map Tasks and Reduce Tasks ultimately run on TaskTrackers. A TaskTracker is typically a Linux server with a Java virtual machine installed; to run a Map or Reduce Task it starts a JVM and executes the task in it (so each Map or Reduce task is a separate Java process), and JVM startup itself is reasonably fast; when the task finishes it notifies the JobTracker and the JVM is shut down. One TaskTracker can run many such Java processes. In YARN (the resource management layer of Hadoop 2.0) you can specify how many CPU cores and how much memory each Map or Reduce task needs; today's servers commonly have tens of cores and 64 GB or more of RAM, so running dozens of Map/Reduce tasks on one machine is normal. Before YARN, you could only configure how many Map/Reduce tasks a server could run, without accounting for differences in what each task consumes.
6. The JobClient uses RPC to call services on the JobTracker, for example to obtain a Job ID, start a job, kill a job, or query job progress. RPC is one of the core pieces of Hadoop, and understanding it is a prerequisite for understanding Hadoop. JobTracker is the most important class in MapReduce and implements many interfaces:
public class JobTracker implements MRConstants, InterTrackerProtocol,
    JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,
    JobTrackerMXBean {
  ......
Among these, JobSubmissionProtocol is the RPC service interface between JobClient and JobTracker. JobTracker is the class that implements it, and its main methods are:
interface JobSubmissionProtocol extends VersionedProtocol {
  public JobID getNewJobId() throws IOException;
  public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) throws IOException;
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
  public void killJob(JobID jobid) throws IOException;
  public void setJobPriority(JobID jobid, String priority) throws IOException;
  public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
  public JobProfile getJobProfile(JobID jobid) throws IOException;
  public JobStatus getJobStatus(JobID jobid) throws IOException;
  public Counters getJobCounters(JobID jobid) throws IOException;
  public TaskReport[] getMapTaskReports(JobID jobid) throws IOException;
  public TaskReport[] getReduceTaskReports(JobID jobid) throws IOException;
  public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException;
  public TaskReport[] getSetupTaskReports(JobID jobid) throws IOException;
  ........
}
On the JobClient side, a dynamic proxy is used (for what a dynamic proxy is, see the JDK's Proxy and InvocationHandler classes). When the client calls a JobSubmissionProtocol method such as the one below (which has no concrete implementation on the client side):
public JobID getNewJobId() throws IOException;
the call enters the proxy's invoke method, which sends the request (method name plus arguments) to the JobTracker via RPC; the JobTracker processes it and returns the result.
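Here is a stand-alone illustration of the dynamic-proxy idea (plain JDK code, not Hadoop): every call on the proxy object lands in invoke(), which is exactly the hook a real RPC client uses to package the call and ship it to the server.

```java
// Every call on the proxy is routed into invoke(); a real RPC client would
// serialize method.getName() + args there and send them over the network.
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface Greeter {                       // plays the role of JobSubmissionProtocol
  String greet(String name);
}

public class ProxyDemo {
  public static void main(String[] args) {
    InvocationHandler handler = new InvocationHandler() {   // plays the role of Invoker
      @Override
      public Object invoke(Object proxy, Method method, Object[] methodArgs) {
        // Instead of sending the call to a server, answer locally for the demo.
        return "handled " + method.getName() + "(" + methodArgs[0] + ")";
      }
    };
    Greeter g = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(), new Class<?>[]{Greeter.class}, handler);
    System.out.println(g.greet("JobTracker"));  // prints: handled greet(JobTracker)
  }
}
```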
Now on to the code. The previous post ended at Job's submit method:
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();

  // Connect to the JobTracker and submit the job
  connect();
  info = jobClient.submitJobInternal(conf);
  super.setJobID(info.getID());
  state = JobState.RUNNING;
}
Let's look at connect() first to understand the RPC mechanism, and then analyze the submission path in submitJobInternal.
0. The RPC mechanism between JobClient and JobTracker
private void connect() throws IOException, InterruptedException {
  ugi.doAs(new PrivilegedExceptionAction<Object>() {
    public Object run() throws IOException {
      jobClient = new JobClient((JobConf) getConfiguration());
      return null;
    }
  });
}
This method creates a JobClient object, whose constructor calls init, which in turn creates the proxy:
public JobClient(JobConf conf) throws IOException {
  setConf(conf);
  init(conf);
}

public void init(JobConf conf) throws IOException {
  String tracker = conf.get("mapred.job.tracker", "local");
  tasklogtimeout = conf.getInt(TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
  this.ugi = UserGroupInformation.getCurrentUser();
  if ("local".equals(tracker)) {
    conf.setNumMapTasks(1);
    this.jobSubmitClient = new LocalJobRunner(conf);
  } else {
    this.rpcJobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
    this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
  }
}
rpcJobSubmitClient is a JobSubmissionProtocol object, and JobSubmissionProtocol is the RPC interface to the JobTracker's services, providing the service access methods:
private JobSubmissionProtocol rpcJobSubmitClient;
In createRPCProxy, Hadoop RPC's proxy-creating method RPC.getProxy is called:
private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { JobSubmissionProtocol rpcJobSubmitClient = (JobSubmissionProtocol)RPC.getProxy( JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, addr, UserGroupInformation.getCurrentUser(), conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class), 0, RetryUtils.getMultipleLinearRandomRetry( conf, MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY, MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT ), false); return rpcJobSubmitClient; }
The core of this method is the creation of an Invoker object:
/** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ public static VersionedProtocol getProxy( Class<? extends VersionedProtocol> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, boolean checkVersion) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy); VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker); if (checkVersion) { checkVersion(protocol, clientVersion, proxy); } return proxy; }
Invoker is a class implementing java.lang.reflect.InvocationHandler; it stands in for all JobSubmissionProtocol services. When the JobClient calls a JobSubmissionProtocol method (such as JobID getNewJobId()), execution enters Invoker's invoke method, which packages up the call information and sends it to the JobTracker for execution:
private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; ....... public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ...... ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); .....return value.get(); }
client is the RPC client; its call method builds an Invocation object that encapsulates the method to be invoked:
private static class Invocation implements Writable, Configurable { private String methodName; private Class[] parameterClasses; private Object[] parameters; private Configuration conf; .......
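The Writable contract that Invocation follows boils down to two methods, write and readFields, which must serialize and deserialize the fields in the same order. The sketch below is a generic example of that pattern (illustration only, not Hadoop's actual Invocation class):

```java
// Generic Writable-style example: serialize a method name plus an argument count
// to a DataOutput, and read them back in exactly the same order.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class SimpleCall implements Writable {
  private String methodName = "";
  private int argCount;

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(methodName);   // field order defines the wire format
    out.writeInt(argCount);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    methodName = in.readUTF();  // must mirror write() exactly
    argCount = in.readInt();
  }
}
```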
Then, still inside call, this information is serialized (Invocation is a serializable, Writable object) and sent out:
/** Make a call, passing <code>param</code>, to the IPC server defined by * <code>remoteId</code>, returning the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. */ public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); Connection connection = getConnection(remoteId, call); connection.sendParam(call); // send the parameter boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // wait for the result } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } 。。。。。。 } }
sendParam(call) is the code that actually sends it:
/** Initiates a call by sending the parameter to the remote server. * Note: this is not called from the Connection thread, but by other * threads. */ public void sendParam(Call call) { if (shouldCloseConnection.get()) { return; } DataOutputBuffer d=null; try { synchronized (this.out) { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); //for serializing the //data to be written d = new DataOutputBuffer(); d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); //first put the data length out.write(data, 0, dataLength);//write the data out.flush(); } } catch(IOException e) { markClosed(e); } finally { //the buffer is just an in-memory buffer, but it is still polite to // close early IOUtils.closeStream(d); } }
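The framing used here is simply "call id + length-prefixed payload". A stand-alone sketch of the same idea with plain java.io (an illustration, not the Hadoop classes themselves):

```java
// Length-prefixed framing in the spirit of sendParam(): buffer [call id][payload],
// then emit [total length][buffered bytes] on the socket stream.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class FrameWriter {
  public static void sendFrame(OutputStream socketOut, int callId, byte[] payload)
      throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream d = new DataOutputStream(buf);
    d.writeInt(callId);                  // lets the client match the reply to the call
    d.write(payload);                    // the serialized Invocation would go here
    byte[] data = buf.toByteArray();

    DataOutputStream out = new DataOutputStream(socketOut);
    out.writeInt(data.length);           // first the data length ...
    out.write(data);                     // ... then the data itself
    out.flush();
  }
}
```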
With the RPC mechanism understood, we can now look at how the submission method submitJobInternal executes.
submitJobInternal is the core method by which the JobClient submits a job to the JobTracker:
/** * Internal method for submitting jobs to the system. * @param job the configuration to submit * @return a proxy object for the running job * @throws FileNotFoundException * @throws ClassNotFoundException * @throws InterruptedException * @throws IOException */ public RunningJob submitJobInternal(final JobConf job ) throws FileNotFoundException, ClassNotFoundException, InterruptedException, IOException { /* * configure the command line options correctly on the submitting dfs */ return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() { public RunningJob run() throws FileNotFoundException, ClassNotFoundException, InterruptedException, IOException{ JobConf jobCopy = job; Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy); JobID jobId = jobSubmitClient.getNewJobId(); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); jobCopy.set("mapreduce.job.dir", submitJobDir.toString()); JobStatus status = null; try { populateTokenCache(jobCopy, jobCopy.getCredentials()); copyAndConfigureFiles(jobCopy, submitJobDir); // get delegation token for the dir TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(), new Path [] {submitJobDir}, jobCopy); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); int reduces = jobCopy.getNumReduceTasks(); InetAddress ip = InetAddress.getLocalHost(); if (ip != null) { job.setJobSubmitHostAddress(ip.getHostAddress()); job.setJobSubmitHostName(ip.getHostName()); } JobContext context = new JobContext(jobCopy, jobId); // Check the output specification if (reduces == 0 ? jobCopy.getUseNewMapper() : jobCopy.getUseNewReducer()) { org.apache.hadoop.mapreduce.OutputFormat<?,?> output = ReflectionUtils.newInstance(context.getOutputFormatClass(), jobCopy); output.checkOutputSpecs(context); } else { jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy); } jobCopy = (JobConf)context.getConfiguration(); // Create the splits for the job FileSystem fs = submitJobDir.getFileSystem(jobCopy); LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir)); int maps = writeSplits(context, submitJobDir); jobCopy.setNumMapTasks(maps); // write "queue admins of the queue to which job is being submitted" // to job file. String queue = jobCopy.getQueueName(); AccessControlList acl = jobSubmitClient.getQueueAdmins(queue); jobCopy.set(QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString()); // Write job file to JobTracker's fs FSDataOutputStream out = FileSystem.create(fs, submitJobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); // removing jobtoken referrals before copying the jobconf to HDFS // as the tasks don't need this setting, actually they may break // because of it if present as the referral will point to a // different job. TokenCache.cleanUpTokenReferral(jobCopy); try { jobCopy.writeXml(out); } finally { out.close(); } // // Now, actually submit the job (using the submit name) // printTokens(jobId, jobCopy.getCredentials()); status = jobSubmitClient.submitJob( jobId, submitJobDir.toString(), jobCopy.getCredentials()); JobProfile prof = jobSubmitClient.getJobProfile(jobId); if (status != null && prof != null) { return new NetworkedJob(status, prof, jobSubmitClient); } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (fs != null && submitJobDir != null) fs.delete(submitJobDir, true); } } } }); }
The method is fairly involved; let's break it into steps and analyze them one by one.
1. Get the job's root directory in HDFS
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,jobCopy);
In MapReduce, the information about every Job (not its input data) is stored under a common root directory in HDFS, called the staging directory; "staging" here just means a temporary working area where the job's framework-level information (data distribution info, job configuration parameters, and so on) is parked. It is configured by mapreduce.jobtracker.staging.root.dir and defaults to "/tmp/hadoop/mapred/staging".
This can be seen inside getStagingDir: the JobClient method eventually calls JobSubmissionFiles.getStagingDir:
public static Path getStagingDir(JobClient client, Configuration conf)
    throws IOException, InterruptedException {
  Path stagingArea = client.getStagingAreaDir();
  FileSystem fs = stagingArea.getFileSystem(conf);
  ....
  return stagingArea;
}
which in turn calls the client's getStagingAreaDir method:
public Path getStagingAreaDir() throws IOException {
  if (stagingAreaDir == null) {
    stagingAreaDir = new Path(jobSubmitClient.getStagingAreaDir());
  }
  return stagingAreaDir;
}
Finally, jobSubmitClient's getStagingAreaDir is invoked. jobSubmitClient is a JobSubmissionProtocol object, so through the dynamic proxy the call lands in the method of the same name on the server side, in JobTracker:
public String getStagingAreaDir() throws IOException { // Check for safe-mode checkSafeMode(); try{ final String user = UserGroupInformation.getCurrentUser().getShortUserName(); return getMROwner().doAs(new PrivilegedExceptionAction<String>() { @Override public String run() throws Exception { return getStagingAreaDirInternal(user); } }); } catch(InterruptedException ie) { throw new IOException(ie); } }
which calls getStagingAreaDirInternal:
private String getStagingAreaDirInternal(String user) throws IOException {
  final Path stagingRootDir = new Path(conf.get(
      "mapreduce.jobtracker.staging.root.dir",
      "/tmp/hadoop/mapred/staging"));
  final FileSystem fs = stagingRootDir.getFileSystem(conf);
  return fs.makeQualified(new Path(stagingRootDir, user + "/.staging")).toString();
}
The resulting directory is returned over RPC. user is the user name, so for a user called esingchan the directory is:
/tmp/hadoop/mapred/staging/esingchan/.staging/
2. Get the Job ID and the job's working directory (not created yet):
JobID jobId = jobSubmitClient.getNewJobId();
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
getNewJobId likewise obtains the ID over RPC (job IDs increase from 1). A working directory for the job is then derived under the staging directory in HDFS and recorded as the value of mapreduce.job.dir; the final directory typically looks like:
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/
A Job ID is generally composed of a timestamp-based identifier plus a counter; see JobTracker's getNewJobId:
public synchronized JobID getNewJobId() throws IOException {
  // Check for JobTracker operational state
  checkJobTrackerState();
  return new JobID(getTrackerIdentifier(), nextJobId++);
}
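Put together, an ID like job_201408161410_0001 is just "job_" + the tracker identifier + a zero-padded sequence number. A hedged illustration, assuming the tracker identifier is the JobTracker start time formatted as yyyyMMddHHmm:

```java
// Illustration of how an ID such as job_201408161410_0001 is assembled
// (assumes the identifier is the JobTracker start time, yyyyMMddHHmm).
import java.text.SimpleDateFormat;
import java.util.Date;

public class JobIdDemo {
  public static void main(String[] args) {
    String trackerIdentifier = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
    int nextJobId = 1;                                  // increments for every new job
    String jobId = String.format("job_%s_%04d", trackerIdentifier, nextJobId);
    System.out.println(jobId);                          // e.g. job_201408161410_0001
  }
}
```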
3. Create the working directory and upload resource data to HDFS:
copyAndConfigureFiles(jobCopy, submitJobDir);
copyAndConfigureFiles is the main method by which the JobClient uploads resource data to HDFS. Its implementation:
private void copyAndConfigureFiles(JobConf job, Path jobSubmitDir) throws IOException, InterruptedException { short replication = (short)job.getInt("mapred.submit.replication", 10); copyAndConfigureFiles(job, jobSubmitDir, replication); // Set the working directory if (job.getWorkingDirectory() == null) { job.setWorkingDirectory(fs.getWorkingDirectory()); } } private void copyAndConfigureFiles(JobConf job, Path submitJobDir, short replication) throws IOException, InterruptedException { if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) { LOG.warn("Use GenericOptionsParser for parsing the arguments. " + "Applications should implement Tool for the same."); } // Retrieve command line arguments placed into the JobConf // by GenericOptionsParser. String files = job.get("tmpfiles"); String libjars = job.get("tmpjars"); String archives = job.get("tmparchives"); // // Figure out what fs the JobTracker is using. Copy the // job to it, under a temporary name. This allows DFS to work, // and under the local fs also provides UNIX-like object loading // semantics. (that is, if the job file is deleted right after // submission, we can still run the submission to completion) // // Create a number of filenames in the JobTracker's fs namespace FileSystem fs = submitJobDir.getFileSystem(job); LOG.debug("default FileSystem: " + fs.getUri()); if (fs.exists(submitJobDir)) { throw new IOException("Not submitting job. Job directory " + submitJobDir +" already exists!! This is unexpected.Please check what's there in" + " that directory"); } submitJobDir = fs.makeQualified(submitJobDir); FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms); Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir); Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir); Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir); // add all the command line files/ jars and archive // first copy them to jobtrackers filesystem if (files != null) { FileSystem.mkdirs(fs, filesDir, mapredSysPerms); String[] fileArr = files.split(","); for (String tmpFile: fileArr) { URI tmpURI; try { tmpURI = new URI(tmpFile); } catch (URISyntaxException e) { throw new IllegalArgumentException(e); } Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication); try { URI pathURI = getPathURI(newPath, tmpURI.getFragment()); DistributedCache.addCacheFile(pathURI, job); } catch(URISyntaxException ue) { //should not throw a uri exception throw new IOException("Failed to create uri for " + tmpFile, ue); } DistributedCache.createSymlink(job); } } if (libjars != null) { FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms); String[] libjarsArr = libjars.split(","); for (String tmpjars: libjarsArr) { Path tmp = new Path(tmpjars); Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication); DistributedCache.addArchiveToClassPath (new Path(newPath.toUri().getPath()), job, fs); } } if (archives != null) { FileSystem.mkdirs(fs, archivesDir, mapredSysPerms); String[] archivesArr = archives.split(","); for (String tmpArchives: archivesArr) { URI tmpURI; try { tmpURI = new URI(tmpArchives); } catch (URISyntaxException e) { throw new IllegalArgumentException(e); } Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication); try { URI pathURI = getPathURI(newPath, tmpURI.getFragment()); DistributedCache.addCacheArchive(pathURI, job); } 
catch(URISyntaxException ue) { //should not throw an uri excpetion throw new IOException("Failed to create uri for " + tmpArchives, ue); } DistributedCache.createSymlink(job); } } // First we check whether the cached archives and files are legal. TrackerDistributedCacheManager.validate(job); // set the timestamps of the archives and files and set the // public/private visibility of the archives and files TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job); // get DelegationTokens for cache files TrackerDistributedCacheManager.getDelegationTokens(job, job.getCredentials()); String originalJarPath = job.getJar(); if (originalJarPath != null) { // copy jar to JobTracker's fs // use jar name if job is not named. if ("".equals(job.getJobName())){ job.setJobName(new Path(originalJarPath).getName()); } Path originalJarFile = new Path(originalJarPath); URI jobJarURI = originalJarFile.toUri(); // If the job jar is already in fs, we don't need to copy it from local fs if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null || !(jobJarURI.getScheme().equals(fs.getUri().getScheme()) && jobJarURI.getAuthority().equals( fs.getUri().getAuthority()))) { Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir); job.setJar(submitJarFile.toString()); fs.copyFromLocalFile(originalJarFile, submitJarFile); fs.setReplication(submitJarFile, replication); fs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); } } else { LOG.warn("No job jar file set. User classes may not be found. "+ "See JobConf(Class) or JobConf#setJar(String)."); } }
In it, the line:
short replication = (short)job.getInt("mapred.submit.replication", 10);
means the reliability requirements for this resource data are high: HDFS keeps 10 replicas of it by default.
The following lines of copyAndConfigureFiles:
String files = job.get("tmpfiles");
String libjars = job.get("tmpjars");
String archives = job.get("tmparchives");

Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);

FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
Path newPath = copyRemoteFiles(fs, filesDir, tmp, job, replication);

FileSystem.mkdirs(fs, archivesDir, mapredSysPerms);
Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);

FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
create, under submitJobDir, the directories for temporary files, jar library files, and archive files; their names can be seen in JobSubmissionFiles:
public static Path getJobDistCacheFiles(Path jobSubmitDir) {
  return new Path(jobSubmitDir, "files");
}

/**
 * Get the job distributed cache archives path.
 * @param jobSubmitDir
 */
public static Path getJobDistCacheArchives(Path jobSubmitDir) {
  return new Path(jobSubmitDir, "archives");
}

/**
 * Get the job distributed cache libjars path.
 * @param jobSubmitDir
 */
public static Path getJobDistCacheLibjars(Path jobSubmitDir) {
  return new Path(jobSubmitDir, "libjars");
}
So the created directories look like, for example:
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/files
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/archives
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/libjars
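The tmpfiles / tmpjars / tmparchives properties read above are normally populated by GenericOptionsParser from the -files, -libjars and -archives command-line options, which is why copyAndConfigureFiles warns that applications should implement Tool. A minimal sketch of that pattern (the actual job setup is omitted; class name is illustrative):

```java
// Tool/ToolRunner pattern: ToolRunner lets GenericOptionsParser consume
// -files/-libjars/-archives and store them in the configuration as
// tmpfiles/tmpjars/tmparchives before run() is invoked.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();   // already contains tmpfiles/tmpjars/tmparchives
                                      // if -files/-libjars/-archives were passed
    // ... build and submit the Job here ...
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // e.g. hadoop jar myjob.jar MyTool -files lookup.txt -libjars dep.jar in out
    System.exit(ToolRunner.run(new Configuration(), new MyTool(), args));
  }
}
```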
After that, the user program's jar is uploaded from the local file system into HDFS via the copyFromLocalFile call below:
String originalJarPath = job.getJar(); if (originalJarPath != null) { // copy jar to JobTracker's fs // use jar name if job is not named. if ("".equals(job.getJobName())){ job.setJobName(new Path(originalJarPath).getName()); } Path originalJarFile = new Path(originalJarPath); URI jobJarURI = originalJarFile.toUri(); // If the job jar is already in fs, we don't need to copy it from local fs if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null || !(jobJarURI.getScheme().equals(fs.getUri().getScheme()) && jobJarURI.getAuthority().equals( fs.getUri().getAuthority()))) { Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir); job.setJar(submitJarFile.toString()); fs.copyFromLocalFile(originalJarFile, submitJarFile); fs.setReplication(submitJarFile, replication); fs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
The jar is placed directly in the Job's root directory, i.e.:
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/job.jar
and its name is determined by the following method:
public static Path getJobJar(Path jobSubmitDir) {
  return new Path(jobSubmitDir, "job.jar");
}
Once this file has been created in HDFS, the jar compiled locally on the JobClient is copied into it. Note that the files under libjars are the additional temporary jar libraries the job needs.
To summarize, the directories created above hold the following:
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/libjars
holds the additional jar libraries needed to run job.jar;
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/archives
holds the archive files (zip/tar packages shipped through the distributed cache) that the tasks need;
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/files
holds auxiliary files the program needs at run time — note, not the data to be computed, but other supporting files.
This is not quite the end of it: two more files still have to be uploaded, job.split and job.xml, holding the job's input split information and its runtime configuration respectively. Both require some analysis on the client side first, so they are covered below.
4. Create the job configuration file
The directories above provide the data the job needs, but configuration such as the number of tasks has not been produced yet. Task configuration is not all specified directly by the user; some of it has to be derived by analysis. All of it is eventually generated by the JobClient and written into the staging directory in HDFS as job.xml, which holds the job parameters, e.g. the number of Reduces:
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

public static Path getJobConfPath(Path jobSubmitDir) {
  return new Path(jobSubmitDir, "job.xml");
}
In the process, parameters are read from the user-supplied configuration (also an XML file, but on the local Linux file system) and analyzed to produce the job's parameters. The main ones obtained are:
The number of Reduces:
int reduces = jobCopy.getNumReduceTasks();
The client's IP information:
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
  job.setJobSubmitHostAddress(ip.getHostAddress());
  job.setJobSubmitHostName(ip.getHostName());
}
The number of Map tasks. This one is derived by analysis rather than read directly, and the resulting split information is uploaded to HDFS as job.split.
The job parameters are then created; there are many of them, represented by a JobContext object:
JobContext context = new JobContext(jobCopy, jobId);
Note: in Hadoop a "context" is generally a holder of parameters; here it records the job-related parameters, such as the Mapper implementation class name. Its declaration is:
public class JobContext { // Put all of the attribute names in here so that Job and JobContext are // consistent. protected static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class"; protected static final String MAP_CLASS_ATTR = "mapreduce.map.class"; protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class"; protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class"; protected static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class"; protected static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class"; protected final org.apache.hadoop.mapred.JobConf conf; protected final Credentials credentials; private JobID jobId; public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers"; public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; public static final String CACHE_FILE_VISIBILITIES = "mapreduce.job.cache.files.visibilities"; public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; public static final String JOB_CANCEL_DELEGATION_TOKEN = "mapreduce.job.complete.cancel.delegation.tokens"; public static final String USER_LOG_RETAIN_HOURS = "mapred.userlog.retain.hours"; /** * The UserGroupInformation object that has a reference to the current user */ protected UserGroupInformation ugi; 。。。。。。
In effect, JobContext is just a wrapper around JobConf. JobConf corresponds to the client's original local configuration, parsed into an object, while JobContext queries the JobConf for whatever it needs — for example, returning the client's Mapper implementation class:
public Class<? extends Mapper<?,?,?,?>> getMapperClass() throws ClassNotFoundException { return (Class<? extends Mapper<?,?,?,?>>) conf.getClass(MAP_CLASS_ATTR, Mapper.class); }
Once created, the JobContext is used to check that the output specification is valid:
// Check the output specification if (reduces == 0 ? jobCopy.getUseNewMapper() : jobCopy.getUseNewReducer()) { org.apache.hadoop.mapreduce.OutputFormat<?,?> output = ReflectionUtils.newInstance(context.getOutputFormatClass(), jobCopy); output.checkOutputSpecs(context); } else { jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy); }
Next, the input data's split information is computed, which yields the number of Maps:
// Create the splits for the job FileSystem fs = submitJobDir.getFileSystem(jobCopy); LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir)); int maps = writeSplits(context, submitJobDir); jobCopy.setNumMapTasks(maps);
Note that splitting the input is what determines the number of Maps: the Map count is derived by the JobClient from the split size, whereas the Reduce count is specified by the user; which machines the Maps and Reduces actually run on is decided by the JobTracker. A small example of the user-facing knobs follows.
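A hedged example of those knobs (property names as they appear later in this code base; the values are illustrative): the reduce count is set directly, while the map count can only be influenced indirectly through the split-size bounds.

```java
// User-side knobs: reduces are set directly; maps follow from the split size
// the client computes (see computeSplitSize in section 6 below).
import org.apache.hadoop.mapred.JobConf;

public class TuningDemo {
  public static void configure(JobConf conf) {
    conf.setNumReduceTasks(4);                                  // reduce count: user's choice
    conf.setLong("mapred.min.split.size", 1L);                  // lower bound on split size
    conf.setLong("mapred.max.split.size", 256L * 1024 * 1024);  // upper bound: 256 MB
    // With a 64 MB block size these settings still yield 64 MB splits,
    // i.e. one map task per HDFS block.
  }
}
```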
In the code above, the resulting split information is uploaded to HDFS under the name job.split:
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/job.split
The logic behind it is somewhat involved; for now assume job.split has been produced — section 6 analyzes it in detail.
With this information in hand, it is written into the job configuration file job.xml:
// Write job file to JobTracker's fs FSDataOutputStream out = FileSystem.create(fs, submitJobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); // removing jobtoken referrals before copying the jobconf to HDFS // as the tasks don't need this setting, actually they may break // because of it if present as the referral will point to a // different job. TokenCache.cleanUpTokenReferral(jobCopy); try { jobCopy.writeXml(out); } finally { out.close(); }
where submitJobFile is:
/tmp/hadoop/mapred/staging/esingchan/.staging/job_201408161410_0001/job.xml
5. Actually submitting the Job to the JobTracker
The code that actually submits the job is:
status = jobSubmitClient.submitJob(
    jobId, submitJobDir.toString(), jobCopy.getCredentials());
JobProfile prof = jobSubmitClient.getJobProfile(jobId);
The method called on jobSubmitClient again travels over RPC to the JobTracker's method of the same name. Notice that what JobClient and JobTracker exchange really amounts to just two things: the Job ID and the job's root directory in HDFS; all the concrete resource data has already been placed in HDFS by the JobClient. The JobTracker-side code:
JobStatus submitJob(JobID jobId, String jobSubmitDir, UserGroupInformation ugi, Credentials ts, boolean recovered) throws IOException { // Check for safe-mode checkSafeMode(); JobInfo jobInfo = null; if (ugi == null) { ugi = UserGroupInformation.getCurrentUser(); } synchronized (this) { if (jobs.containsKey(jobId)) { // job already running, don't start twice return jobs.get(jobId).getStatus(); } jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()), new Path(jobSubmitDir)); } // Store the job-info in a file so that the job can be recovered // later (if at all) // Note: jobDir & jobInfo are owned by JT user since we are using // his fs object if (!recovered) { Path jobDir = getSystemDirectoryForJob(jobId); FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION)); FSDataOutputStream out = fs.create(getSystemFileForJob(jobId)); jobInfo.write(out); out.close(); } // Create the JobInProgress, do not lock the JobTracker since // we are about to copy job.xml from HDFS and write jobToken file to HDFS JobInProgress job = null; try { if (ts == null) { ts = new Credentials(); } generateAndStoreJobTokens(jobId, ts); job = new JobInProgress(this, this.conf, jobInfo, 0, ts); } catch (Exception e) { throw new IOException(e); } if (recovered && !job.getJobConf().getBoolean( JobConf.MAPREDUCE_RECOVER_JOB, JobConf.DEFAULT_MAPREDUCE_RECOVER_JOB)) { LOG.info("Job "+ jobId.toString() + " is not enable for recovery, cleaning up job files"); job.cleanupJob(); return null; } synchronized (this) { // check if queue is RUNNING String queue = job.getProfile().getQueueName(); if (!queueManager.isRunning(queue)) { throw new IOException("Queue \"" + queue + "\" is not running"); } try { aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB); } catch (IOException ioe) { LOG.warn("Access denied for user " + job.getJobConf().getUser() + ". Ignoring job " + jobId, ioe); job.fail(); throw ioe; } // Check the job if it cannot run in the cluster because of invalid memory // requirements. try { checkMemoryRequirements(job); } catch (IOException ioe) { throw ioe; } try { this.taskScheduler.checkJobSubmission(job); } catch (IOException ioe){ LOG.error("Problem in submitting job " + jobId, ioe); throw ioe; } // Submit the job JobStatus status; try { status = addJob(jobId, job); } catch (IOException ioe) { LOG.info("Job " + jobId + " submission failed!", ioe); status = job.getStatus(); status.setFailureInfo(StringUtils.stringifyException(ioe)); failJob(job); throw ioe; } return status; } }
It consists of a few steps:
5.1 Create a JobInProgress object, the class the JobTracker uses to track a job's information. Its declaration:
public class JobInProgress { JobProfile profile; JobStatus status; String jobFile = null; Path localJobFile = null; final QueueMetrics queueMetrics; TaskInProgress maps[] = new TaskInProgress[0]; TaskInProgress reduces[] = new TaskInProgress[0]; TaskInProgress cleanup[] = new TaskInProgress[0]; TaskInProgress setup[] = new TaskInProgress[0]; int numMapTasks = 0; int numReduceTasks = 0; final long memoryPerMap; final long memoryPerReduce; volatile int numSlotsPerMap = 1; volatile int numSlotsPerReduce = 1; final int maxTaskFailuresPerTracker; // Counters to track currently running/finished/failed Map/Reduce task-attempts int runningMapTasks = 0; int runningReduceTasks = 0; int finishedMapTasks = 0; int finishedReduceTasks = 0; int failedMapTasks = 0; int failedReduceTasks = 0; private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L; long reduce_input_limit = -1L; private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f; int completedMapsForReduceSlowstart = 0; // runningMapTasks include speculative tasks, so we need to capture // speculative tasks separately int speculativeMapTasks = 0; int speculativeReduceTasks = 0; final int mapFailuresPercent; final int reduceFailuresPercent; int failedMapTIPs = 0; int failedReduceTIPs = 0; private volatile boolean launchedCleanup = false; private volatile boolean launchedSetup = false; private volatile boolean jobKilled = false; private volatile boolean jobFailed = false; JobPriority priority = JobPriority.NORMAL; final JobTracker jobtracker; protected Credentials tokenStorage; // NetworkTopology Node to the set of TIPs Map<Node, List<TaskInProgress>> nonRunningMapCache; // Map of NetworkTopology Node to set of running TIPs Map<Node, Set<TaskInProgress>> runningMapCache; // A list of non-local, non-running maps final List<TaskInProgress> nonLocalMaps; // Set of failed, non-running maps sorted by #failures final SortedSet<TaskInProgress> failedMaps; // A set of non-local running maps Set<TaskInProgress> nonLocalRunningMaps; // A list of non-running reduce TIPs Set<TaskInProgress> nonRunningReduces; // A set of running reduce TIPs Set<TaskInProgress> runningReduces; // A list of cleanup tasks for the map task attempts, to be launched List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>(); // A list of cleanup tasks for the reduce task attempts, to be launched List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>(); 。。。。。。。
The JobInProgress constructor initializes the job:
JobInProgress(JobTracker jobtracker, final JobConf default_conf, JobInfo jobInfo, int rCount, Credentials ts) throws IOException, InterruptedException { try { this.restartCount = rCount; this.jobId = JobID.downgrade(jobInfo.getJobID()); String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobId; this.jobtracker = jobtracker; this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP); this.status.setUsername(jobInfo.getUser().toString()); this.jobtracker.getInstrumentation().addPrepJob(conf, jobId); // Add the queue-level metric below (after the profile has been initialized) this.startTime = jobtracker.getClock().getTime(); status.setStartTime(startTime); this.localFs = jobtracker.getLocalFileSystem(); this.tokenStorage = ts; // use the user supplied token to add user credentials to the conf jobSubmitDir = jobInfo.getJobSubmitDir(); user = jobInfo.getUser().toString(); userUGI = UserGroupInformation.createRemoteUser(user); if (ts != null) { for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) { userUGI.addToken(token); } } fs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() { public FileSystem run() throws IOException { return jobSubmitDir.getFileSystem(default_conf); }}); /** check for the size of jobconf **/ Path submitJobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir); FileStatus fstatus = fs.getFileStatus(submitJobFile); if (fstatus.getLen() > jobtracker.MAX_JOBCONF_SIZE) { throw new IOException("Exceeded max jobconf size: " + fstatus.getLen() + " limit: " + jobtracker.MAX_JOBCONF_SIZE); } this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR +"/"+jobId + ".xml"); Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir); jobFile = jobFilePath.toString(); fs.copyToLocalFile(jobFilePath, localJobFile); conf = new JobConf(localJobFile); if (conf.getUser() == null) { this.conf.setUser(user); } if (!conf.getUser().equals(user)) { String desc = "The username " + conf.getUser() + " obtained from the " + "conf doesn't match the username " + user + " the user " + "authenticated as"; AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(), conf.getUser(), jobId.toString(), desc); throw new IOException(desc); } this.priority = conf.getJobPriority(); this.status.setJobPriority(this.priority); String queueName = conf.getQueueName(); this.profile = new JobProfile(user, jobId, jobFile, url, conf.getJobName(), queueName); Queue queue = this.jobtracker.getQueueManager().getQueue(queueName); if (queue == null) { throw new IOException("Queue \"" + queueName + "\" does not exist"); } this.queueMetrics = queue.getMetrics(); this.queueMetrics.addPrepJob(conf, jobId); this.submitHostName = conf.getJobSubmitHostName(); this.submitHostAddress = conf.getJobSubmitHostAddress(); this.numMapTasks = conf.getNumMapTasks(); this.numReduceTasks = conf.getNumReduceTasks(); this.memoryPerMap = conf.getMemoryForMapTask(); this.memoryPerReduce = conf.getMemoryForReduceTask(); this.taskCompletionEvents = new ArrayList<TaskCompletionEvent> (numMapTasks + numReduceTasks + 10); // Construct the jobACLs status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf)); this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent(); this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent(); this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker(); hasSpeculativeMaps = conf.getMapSpeculativeExecution(); hasSpeculativeReduces = conf.getReduceSpeculativeExecution(); // a limit on the 
input size of the reduce. // we check to see if the estimated input size of // of each reduce is less than this value. If not // we fail the job. A value of -1 just means there is no // limit set. reduce_input_limit = -1L; this.maxLevel = jobtracker.getNumTaskCacheLevels(); this.anyCacheLevel = this.maxLevel+1; this.nonLocalMaps = new LinkedList<TaskInProgress>(); this.failedMaps = new TreeSet<TaskInProgress>(failComparator); this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>(); this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>(); this.nonRunningReduces = new TreeSet<TaskInProgress>(failComparator); this.runningReduces = new LinkedHashSet<TaskInProgress>(); this.resourceEstimator = new ResourceEstimator(this); this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit", DEFAULT_REDUCE_INPUT_LIMIT); // register job's tokens for renewal DelegationTokenRenewal.registerDelegationTokensForRenewal( jobInfo.getJobID(), ts, jobtracker.getConf()); // Check task limits checkTaskLimits(); } finally { //close all FileSystems that was created above for the current user //At this point, this constructor is called in the context of an RPC, and //hence the "current user" is actually referring to the kerberos //authenticated user (if security is ON). FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser()); } }
5.2 Put the newly created object into the job queue:
// Submit the job
JobStatus status;
try {
  status = addJob(jobId, job);
} catch (IOException ioe) {
  LOG.info("Job " + jobId + " submission failed!", ioe);
  status = job.getStatus();
  status.setFailureInfo(StringUtils.stringifyException(ioe));
  failJob(job);
  throw ioe;
}
return status;
addJob is a method of JobTracker:
private synchronized JobStatus addJob(JobID jobId, JobInProgress job) throws IOException { totalSubmissions++; synchronized (jobs) { synchronized (taskScheduler) { jobs.put(job.getProfile().getJobID(), job); for (JobInProgressListener listener : jobInProgressListeners) { listener.jobAdded(job); } } } myInstrumentation.submitJob(job.getJobConf(), jobId); job.getQueueMetrics().submitJob(job.getJobConf(), jobId); LOG.info("Job " + jobId + " added successfully for user '" + job.getJobConf().getUser() + "' to queue '" + job.getJobConf().getQueueName() + "'"); AuditLogger.logSuccess(job.getUser(), Operation.SUBMIT_JOB.name(), jobId.toString()); return job.getStatus(); }
Its core is the following code:
synchronized (jobs) {
  synchronized (taskScheduler) {
    jobs.put(job.getProfile().getJobID(), job);
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobAdded(job);
    }
  }
}
jobs records all jobs the JobTracker currently knows about:
// All the known jobs. (jobid->JobInProgress)
Map<JobID, JobInProgress> jobs =
    Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());
But that map is not the job queue. The real queues are behind jobInProgressListeners, a list of JobInProgressListener objects:
private final List<JobInProgressListener> jobInProgressListeners =
    new CopyOnWriteArrayList<JobInProgressListener>();
JobInProgressListener is an abstract class with several implementations: JobQueueJobInProgressListener monitors a job's running state, EagerTaskInitializationListener initializes jobs, and so on. The JobTracker's listener list contains several of them, and every newly arrived job is added to each listener. Taking EagerTaskInitializationListener as an example:
/**
 * We add the JIP to the jobInitQueue, which is processed
 * asynchronously to handle split-computation and build up
 * the right TaskTracker/Block mapping.
 */
@Override
public void jobAdded(JobInProgress job) {
  synchronized (jobInitQueue) {
    jobInitQueue.add(job);
    resortInitQueue();
    jobInitQueue.notifyAll();
  }
}
So the job is added to a queue; jobInitQueue is a List:
private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
resortInitQueue() re-sorts the queued jobs by priority and then by start time:
/** * Sort jobs by priority and then by start time. */ private synchronized void resortInitQueue() { Comparator<JobInProgress> comp = new Comparator<JobInProgress>() { public int compare(JobInProgress o1, JobInProgress o2) { int res = o1.getPriority().compareTo(o2.getPriority()); if(res == 0) { if(o1.getStartTime() < o2.getStartTime()) res = -1; else res = (o1.getStartTime()==o2.getStartTime() ? 0 : 1); } return res; } }; synchronized (jobInitQueue) { Collections.sort(jobInitQueue, comp); } }
so a job that arrives later may be moved ahead of earlier ones because of its priority.
The jobInitQueue.notifyAll() line notifies other threads — in effect it wakes the job-initialization thread, which waits while there are no jobs, as the code below shows:
class JobInitManager implements Runnable {
  public void run() {
    JobInProgress job = null;
    while (true) {
      try {
        synchronized (jobInitQueue) {
          while (jobInitQueue.isEmpty()) {
            jobInitQueue.wait();
          }
          job = jobInitQueue.remove(0);
        }
        threadPool.execute(new InitJob(job));
      } catch (InterruptedException t) {
        LOG.info("JobInitManagerThread interrupted.");
        break;
      }
    }
    LOG.info("Shutting down thread pool");
    threadPool.shutdownNow();
  }
}
Once woken, job = jobInitQueue.remove(0) takes the first job off the queue and hands it to a thread pool; threadPool is the standard java.util.concurrent.ExecutorService.
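The hand-off between jobAdded() and JobInitManager is the classic wait/notify producer-consumer pattern. Stripped of the Hadoop specifics, it looks roughly like this:

```java
// Generic wait/notify hand-off, the same shape as jobInitQueue + JobInitManager:
// producers add work and notifyAll(); the consumer wait()s while the queue is empty.
import java.util.ArrayList;
import java.util.List;

public class HandOff<T> {
  private final List<T> queue = new ArrayList<T>();

  public void produce(T item) {                       // like jobAdded()
    synchronized (queue) {
      queue.add(item);
      queue.notifyAll();                              // wake the consumer thread
    }
  }

  public T consume() throws InterruptedException {    // like JobInitManager.run()
    synchronized (queue) {
      while (queue.isEmpty()) {
        queue.wait();                                 // releases the lock while waiting
      }
      return queue.remove(0);
    }
  }
}
```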
5.3 Executing the job initialization
After a job is taken off the queue, an InitJob object is created and dropped into the thread pool; when it runs, its run method executes:
class InitJob implements Runnable {
  private JobInProgress job;

  public InitJob(JobInProgress job) {
    this.job = job;
  }

  public void run() {
    ttm.initJob(job);
  }
}
This calls ttm.initJob. ttm is a TaskTrackerManager, an interface whose only implementation is JobTracker, so in practice JobTracker's initJob runs:
public void initJob(JobInProgress job) { 。。。。。。。try { JobStatus prevStatus = (JobStatus)job.getStatus().clone(); LOG.info("Initializing " + job.getJobID()); job.initTasks(); // Inform the listeners if the job state has changed JobStatus newStatus = (JobStatus)job.getStatus().clone(); if (prevStatus.getRunState() != newStatus.getRunState()) { JobStatusChangeEvent event = new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, newStatus); synchronized (JobTracker.this) { updateJobInProgressListeners(event); } } } } 。。。。。。 }
From there, JobInProgress.initTasks performs the job's initialization. That completes the basic code path: the JobClient submits the job, the JobTracker puts it on a queue, and a separate thread takes it off the queue and hands it to a thread pool for initialization. How a job is initialized and how its tasks are assigned are left for later posts.
One question was left open above: how the number of Map tasks is determined.
6. Determining the number of Maps
During job submission, the JobClient writes the Map count into the configuration file job.xml:
// Create the splits for the job FileSystem fs = submitJobDir.getFileSystem(jobCopy); LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir)); int maps = writeSplits(context, submitJobDir); jobCopy.setNumMapTasks(maps);
writeSplits calls writeNewSplits, whose core is the input.getSplits call:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { maps = writeNewSplits(job, jobSubmitDir); } else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps; } private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job); T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); return array.length; }
createSplitFiles generates two files, job.split and job.splitmetainfo:
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, Configuration conf, FileSystem fs, T[] splits) throws IOException, InterruptedException { FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf); SplitMetaInfo[] info = writeNewSplits(conf, splits, out); out.close(); writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info); } public static Path getJobSplitFile(Path jobSubmissionDir) { return new Path(jobSubmissionDir, "job.split"); } public static Path getJobSplitMetaFile(Path jobSubmissionDir) { return new Path(jobSubmissionDir, "job.splitmetainfo"); }
To understand this code, you first need to understand splits.
A split is a logical partition of the input data. Each split is eventually processed by one Map task, while the number of Reduces is user-specified, so the splits determine how many computation tasks a MapReduce job has.
Among all InputSplit types the most important is FileSplit, which represents a slice of an input HDFS file:
public class FileSplit extends InputSplit implements Writable {
  private Path file;
  private long start;
  private long length;
  private String[] hosts;
  .......
As the definition shows, a FileSplit mainly records the file the slice belongs to, its start offset within the file, its length, and the hosts. For instance, a 100 TB file could be cut into 100 FileSplits of 1 TB each. Only once the way the file is split is fixed can the number of Maps be determined.
Because the input may be files, HBase tables, and so on, different classes implement the splitting for different input formats. Here we only consider files, where FileInputFormat does the splitting. Its core method is getSplits:
/** * Generate the list of files and make them into FileSplits. */ public List<InputSplit> getSplits(JobContext job ) throws IOException { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus>files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); FileSystem fs = path.getFileSystem(job.getConfiguration()); long length = file.getLen(); BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); if ((length != 0) && isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else if (length != 0) { splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts())); } else { //Create empty hosts array for zero length files splits.add(new FileSplit(path, 0, length, new String[0])); } } // Save the number of input files in the job-conf job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits; }
The key steps in the getSplits method above are:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
minSize is the minimum split size: getFormatMinSplitSize returns 1, and getMinSplitSize comes from the configuration:
/** * Get the lower bound on split size imposed by the format. * @return the number of bytes of the minimal split for this format */ protected long getFormatMinSplitSize() { return 1; } /** * Get the minimum split size * @param job the job * @return the minimum number of bytes that can be in a split */ public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapred.min.split.size", 1L); }
getMaxSplitSize also comes from the configuration and defaults to Long.MAX_VALUE:
/** * Get the maximum split size. * @param context the job to look at. * @return the maximum number of bytes a split can include */ public static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong("mapred.max.split.size", Long.MAX_VALUE); }
Next, the file information is obtained:
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
This gives the file's length and the locations of its blocks. A BlockLocation records which DataNodes hold a block; it is defined as:
public class BlockLocation implements Writable {
  private String[] hosts;         // hostnames of datanodes
  private String[] names;         // hostname:portNumber of datanodes
  private String[] topologyPaths; // full path name in network topology
  private long offset;            // offset of the block in the file
  private long length;
  .......
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
blockSize is the HDFS block size, 64 MB or 128 MB by default; splitSize is the split size, computed by computeSplitSize:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
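To make the three-way max/min concrete, here are a few illustrative cases (the block and limit values are made up for the example):

```java
// computeSplitSize = max(minSize, min(maxSize, blockSize)), a few illustrative cases.
public class SplitSizeDemo {
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // defaults: minSize = 1, maxSize = Long.MAX_VALUE -> split = block = 64 MB
    System.out.println(computeSplitSize(64 * mb, 1L, Long.MAX_VALUE) / mb);        // 64
    // force bigger splits: mapred.min.split.size = 128 MB -> split = 128 MB
    System.out.println(computeSplitSize(64 * mb, 128 * mb, Long.MAX_VALUE) / mb);  // 128
    // force smaller splits: mapred.max.split.size = 32 MB -> split = 32 MB
    System.out.println(computeSplitSize(64 * mb, 1L, 32 * mb) / mb);               // 32
  }
}
```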
So the split size is simply a choice among these three values. By default it equals blockSize, i.e. one block (64 MB, say), though it can be set to something else. Either way, as the code below shows, every split has the same size except possibly the last one, and because SPLIT_SLOP = 1.1, the last FileSplit may be larger than one block (up to just under 1.1 blocks with the defaults).
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
  splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                           blkLocations[blkIndex].getHosts()));
  bytesRemaining -= splitSize;
}
// the last block
if (bytesRemaining != 0) {
  splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                           blkLocations[blkLocations.length - 1].getHosts()));
}
Each FileSplit is created with size splitSize, obtained as above. In this code blkIndex identifies one block, and blkLocations[blkIndex].getHosts() gives the machines holding that block; these machines become the FileSplit's hosts. The implication is that a split should not be too large: if a split spanned, say, two blocks that live on two different servers, the Map task would end up scheduled on only one of them and the other block would have to travel over the network to the Map's machine. That suggests keeping the split at the default of one block.
Looking at the FileSplit constructor:
public FileSplit(Path file, long start, long length, String[] hosts) {
  this.file = file;
  this.start = start;
  this.length = length;
  this.hosts = hosts;
}
If length is very large and spans many blocks, the split may span many servers, but hosts only records the servers of a single block. As the method below shows, the servers assigned to a split are those holding the block at the split's start offset (with 3-way replication, normally at least three machines).
protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { for (int i = 0 ; i < blkLocations.length; i++) { // is the offset inside this block? if ((blkLocations[i].getOffset() <= offset) && (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ return i; } } BlockLocation last = blkLocations[blkLocations.length -1]; long fileLength = last.getOffset() + last.getLength() -1; throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + fileLength + ")"); }
FileInputFormat, which computes the input splits, extends the base class InputFormat<K, V>, defined as:
public abstract class InputFormat<K, V> { /** * Logically split the set of input files for the job. * * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper} * for processing.</p> * * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the * input files are not physically split into chunks. For e.g. a split could * be <i><input-file-path, start, offset></i> tuple. The InputFormat * also creates the {@link RecordReader} to read the {@link InputSplit}. * * @param context job configuration. * @return an array of {@link InputSplit}s for the job. */ public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException; /** * Create a record reader for a given split. The framework will call * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before * the split is used. * @param split the split to be read * @param context the information about the task * @return a new record reader * @throws IOException * @throws InterruptedException */ public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
Two methods must be implemented: getSplits and createRecordReader. getSplits determines how the input is partitioned, and createRecordReader returns a class that can read records from a split.
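As a hedged illustration of how little a concrete subclass usually has to add on top of FileInputFormat (getSplits is inherited), here is a hypothetical variant that forbids splitting, so each input file becomes exactly one map task:

```java
// Hypothetical InputFormat: inherit getSplits() from FileInputFormat but mark every
// file as non-splittable, so one whole file -> one FileSplit -> one map task.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class NonSplittableTextInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;                      // never split: one map per file
  }

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    return new LineRecordReader();     // reuse the standard line reader
  }
}
```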
As we saw, in the default setup the number of Maps equals the number of blocks, with the split size normally equal to the block size. This raises a question: a long record (say, one line of text) may straddle two blocks and therefore two splits, and since Maps correspond to splits one-to-one, which Map processes the record that crosses the boundary? The RecordReader guarantees this works out. For text files Hadoop ships some basic implementations, most notably TextInputFormat, which extends FileInputFormat<LongWritable, Text> and is declared as:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> { @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { return new LineRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path file) { CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; } }
It uses a LineRecordReader, which reads whole lines. If a line straddles splits, LineRecordReader does not care about the split boundary: it guarantees that the straddling line is processed by the Map task of the preceding split. The code:
public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end) { newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength)); if (newSize == 0) { break; } pos += newSize; if (newSize < maxLineLength) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } if (newSize == 0) { key = null; value = null; return false; } else { return true; } }
Its readLine call reads an entire line; since it reads from HDFS, the record is read in completely even if the line crosses one or more split boundaries.
And in LineRecordReader's initialize method there is this key code:
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); .....
start and end record the split's start and end positions within the whole file. The reader then seeks to the split's start:
fileIn.seek(start);
in = new LineReader(fileIn, job);
filePosition = fileIn;
After that, if start is not 0 — meaning this is not the first split — one line is read and simply thrown away:
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
This effectively skips the first line that crosses the split boundary, because it has already been handled by the Map task of the previous split. For example, if split 1 covers bytes [0, 64 MB) and a line starts just before the 64 MB mark and ends a little after it, the Map for split 1 reads the whole line, and the Map for split 2 discards its partial first line.
In short: the number of Map tasks is determined by the number of splits, which normally matches the number of HDFS blocks (when the split size is left at one block). A record may cross split boundaries; how records are read is decided by the RecordReader implementation, which handles those boundary cases.
Postscript:
That essentially completes the analysis of job submission; the flow diagram from the official documentation summarizes it.
There are ten basic steps:
1. Create the JobClient object, which talks to the JobTracker over RPC;
2. Obtain a Job ID;
3. Upload the required files and resources to HDFS (not the HDFS input data); these are job-related files, and the number of Map tasks is computed by the JobClient and written into HDFS as job.split;
4. Submit the Job to the JobTracker's queue;
5. Initialization: the Job is taken off the queue and initialized;
6. Fetch job.split and the other files, and perform task assignment;
7. TaskTrackers obtain tasks through the heartbeat mechanism;
8. The TaskTracker fetches the required jars, parameters, and other resources from HDFS;
9. A JVM is launched;
10. The Map or Reduce Task runs in the JVM.
This post covered steps 1-4; how the JobTracker takes jobs off the queue, how a job's tasks are assigned, and how TaskTrackers obtain their Map/Reduce Tasks will be analyzed in later posts.
If anything above is wrong, corrections are welcome.