Hadoop Series 4: Using MapReduce (Part 2)


If you republish this post, please credit the author and source prominently at the top.


1: Overview

This is one of a series of posts on big data, which I will keep updating as time permits; the series covers topics such as Hadoop, Spark, Storm, and machine learning.

The Hadoop version used here is 2.6.4.

This is the second post on MapReduce. Among other things, it covers computing mutual friends and recommending people you may know.

Previous post: Hadoop Series 3: Using MapReduce (Part 1)

 

 

1: Overview
2: Running MapReduce from the IDE
2.1: Running MapReduce in local mode
2.2: Running on YARN from the IDE
3: Implementing joins in MapReduce
3.1: An example from SQL databases
3.2: The MapReduce approach
3.3: Creating the JavaBean
3.4: Creating the mapper
3.5: Creating the reducer
3.6: Full code
3.7: The data-skew problem
4: Finding mutual friends and people you may know
4.1: Preparing the data
4.2: Computing whose friend a given user is
4.3: Computing mutual friends
5: Using a GroupingComparator to compute per-group maximums
5.1: Defining a JavaBean
5.2: Defining a GroupingComparator
5.3: The map code
5.4: The reduce code
5.5: The driver class
6: Customizing the output location
6.1: A custom FileOutputFormat
7: Customizing the input data
8: Global counters
9: Chaining multiple jobs and defining execution order
10: Tuning MapReduce parameters
10.1: Resource-related parameters
10.2: Fault-tolerance parameters
10.3: Running MapReduce jobs locally
10.4: Parameters for efficiency and stability


2: Running MapReduce from the IDE

So far we have always written the code in the IDE, packaged it into a jar, and run it on the server with `hadoop jar`. This is extremely tedious: uploading and deploying is a hassle, and every code change means repackaging and re-uploading to the server. Worst of all, there is no way to set breakpoints in the business logic, which makes tracking down problems very difficult. There are two ways around this.

 

2.1: Running MapReduce in local mode

"Local mode" simply means the job does not run on YARN; it runs as a single local process.

Since everything runs locally, all of the MapReduce class files must be available. Download the Hadoop source from the official site and build it for your operating system; since I develop on Windows, I built the Windows version. Then set the environment variable:

HADOOP_HOME=E:\software\hadoop-2.6.2

and append to PATH:

%HADOOP_HOME%\bin

Now look at the main method. The code needs no changes at all: every conf setting can be omitted, and running it directly gives local mode. The reason the job runs inside the cluster when launched with `hadoop jar` on the server is that the `hadoop jar` command loads the cluster configuration files.

 

        Configuration conf = new Configuration();
        // the default is already "local", so this line is optional
        conf.set("mapreduce.framework.name", "local");
        // in local mode the input and output can live on the local filesystem
        // or in HDFS, depending on the two lines below
        // the default is the local filesystem, so this is also optional
        //conf.set("fs.defaultFS","file:///");
        //conf.set("fs.defaultFS","hdfs://server1:9000/");

        Job job = Job.getInstance(conf);

 

In fact, when you want local mode you can leave all of this configuration out, because the defaults already select local mode; just run the program directly. The program takes two arguments, the input location and the output location, so simply pass them as program arguments when launching. In IntelliJ IDEA, for example:

Here two arguments were passed; both paths are on the D: drive.

 

Of course, HDFS is also supported, with the following configuration:

 

        Configuration conf = new Configuration();
        // the default is already "local", so this line is optional
        conf.set("mapreduce.framework.name", "local");
        // in local mode the input and output can live on the local filesystem
        // or in HDFS, depending on the two lines below
        //conf.set("fs.defaultFS","file:///");
        conf.set("fs.defaultFS","hdfs://server1:9000/");

In other words, even in local mode the data is not limited to the local disk; it can also be read from and written to HDFS.

 

 

We should also add a logging configuration file; otherwise, when something goes wrong, we see nothing but a blank console and no error message, which is awkward.

Add a log4j.properties file under src/main/resources with the following content:

log4j.rootLogger=info, stdout, R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=example.log
log4j.appender.R.MaxFileSize=100KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n

This prints all INFO-level messages.


2.2: Running on YARN from the IDE

In the previous section we ran in local mode. The benefit of staying in the IDE is better debugging; this time we will launch from the IDE but have the job run on the server's YARN.

To run on YARN, use the following configuration:

 

        Configuration conf = new Configuration();
        // run in cluster mode on YARN
        conf.set("mapreduce.framework.name","yarn");
        conf.set("yarn.resourcemanager.hostname","server1"); // points the client at that machine's MR environment
        conf.set("fs.defaultFS","hdfs://server1:9000/");

 

From the earlier code we know we must set something that lets the MR environment find this job's jar so it can be copied to every MR machine. Here, though, we have to do it differently: when the IDE runs the program, it executes class files directly rather than a jar.

        Job job = Job.getInstance(conf);
        // lets Hadoop locate the jar from a class inside it; when running from
        // the IDE there is no jar, so this lookup fails
        //job.setJarByClass(WordCountDriver.class);
        job.setJar("c:/xx.jar");

So before running this code, we must first package the project into a jar.

That is the only code change needed; let's run it. As before, configure the launch arguments first: nothing else changed, and the job still reads its input and output paths from the program arguments.

 

 

 

Now run the main method. If server1 is mapped in your hosts file, prepare to witness the magic... or rather, witness the first error.

The error says we have no permission, and the user shown is Administrator, which is my Windows login user. So MapReduce submits the job as the currently logged-in user, while on the server (see the earlier posts) the directories are owned by the hadoop user. We therefore need to submit as hadoop.

How do we do that? How do we set the user to hadoop? Let's look at a piece of Hadoop's core code:

if (!isSecurityEnabled() && (user == null)) {
  String envUser = System.getenv(HADOOP_USER_NAME);
  if (envUser == null) {
    envUser = System.getProperty(HADOOP_USER_NAME);
  }
  user = envUser == null ? null : new User(envUser);
}

This is the code that resolves the user, and it tells us exactly how to set the username: the constant is HADOOP_USER_NAME.
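The lookup order in that snippet, the HADOOP_USER_NAME environment variable first and then the JVM system property of the same name, is why setting a system property works. A minimal standalone sketch (my own illustrative class, not Hadoop's):

```java
// Illustrative sketch of Hadoop's user lookup shown above:
// check the environment variable first, then fall back to the
// JVM system property of the same name.
public class UserLookup {
    static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";

    public static String resolveUser() {
        String user = System.getenv(HADOOP_USER_NAME);
        if (user == null) {
            // this is the hook that System.setProperty("HADOOP_USER_NAME", ...) uses
            user = System.getProperty(HADOOP_USER_NAME);
        }
        return user;
    }

    public static void main(String[] args) {
        System.setProperty(HADOOP_USER_NAME, "hadoop");
        System.out.println(resolveUser()); // "hadoop" unless the env var is set
    }
}
```

Note that because the environment variable takes precedence, exporting HADOOP_USER_NAME at the OS level would override whatever the code sets via the system property.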

 

        System.setProperty("HADOOP_USER_NAME","hadoop");
        Configuration conf = new Configuration();
        // run in cluster mode on YARN
        conf.set("mapreduce.framework.name","yarn");
        conf.set("yarn.resourcemanager.hostname","server1"); // points the client at that machine's MR environment
        conf.set("fs.defaultFS","hdfs://server1:9000/");

The first line sets the user to hadoop. Run the code again and witness the next error (be sure the log configuration file is in place, or you will see no error message at all).

The full log shows that the job did reach YARN before failing; the screenshot shows the error message.

Somewhat to my surprise, the log was in Chinese; the English version looks like this:

 

Either way the meaning is the same. So how do we fix this error?

This is a Hadoop bug, already fixed in newer versions, and it only appears on Windows. That means if you develop on a Linux desktop and run from the IDE there, you will not hit it.

Let's first see how the problem arises. Attach the Hadoop source, find the class org.apache.hadoop.mapred.YARNRunner, and set a breakpoint around line 492 (the exact line may differ; just find where the environment variable map is built), then inspect that variable.

After hitting the breakpoint, inspect the environment map and copy the longest value into a text editor.

The command will ultimately execute on Linux, yet this environment value is clearly built for Windows:

First, the variables use the %HADOOP_CONF_DIR% form; anyone familiar with Linux knows its environment variables use the $HADOOP_CONF_DIR form. That is the first problem.

Second, the slashes differ: backslashes on Windows, forward slashes on Linux.

Finally, the path-list separator differs: Linux joins entries with a colon, Windows with a semicolon.
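The getLinux method in the patched class below performs exactly these three substitutions; as a standalone sketch (an illustrative class of mine, equivalent in behavior):

```java
// Sketch of the Windows-to-Linux rewrite applied to each environment value:
//   %VAR%  ->  $VAR   (variable expansion syntax)
//   ;      ->  :      (path-list separator)
//   \      ->  /      (path slashes)
public class EnvFix {
    public static String toLinux(String win) {
        StringBuilder sb = new StringBuilder();
        int percents = 0;
        for (char ch : win.toCharArray()) {
            if (ch == '%') {
                // an opening % becomes $, the matching closing % is dropped
                if (++percents % 2 == 1) sb.append('$');
            } else if (ch == ';') {
                sb.append(':');
            } else if (ch == '\\') {
                sb.append('/');
            } else {
                sb.append(ch);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toLinux("%HADOOP_CONF_DIR%;%HADOOP_HOME%\\bin"));
        // -> $HADOOP_CONF_DIR:$HADOOP_HOME/bin
    }
}
```

Note that this character-by-character rewrite would also mangle a literal semicolon or backslash inside a value; that is acceptable here because the values are all paths and classpaths.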

 

Now that we know the problem, how do we fix it? The only way is to patch the source code. Don't be afraid of modifying source: if you think about it, code like this could be changed by anyone who has just learned basic Java. The complete modified class is included below, so if you prefer, you can simply copy it.

We copy the class into our own project with the same code and the same package name; the JVM will load the class from our project ahead of the one in the Hadoop jar.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
import org.apache.hadoop.yarn.util.ConverterUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;

/**
 * This class enables the current JobClient (0.22 hadoop) to run on YARN.
 */
@SuppressWarnings("unchecked")
public class YARNRunner implements ClientProtocol {

    private static final Log LOG = LogFactory.getLog(YARNRunner.class);

    private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
    private ResourceMgrDelegate resMgrDelegate;
    private ClientCache clientCache;
    private Configuration conf;
    private final FileContext defaultFileContext;

    /**
     * Yarn runner incapsulates the client interface of yarn
     * 
     * @param conf
     *            the configuration object for the client
     */
    public YARNRunner(Configuration conf) {
        this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
    }

    /**
     * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
     * {@link ResourceMgrDelegate}. Enables mocking and testing.
     * 
     * @param conf
     *            the configuration object for the client
     * @param resMgrDelegate
     *            the resourcemanager client handle.
     */
    public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
        this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
    }

    /**
     * Similar to
     * {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)} but
     * allowing injecting {@link ClientCache}. Enable mocking and testing.
     * 
     * @param conf
     *            the configuration object
     * @param resMgrDelegate
     *            the resource manager delegate
     * @param clientCache
     *            the client cache object.
     */
    public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate, ClientCache clientCache) {
        this.conf = conf;
        try {
            this.resMgrDelegate = resMgrDelegate;
            this.clientCache = clientCache;
            this.defaultFileContext = FileContext.getFileContext(this.conf);
        } catch (UnsupportedFileSystemException ufe) {
            throw new RuntimeException("Error in instantiating YarnClient", ufe);
        }
    }

    @Private
    /**
     * Used for testing mostly.
     * @param resMgrDelegate the resource manager delegate to set to.
     */
    public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
        this.resMgrDelegate = resMgrDelegate;
    }

    @Override
    public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0) throws IOException, InterruptedException {
        throw new UnsupportedOperationException("Use Token.renew instead");
    }

    @Override
    public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
        return resMgrDelegate.getActiveTrackers();
    }

    @Override
    public JobStatus[] getAllJobs() throws IOException, InterruptedException {
        return resMgrDelegate.getAllJobs();
    }

    @Override
    public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
        return resMgrDelegate.getBlacklistedTrackers();
    }

    @Override
    public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
        return resMgrDelegate.getClusterMetrics();
    }

    @VisibleForTesting
    void addHistoryToken(Credentials ts) throws IOException, InterruptedException {
        /* check if we have a hsproxy, if not, no need */
        MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
        if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
            /*
             * note that get delegation token was called. Again this is hack for
             * oozie to make sure we add history server delegation tokens to the
             * credentials
             */
            RMDelegationTokenSelector tokenSelector = new RMDelegationTokenSelector();
            Text service = resMgrDelegate.getRMDelegationTokenService();
            if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
                Text hsService = SecurityUtil.buildTokenService(hsProxy.getConnectAddress());
                if (ts.getToken(hsService) == null) {
                    ts.addToken(hsService, getDelegationTokenFromHS(hsProxy));
                }
            }
        }
    }

    @VisibleForTesting
    Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy) throws IOException, InterruptedException {
        GetDelegationTokenRequest request = recordFactory.newRecordInstance(GetDelegationTokenRequest.class);
        request.setRenewer(Master.getMasterPrincipal(conf));
        org.apache.hadoop.yarn.api.records.Token mrDelegationToken;
        mrDelegationToken = hsProxy.getDelegationToken(request).getDelegationToken();
        return ConverterUtils.convertFromYarn(mrDelegationToken, hsProxy.getConnectAddress());
    }

    @Override
    public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, InterruptedException {
        // The token is only used for serialization. So the type information
        // mismatch should be fine.
        return resMgrDelegate.getDelegationToken(renewer);
    }

    @Override
    public String getFilesystemName() throws IOException, InterruptedException {
        return resMgrDelegate.getFilesystemName();
    }

    @Override
    public JobID getNewJobID() throws IOException, InterruptedException {
        return resMgrDelegate.getNewJobID();
    }

    @Override
    public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
        return resMgrDelegate.getQueue(queueName);
    }

    @Override
    public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
        return resMgrDelegate.getQueueAclsForCurrentUser();
    }

    @Override
    public QueueInfo[] getQueues() throws IOException, InterruptedException {
        return resMgrDelegate.getQueues();
    }

    @Override
    public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
        return resMgrDelegate.getRootQueues();
    }

    @Override
    public QueueInfo[] getChildQueues(String parent) throws IOException, InterruptedException {
        return resMgrDelegate.getChildQueues(parent);
    }

    @Override
    public String getStagingAreaDir() throws IOException, InterruptedException {
        return resMgrDelegate.getStagingAreaDir();
    }

    @Override
    public String getSystemDir() throws IOException, InterruptedException {
        return resMgrDelegate.getSystemDir();
    }

    @Override
    public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
        return resMgrDelegate.getTaskTrackerExpiryInterval();
    }

    @Override
    public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {

        addHistoryToken(ts);

        // Construct necessary information to start the MR AM
        ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts);

        // Submit to ResourceManager
        try {
            ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

            ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);
            String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics());
            if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
                throw new IOException("Failed to run job : " + diagnostics);
            }
            return clientCache.getClient(jobId).getJobStatus(jobId);
        } catch (YarnException e) {
            throw new IOException(e);
        }
    }

    private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type) throws IOException {
        LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
        FileStatus rsrcStat = fs.getFileStatus(p);
        rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs.getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
        rsrc.setSize(rsrcStat.getLen());
        rsrc.setTimestamp(rsrcStat.getModificationTime());
        rsrc.setType(type);
        rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
        return rsrc;
    }

    public ApplicationSubmissionContext createApplicationSubmissionContext(Configuration jobConf, String jobSubmitDir, Credentials ts) throws IOException {
        ApplicationId applicationId = resMgrDelegate.getApplicationId();

        // Setup resource requirements
        Resource capability = recordFactory.newRecordInstance(Resource.class);
        capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
        capability.setVirtualCores(conf.getInt(MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES));
        LOG.debug("AppMaster capability = " + capability);

        // Setup LocalResources
        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

        Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);

        URL yarnUrlForJobSubmitDir = ConverterUtils.getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem().resolvePath(defaultFileContext.makeQualified(new Path(jobSubmitDir))));
        LOG.debug("Creating setup context, jobSubmitDir url is " + yarnUrlForJobSubmitDir);

        localResources.put(MRJobConfig.JOB_CONF_FILE, createApplicationResource(defaultFileContext, jobConfPath, LocalResourceType.FILE));
        if (jobConf.get(MRJobConfig.JAR) != null) {
            Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
            LocalResource rc = createApplicationResource(FileContext.getFileContext(jobJarPath.toUri(), jobConf), jobJarPath, LocalResourceType.PATTERN);
            String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
            rc.setPattern(pattern);
            localResources.put(MRJobConfig.JOB_JAR, rc);
        } else {
            // Job jar may be null. For e.g, for pipes, the job jar is the
            // hadoop
            // mapreduce jar itself which is already on the classpath.
            LOG.info("Job jar is not present. " + "Not adding any jar to the list of resources.");
        }

        // TODO gross hack
        for (String s : new String[] { MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO }) {
            localResources.put(MRJobConfig.JOB_SUBMIT_DIR + "/" + s, createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s), LocalResourceType.FILE));
        }

        // Setup security tokens
        DataOutputBuffer dob = new DataOutputBuffer();
        ts.writeTokenStorageToStream(dob);
        ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

        // Setup the command to run the AM
        List<String> vargs = new ArrayList<String>(8);
        // vargs.add(MRApps.crossPlatformifyMREnv(jobConf,
        // Environment.JAVA_HOME)
        // + "/bin/java");
        // TODO: this is the modification - the cross-platform expansion above
        // yields a Windows-style %JAVA_HOME% value, so hard-code the Linux form
        System.out.println(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME) + "/bin/java");
        vargs.add("$JAVA_HOME/bin/java");

        // TODO: why do we use 'conf' some places and 'jobConf' others?
        long logSize = jobConf.getLong(MRJobConfig.MR_AM_LOG_KB, MRJobConfig.DEFAULT_MR_AM_LOG_KB) << 10;
        String logLevel = jobConf.get(MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
        int numBackups = jobConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS, MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS);
        MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);

        // Check for Java Lib Path usage in MAP and REDUCE configs
        warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""), "map", MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
        warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""), "map", MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
        warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""), "reduce", MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
        warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""), "reduce", MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);

        // Add AM admin command opts before user command opts
        // so that it can be overridden by user
        String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
        warnForJavaLibPath(mrAppMasterAdminOptions, "app master", MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
        vargs.add(mrAppMasterAdminOptions);

        // Add AM user command opts
        String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
        warnForJavaLibPath(mrAppMasterUserOptions, "app master", MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
        vargs.add(mrAppMasterUserOptions);

        if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE, MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
            final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS, MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
            if (profileParams != null) {
                vargs.add(String.format(profileParams, ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + TaskLog.LogName.PROFILE));
            }
        }

        vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT);
        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR);

        Vector<String> vargsFinal = new Vector<String>(8);
        // Final command
        StringBuilder mergedCommand = new StringBuilder();
        for (CharSequence str : vargs) {
            mergedCommand.append(str).append(" ");
        }
        vargsFinal.add(mergedCommand.toString());

        LOG.debug("Command to launch container for ApplicationMaster is : " + mergedCommand);

        // Setup the CLASSPATH in environment
        // i.e. add { Hadoop jars, job jar, CWD } to classpath.
        Map<String, String> environment = new HashMap<String, String>();
        MRApps.setClasspath(environment, conf);

        // Shell
        environment.put(Environment.SHELL.name(), conf.get(MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));

        // Add the container working directory at the front of LD_LIBRARY_PATH
        MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(), MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);

        // Setup the environment variables for Admin first
        MRApps.setEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), conf);
        // Setup the environment variables (LD_LIBRARY_PATH, etc)
        MRApps.setEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ENV), conf);

        // Parse distributed cache
        MRApps.setupDistributedCache(jobConf, localResources);

        Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>(2);
        acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
        acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));

        // modification: rewrite every environment value from Windows form to Linux form
        for (String key : environment.keySet()) {
            String org = environment.get(key);
            String linux = getLinux(org);
            environment.put(key, linux);
        }
        // Setup ContainerLaunchContext for AM container
        ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(localResources, environment, vargsFinal, null, securityTokens, acls);

        Collection<String> tagsFromConf = jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

        // Set up the ApplicationSubmissionContext
        ApplicationSubmissionContext appContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
        appContext.setApplicationId(applicationId); // ApplicationId
        appContext.setQueue( // Queue name
                jobConf.get(JobContext.QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME));
        // add reservationID if present
        ReservationId reservationID = null;
        try {
            reservationID = ReservationId.parseReservationId(jobConf.get(JobContext.RESERVATION_ID));
        } catch (NumberFormatException e) {
            // throw exception as reservationid as is invalid
            String errMsg = "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID) + " specified for the app: " + applicationId;
            LOG.warn(errMsg);
            throw new IOException(errMsg);
        }
        if (reservationID != null) {
            appContext.setReservationID(reservationID);
            LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId + " to queue:" + appContext.getQueue() + " with reservationId:" + appContext.getReservationID());
        }
        appContext.setApplicationName( // Job name
                jobConf.get(JobContext.JOB_NAME, YarnConfiguration.DEFAULT_APPLICATION_NAME));
        appContext.setCancelTokensWhenComplete(conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
        appContext.setAMContainerSpec(amContainer); // AM Container
        appContext.setMaxAppAttempts(conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
        appContext.setResource(capability);
        appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
        if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
            appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
        }

        return appContext;
    }

    /**
     * Modification: converts a Windows-style environment value
     * (%VAR%, semicolons, backslashes) to Linux style ($VAR, colons, slashes).
     * @param org the original Windows-style value
     * @return the converted Linux-style value
     */
    private String getLinux(String org) {
        StringBuilder sb = new StringBuilder();
        int c = 0;
        for (int i = 0; i < org.length(); i++) {
            if (org.charAt(i) == '%') {
                c++;
                if (c % 2 == 1) {
                    sb.append("$");
                }
            } else {
                switch (org.charAt(i)) {
                case ';':
                    sb.append(":");
                    break;

                case '\\':
                    sb.append("/");
                    break;
                default:
                    sb.append(org.charAt(i));
                    break;
                }
            }
        }
        return (sb.toString());
    }

    @Override
    public void setJobPriority(JobID arg0, String arg1) throws IOException, InterruptedException {
        resMgrDelegate.setJobPriority(arg0, arg1);
    }

    @Override
    public long getProtocolVersion(String arg0, long arg1) throws IOException {
        return resMgrDelegate.getProtocolVersion(arg0, arg1);
    }

    @Override
    public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0) throws IOException, InterruptedException {
        throw new UnsupportedOperationException("Use Token.renew instead");
    }

    @Override
    public Counters getJobCounters(JobID arg0) throws IOException, InterruptedException {
        return clientCache.getClient(arg0).getJobCounters(arg0);
    }

    @Override
    public String getJobHistoryDir() throws IOException, InterruptedException {
        return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    }

    @Override
    public JobStatus getJobStatus(JobID jobID) throws IOException, InterruptedException {
        JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
        return status;
    }

    @Override
    public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2) throws IOException, InterruptedException {
        return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException, InterruptedException {
        return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
    }

    @Override
    public TaskReport[] getTaskReports(JobID jobID, TaskType taskType) throws IOException, InterruptedException {
        return clientCache.getClient(jobID).getTaskReports(jobID, taskType);
    }

    private void killUnFinishedApplication(ApplicationId appId) throws IOException {
        ApplicationReport application = null;
        try {
            application = resMgrDelegate.getApplicationReport(appId);
        } catch (YarnException e) {
            throw new IOException(e);
        }
        if (application.getYarnApplicationState() == YarnApplicationState.FINISHED || application.getYarnApplicationState() == YarnApplicationState.FAILED || application.getYarnApplicationState() == YarnApplicationState.KILLED) {
            return;
        }
        killApplication(appId);
    }

    private void killApplication(ApplicationId appId) throws IOException {
        try {
            resMgrDelegate.killApplication(appId);
        } catch (YarnException e) {
            throw new IOException(e);
        }
    }

    private boolean isJobInTerminalState(JobStatus status) {
        return status.getState() == JobStatus.State.KILLED || status.getState() == JobStatus.State.FAILED || status.getState() == JobStatus.State.SUCCEEDED;
    }

    @Override
    public void killJob(JobID arg0) throws IOException, InterruptedException {
        /* check if the status is not running, if not send kill to RM */
        JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
        ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();

        // get status from RM and return
        if (status == null) {
            killUnFinishedApplication(appId);
            return;
        }

        if (status.getState() != JobStatus.State.RUNNING) {
            killApplication(appId);
            return;
        }

        try {
            /* send a kill to the AM */
            clientCache.getClient(arg0).killJob(arg0);
            long currentTimeMillis = System.currentTimeMillis();
            long timeKillIssued = currentTimeMillis;
            while ((currentTimeMillis < timeKillIssued + 10000L) && !isJobInTerminalState(status)) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException ie) {
                    /** interrupted, just break */
                    break;
                }
                currentTimeMillis = System.currentTimeMillis();
                status = clientCache.getClient(arg0).getJobStatus(arg0);
                if (status == null) {
                    killUnFinishedApplication(appId);
                    return;
                }
            }
        } catch (IOException io) {
            LOG.debug("Error when checking for application status", io);
        }
        if (status != null && !isJobInTerminalState(status)) {
            killApplication(appId);
        }
    }

    @Override
    public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException, InterruptedException {
        return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
    }

    @Override
    public AccessControlList getQueueAdmins(String arg0) throws IOException {
        return new AccessControlList("*");
    }

    @Override
    public JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
        return JobTrackerStatus.RUNNING;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
    }

    @Override
    public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException {
        return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
    }

    private static void warnForJavaLibPath(String opts, String component, String javaConf, String envConf) {
        if (opts != null && opts.contains("-Djava.library.path")) {
            LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " + "programs to no longer function if hadoop native libraries " + "are used. These values should be set as part of the " + "LD_LIBRARY_PATH in the " + component + " JVM env using " + envConf
                    + " config settings.");
        }
    }
}

 

That's the code. Rerun the main method and it now succeeds. The first run in this mode is a little slow (though not too slow); subsequent runs are normal.

One last note: the few conf lines are actually optional as well.

 

        conf.set("mapreduce.framework.name","yarn");
        conf.set("yarn.resourcemanager.hostname","server1");//this setting makes the main method use that machine's MR environment
        conf.set("fs.defaultFS","hdfs://server1:9000/");

 

In other words, these lines can be commented out. Once they are, the configuration files are read instead, so download the following configuration files from the server.

 

These files contain the settings already configured on the server. Put them under src/main/resources and they will be loaded onto the classpath when the project is packaged.

 

As the screenshot showed, the configuration files contain these same settings, so leaving out the conf calls and shipping the files instead works just as well.


3: Implementing a join in MapReduce

Click here to view the source code

3.1: An example in a SQL database

Let's set the scene with a relational database. Suppose we have two tables: an orders table and a products table.

Orders table

 

#orderId,date,pid,amount
1001,20170822,p1,3
1002,20170823,p2,9
1003,20170824,p3,11

 

Products table

#pid,pname,categoryId,price
p1,AirDefenseRocket,1,20.2
p2,Mortar,1,50
p3,WizardTower,2,100

Expressed as SQL in a relational database, the query would be:

select * from orders a left join products b on a.pid = b.pid


3.2: Implementation approach in MapReduce

First identify the join key: the product ID. Both the orders table and the products table contain a product ID; the SQL joins on it, and our MapReduce job will too.

The idea is to emit the product ID as the map output key, so it becomes the reduce input key.

Each reduce call then sees only records for a single product: several order records for that product, plus exactly one product record. Combining them gives the joined rows.
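Before the full Hadoop version in the sections below, the reduce-side flow can be sketched in plain Java (the shuffle is simulated with an in-memory map; class and method names here are invented for illustration): tag each record with its source, group by pid, then inside each group merge the single product record into every order record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceSideJoinSketch {

    //simulate map + shuffle + reduce for a reduce-side join; returns the joined lines
    public static List<String> join(String[] orders, String[] products) {
        //map phase: emit <pid, tagged record>; tag O = order record, P = product record
        Map<String, List<String>> shuffle = new TreeMap<>();
        for (String o : orders) shuffle.computeIfAbsent(o.split(",")[2], k -> new ArrayList<>()).add("O:" + o);
        for (String p : products) shuffle.computeIfAbsent(p.split(",")[0], k -> new ArrayList<>()).add("P:" + p);

        //reduce phase: each pid group contains one product record plus all its order records
        List<String> joined = new ArrayList<>();
        for (List<String> group : shuffle.values()) {
            String product = null;
            List<String> orderRecs = new ArrayList<>();
            for (String rec : group) {
                if (rec.startsWith("P:")) product = rec.substring(2);
                else orderRecs.add(rec.substring(2));
            }
            for (String o : orderRecs) joined.add(o + " | " + product);
        }
        return joined;
    }

    public static void main(String[] args) {
        join(new String[]{"1001,20170822,p1,3"}, new String[]{"p1,AirDefenseRocket,1,20.2"})
                .forEach(System.out::println); // 1001,20170822,p1,3 | p1,AirDefenseRocket,1,20.2
    }
}
```

This is exactly the shape of the Hadoop job that follows: the tag plays the role of the bean's flag field, and the TreeMap plays the role of the shuffle.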


3.3: Create the corresponding JavaBean

That was the SQL version. Now suppose the data lives in several files in HDFS, in exactly the format above, and we want to join it. That is the MapReduce join; we will run it in local mode.

First create the directory D:\mr\join\input and two files in it, order_01.txt and product_01.txt, containing the order data and product data above.


Then define a JavaBean to hold the fields, and have it implement Hadoop's serialization interface:

 

    /**
     * This class holds the fields of both tables (order and product)
     */
    static class Info implements Writable,Cloneable{
        /**
         * order id
         */
        private int orderId;
        /**
         * date
         */
        private String dateString;
        /**
         * product id
         */
        private String pid;
        /**
         * amount
         */
        private int amount;
        /**
         * product name
         */
        private String pname;
        /**
         * category
         */
        private int categoryId;
        /**
         * price
         */
        private float price;
        /**
         * This field needs some explanation.<br>
         * The object carries fields from two different files (orders and products). When we load a record from one file we can only fill in part of the fields; the rest are filled in during the join. This flag records which kind of record the object holds:
         * "0" means it holds order fields
         * "1" means it holds product fields
         */
        private String flag;

        @Override
        protected Object clone() throws CloneNotSupportedException {
            return super.clone();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(orderId);
            out.writeUTF(dateString);
            out.writeUTF(pid);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeInt(categoryId);
            out.writeFloat(price);
            out.writeUTF(flag);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            orderId = in.readInt();
            dateString = in.readUTF();
            pid = in.readUTF();
            amount = in.readInt();
            pname = in.readUTF();
            categoryId = in.readInt();
            price = in.readFloat();
            flag = in.readUTF();
        }

        public Info() {
        }

        public void set(int orderId, String dateString, String pid, int amount, String pname, int categoryId, float price,String flag) {
            this.orderId = orderId;
            this.dateString = dateString;
            this.pid = pid;
            this.amount = amount;
            this.pname = pname;
            this.categoryId = categoryId;
            this.price = price;
            this.flag = flag;
        }

        public int getOrderId() {
            return orderId;
        }

        public void setOrderId(int orderId) {
            this.orderId = orderId;
        }

        public String getDateString() {
            return dateString;
        }

        public String getFlag() {
            return flag;
        }

        public void setFlag(String flag) {
            this.flag = flag;
        }

        public void setDateString(String dateString) {
            this.dateString = dateString;
        }

        public String getPid() {
            return pid;
        }

        public void setPid(String pid) {
            this.pid = pid;
        }

        public int getAmount() {
            return amount;
        }

        public void setAmount(int amount) {
            this.amount = amount;
        }

        public String getPname() {
            return pname;
        }

        public void setPname(String pname) {
            this.pname = pname;
        }

        public int getCategoryId() {
            return categoryId;
        }

        public void setCategoryId(int categoryId) {
            this.categoryId = categoryId;
        }

        public float getPrice() {
            return price;
        }

        public void setPrice(float price) {
            this.price = price;
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("{");
            sb.append("\"orderId\":")
                    .append(orderId);
            sb.append(",\"dateString\":\"")
                    .append(dateString).append('\"');
            sb.append(",\"pid\":")
                    .append(pid);
            sb.append(",\"amount\":")
                    .append(amount);
            sb.append(",\"pname\":\"")
                    .append(pname).append('\"');
            sb.append(",\"categoryId\":")
                    .append(categoryId);
            sb.append(",\"price\":")
                    .append(price);
            sb.append(",\"flag\":\"")
                    .append(flag).append('\"');
            sb.append('}');
            return sb.toString();
        }
    }


3.4: Create the mapper

The mapper code is explained by its comments:

    static class JoinMapper extends Mapper<LongWritable,Text,Text,Info>{
        private Info info = new Info();
        private Text text = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if(line.startsWith("#")){//skip comment lines that start with #
                return;
            }
            //Get the input split of the current task. InputSplit is the top-level abstract class; here it can be cast to FileSplit
            InputSplit inputSplit = context.getInputSplit();
            FileSplit fileSplit = (FileSplit) inputSplit;
            String name = fileSplit.getPath().getName();//the file name tells us which kind of record this line is
            //decide the record type from the file name
            String pid = "";
            String[] split = line.split(",");
            if(name.startsWith("order")){//order record: order id, date, product id, amount
                //orderId,date,pid,amount
                pid = split[2];
                info.set(Integer.parseInt(split[0]),split[1],pid,Integer.parseInt(split[3]),"",0,0,"0");

            }else{//product record: product id, product name, category, price
                //pid,pname,categoryId,price
                pid = split[0];
                info.set(0,"",pid,0,split[1],Integer.parseInt(split[2]),Float.parseFloat(split[3]),"1");
            }
            text.set(pid);
            context.write(text,info);
        }
    }


3.5: Create the reducer

Again, see the comments:

 

    static class JoinReduce extends Reducer<Text,Info,Info,NullWritable>{

        @Override
        protected void reduce(Text key, Iterable<Info> values, Context context) throws IOException, InterruptedException {
            Info product = new Info();//holds the product record; there is exactly one per product
            List<Info> infos = new ArrayList<>();//holds the order records; there may be many
            for(Info info : values){
                if("1".equals(info.getFlag())){
                    //product record
                    try {
                        product = (Info) info.clone();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }else{//otherwise it is an order record
                    Info order = new Info();
                    try {
                        order = (Info) info.clone();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    infos.add(order);
                }
            }
            //At this point orders and product are fully separated: orders in the list, the product in its own object
            //Now copy the product fields into each order
            for(Info tmp : infos){
                tmp.setPname(product.getPname());
                tmp.setCategoryId(product.getCategoryId());
                tmp.setPrice(product.getPrice());
                //finally write out the joined record
                context.write(tmp,NullWritable.get());
            }
        }
    }


3.6: Complete code

The map and reduce are above; all that's missing is the main method that launches the job. It is an ordinary main method, the same as in the previous post. Here is the complete join code, main method included, in a single file.

package com.zxj.hadoop.demo.mapreduce.join;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author 朱小傑
 * Date: 2017-08-22 22:10
 * Description: ...
 */
public class MRJoin {
    /**
     * This class holds the fields of both tables (order and product)
     */
    static class Info implements Writable,Cloneable{
        /**
         * order id
         */
        private int orderId;
        /**
         * date
         */
        private String dateString;
        /**
         * product id
         */
        private String pid;
        /**
         * amount
         */
        private int amount;
        /**
         * product name
         */
        private String pname;
        /**
         * category
         */
        private int categoryId;
        /**
         * price
         */
        private float price;
        /**
         * This field needs some explanation.<br>
         * The object carries fields from two different files (orders and products). When we load a record from one file we can only fill in part of the fields; the rest are filled in during the join. This flag records which kind of record the object holds:
         * "0" means it holds order fields
         * "1" means it holds product fields
         */
        private String flag;

        @Override
        protected Object clone() throws CloneNotSupportedException {
            return super.clone();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(orderId);
            out.writeUTF(dateString);
            out.writeUTF(pid);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeInt(categoryId);
            out.writeFloat(price);
            out.writeUTF(flag);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            orderId = in.readInt();
            dateString = in.readUTF();
            pid = in.readUTF();
            amount = in.readInt();
            pname = in.readUTF();
            categoryId = in.readInt();
            price = in.readFloat();
            flag = in.readUTF();
        }

        public Info() {
        }

        public void set(int orderId, String dateString, String pid, int amount, String pname, int categoryId, float price,String flag) {
            this.orderId = orderId;
            this.dateString = dateString;
            this.pid = pid;
            this.amount = amount;
            this.pname = pname;
            this.categoryId = categoryId;
            this.price = price;
            this.flag = flag;
        }

        public int getOrderId() {
            return orderId;
        }

        public void setOrderId(int orderId) {
            this.orderId = orderId;
        }

        public String getDateString() {
            return dateString;
        }

        public String getFlag() {
            return flag;
        }

        public void setFlag(String flag) {
            this.flag = flag;
        }

        public void setDateString(String dateString) {
            this.dateString = dateString;
        }

        public String getPid() {
            return pid;
        }

        public void setPid(String pid) {
            this.pid = pid;
        }

        public int getAmount() {
            return amount;
        }

        public void setAmount(int amount) {
            this.amount = amount;
        }

        public String getPname() {
            return pname;
        }

        public void setPname(String pname) {
            this.pname = pname;
        }

        public int getCategoryId() {
            return categoryId;
        }

        public void setCategoryId(int categoryId) {
            this.categoryId = categoryId;
        }

        public float getPrice() {
            return price;
        }

        public void setPrice(float price) {
            this.price = price;
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("{");
            sb.append("\"orderId\":")
                    .append(orderId);
            sb.append(",\"dateString\":\"")
                    .append(dateString).append('\"');
            sb.append(",\"pid\":")
                    .append(pid);
            sb.append(",\"amount\":")
                    .append(amount);
            sb.append(",\"pname\":\"")
                    .append(pname).append('\"');
            sb.append(",\"categoryId\":")
                    .append(categoryId);
            sb.append(",\"price\":")
                    .append(price);
            sb.append(",\"flag\":\"")
                    .append(flag).append('\"');
            sb.append('}');
            return sb.toString();
        }
    }

    static class JoinMapper extends Mapper<LongWritable,Text,Text,Info>{
        private Info info = new Info();
        private Text text = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if(line.startsWith("#")){//skip comment lines that start with #
                return;
            }
            //Get the input split of the current task. InputSplit is the top-level abstract class; here it can be cast to FileSplit
            InputSplit inputSplit = context.getInputSplit();
            FileSplit fileSplit = (FileSplit) inputSplit;
            String name = fileSplit.getPath().getName();//the file name tells us which kind of record this line is
            //decide the record type from the file name
            String pid = "";
            String[] split = line.split(",");
            if(name.startsWith("order")){//order record: order id, date, product id, amount
                //orderId,date,pid,amount
                pid = split[2];
                info.set(Integer.parseInt(split[0]),split[1],pid,Integer.parseInt(split[3]),"",0,0,"0");

            }else{//product record: product id, product name, category, price
                //pid,pname,categoryId,price
                pid = split[0];
                info.set(0,"",pid,0,split[1],Integer.parseInt(split[2]),Float.parseFloat(split[3]),"1");
            }
            text.set(pid);
            context.write(text,info);
        }
    }

    static class JoinReduce extends Reducer<Text,Info,Info,NullWritable>{

        @Override
        protected void reduce(Text key, Iterable<Info> values, Context context) throws IOException, InterruptedException {
            Info product = new Info();//holds the product record; there is exactly one per product
            List<Info> infos = new ArrayList<>();//holds the order records; there may be many
            for(Info info : values){
                if("1".equals(info.getFlag())){
                    //product record
                    try {
                        product = (Info) info.clone();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }else{//otherwise it is an order record
                    Info order = new Info();
                    try {
                        order = (Info) info.clone();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    infos.add(order);
                }
            }
            //At this point orders and product are fully separated: orders in the list, the product in its own object
            //Now copy the product fields into each order
            for(Info tmp : infos){
                tmp.setPname(product.getPname());
                tmp.setCategoryId(product.getCategoryId());
                tmp.setPrice(product.getPrice());
                //finally write out the joined record
                context.write(tmp,NullWritable.get());
            }
        }
    }


    static class JoinMain{
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(JoinMain.class);

            job.setMapperClass(JoinMapper.class);
            job.setReducerClass(JoinReduce.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Info.class);

            job.setOutputKeyClass(Info.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.setInputPaths(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));

            boolean b = job.waitForCompletion(true);
            if(b){
                System.out.println("OK");
            }

        }
    }



}

Finally, set the launch arguments and run it in local mode.

After a successful run, the result looks like this:

 

 

That completes the join.


3.7: The data-skew problem

The join above works, but it exposes another problem: data skew.

Suppose product a has 100,000 orders while product b has only 10. The reducers then receive wildly uneven amounts of data: some finish quickly, some run very slowly, and overall performance suffers.

The fix is to do the join on the map side: load the product table separately in every map task (the product table is comparatively small), and merge the data in the mapper, so no reduce phase is needed.
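The map-side join can be sketched in plain Java, independent of Hadoop: each map task would load the small product table into a HashMap once (in a real job, in Mapper.setup(), with the file shipped to every node), then join each order record against it as it streams past. The class and method names below are invented for illustration, and the product names are simplified English stand-ins for the sample data:

```java
import java.util.HashMap;
import java.util.Map;

public class MapSideJoinSketch {

    //join one order line against the in-memory product table; no reduce phase is needed
    public static String join(String orderLine, Map<String, String> products) {
        String pid = orderLine.split(",")[2];          //the third field of an order is the product id
        return orderLine + "," + products.get(pid);    //append pname,categoryId,price
    }

    public static void main(String[] args) {
        //small table: pid -> "pname,categoryId,price"; in a real job this is built once per map task
        Map<String, String> products = new HashMap<>();
        products.put("p1", "AirDefenseRocket,1,20.2");
        products.put("p2", "Mortar,1,50");
        products.put("p3", "WizardTower,2,100");

        //the big orders table streams through the mapper
        String[] orders = {"1001,20170822,p1,3", "1002,20170823,p2,9", "1003,20170824,p3,11"};
        for (String order : orders) {
            System.out.println(join(order, products)); //e.g. 1001,20170822,p1,3,AirDefenseRocket,1,20.2
        }
    }
}
```

Because every mapper holds the whole small table, no record ever waits on a skewed reducer; the trade-off is that the small table must fit in memory.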


4: Finding common friends and computing people you may know

Click here to download the source code

Suppose we run a social app whose friendships are one-way. We want to compute the common friends between users and use them to recommend people a user may know.

This takes two MapReduce jobs.

 

4.1: Prepare the data

 

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

 

As shown above, the user comes before the colon and the friend list after it.

Save this as a file; it is the input of the first MapReduce job.

 

 

4.2: Compute whose friend each user is

package com.zxj.hadoop.demo.mapreduce.findfriend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @Author 朱小傑
 * Date: 2017-08-24 22:59
 * Description: first compute which users each person appears as a friend of
 */
public class Friend1 {


    static class FriendMapper1 extends Mapper<LongWritable, Text, Text, Text> {
        private Text k = new Text();
        private Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] personFriends = line.split(":");
            String person = personFriends[0];//the user
            String friends = personFriends[1];//the friend list
            for (String friend : friends.split(",")) {
                //emit <friend, user>
                k.set(friend);
                v.set(person);
                context.write(k,v);
            }
        }
    }

    /**
     * Input: <friend, user>
     */
    static class FriendReduce1 extends Reducer<Text,Text,Text,Text>{
        private Text k = new Text();
        private Text v = new Text();
        @Override
        protected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for(Text person : persons){
                sb.append(person).append(",");
            }
            k.set(friend);
            v.set(sb.toString());
            context.write(k,v);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String input = "D:\\mr\\qq\\input";
        String output = "D:\\mr\\qq\\out1";
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(Friend1.class);

        job.setMapperClass(FriendMapper1.class);
        job.setReducerClass(FriendReduce1.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job,new Path(input));
        FileOutputFormat.setOutputPath(job,new Path(output));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}

This job computes, for every user, the list of people that user is a friend of. The result looks like this:


4.3: Compute common friends

package com.zxj.hadoop.demo.mapreduce.findfriend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Arrays;

/**
 * @Author 朱小傑
 * Date: 2017-08-24 22:59
 * Description: the second step
 */
public class Friend2 {


    static class FriendMapper2 extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * The input is the previous job's output, e.g.  A    I,K,C,B,G,F,H,O,D,
         * i.e. which users A is a friend of
         * @param key
         * @param value
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split("\t");
            String friend = split[0];
            String[] persions = split[1].split(",");
            Arrays.sort(persions);

            for(int i = 0 ; i < persions.length - 1 ; i ++){
                for(int j = i + 1 ; j < persions.length ; j ++){
                    //emit <person-person, friend>: the two people share this friend, so all their common friends arrive in the same reducer
                    context.write(new Text(persions[i] + "-" + persions[j]),new Text(friend));
                }
            }
        }
    }

    /**
     * Input: <person-person pair, common friends>
     */
    static class FriendReduce2 extends Reducer<Text,Text,Text,Text>{
        private Text k = new Text();
        private Text v = new Text();
        @Override
        protected void reduce(Text person_person, Iterable<Text> friends, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for(Text f : friends){
                sb.append(f.toString()).append(" ");
            }
            context.write(person_person,new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String input = "D:\\mr\\qq\\out1";
        String output = "D:\\mr\\qq\\out2";
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(Friend2.class);

        job.setMapperClass(FriendMapper2.class);
        job.setReducerClass(FriendReduce2.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job,new Path(input));
        FileOutputFormat.setOutputPath(job,new Path(output));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}

This job yields the common friends. Since two users who share friends may well know each other, these pairs are the candidates to recommend.
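The logic of the two jobs can be condensed into a pure-Java sketch (the MapReduce shuffle is imitated here with sorted maps; class and method names are invented for illustration): pass 1 inverts each friend list into <friend, user> pairs, and pass 2 pairs up every two users who share a friend and collects their common friends.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class CommonFriendsSketch {

    //the two MapReduce passes, simulated in memory
    public static Map<String, Set<String>> commonFriends(Map<String, List<String>> friendLists) {
        //pass 1: invert the lists -> friend -> sorted set of users who list that friend
        Map<String, TreeSet<String>> friendOf = new TreeMap<>();
        friendLists.forEach((user, friends) -> {
            for (String f : friends) {
                friendOf.computeIfAbsent(f, k -> new TreeSet<>()).add(user);
            }
        });
        //pass 2: every pair of users sharing a friend gets that friend recorded as common
        Map<String, Set<String>> result = new TreeMap<>();
        friendOf.forEach((friend, users) -> {
            String[] u = users.toArray(new String[0]);
            for (int i = 0; i < u.length - 1; i++) {
                for (int j = i + 1; j < u.length; j++) {
                    result.computeIfAbsent(u[i] + "-" + u[j], k -> new TreeSet<>()).add(friend);
                }
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> input = new HashMap<>();
        input.put("A", Arrays.asList("B", "C"));
        input.put("B", Arrays.asList("C"));
        input.put("C", Arrays.asList("B"));
        System.out.println(commonFriends(input)); // {A-B=[C], A-C=[B]}
    }
}
```

Sorting the users (the Arrays.sort in the real mapper, TreeSet here) matters: it guarantees the pair is always emitted as "A-B", never "B-A", so both orderings land in the same reducer.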


5: Using a GroupingComparator to compute a per-group maximum

Click here to download the source code

Prepare some order data:

order1,200
order1,300
order2,1000
order2,300
order2,900
order3,9000
order3,200
order3,1000

Each line is one sale amount for an order. The task: compute the maximum amount within each order.

 

 

5.1: Define a JavaBean

Define a bean that implements both serialization and the sort-comparison interface (WritableComparable):

 

package com.zxj.hadoop.demo.mapreduce.groupingcomporator;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key: order id plus sale amount
 */
public class OrderBean implements WritableComparable<OrderBean>{

    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);

    }

    public void set(Text itemid, DoubleWritable amount) {

        this.itemid = itemid;
        this.amount = amount;

    }



    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }



    @Override
    public int compareTo(OrderBean o) {
        //sort by order id ascending...
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            //...then by amount descending (note the minus sign)
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
        
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        
        this.itemid = new Text(readUTF);
        this.amount= new DoubleWritable(readDouble);
    }


    @Override
    public String toString() {

        return itemid.toString() + "\t" + amount.get();
        
    }

}


5.2: Define a GroupingComparator

In a reducer, all values sharing one key are handled in a single reduce() call. If a multi-field JavaBean is the key, beans for the same order would never compare as equal, so they would never land in the same call. A GroupingComparator solves this: it tells the framework to treat all beans of the same order as one group.

package com.zxj.hadoop.demo.mapreduce.groupingcomporator;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @Author 朱小傑
 * Date: 2017-08-26 17:31
 * Description: use a reduce-side GroupingComparator to treat a group of beans as the same key,
 * i.e. for grouping
 */
public class ItemidGroupingComparator extends WritableComparator {

    /**
     * This constructor is required: it tells the framework which key class to instantiate via reflection
     */
    protected ItemidGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean b1 = (OrderBean) a;
        OrderBean b2 = (OrderBean) b;
        //When comparing two beans, only this one field is compared; if it is equal, MapReduce treats the two objects as the same key
        return b1.getItemid().compareTo(b2.getItemid());
    }
}

Remember that MapReduce sorts by key. With all beans of one order grouped as a single key, each order produces just one reduce call, and thanks to our compareTo, the bean with the largest amount sorts first and is the one the reducer sees as the key.
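The combined effect of the sort and the GroupingComparator can be simulated in plain Java (names invented for illustration): sort the records the way OrderBean.compareTo does, order id ascending then amount descending, and within each id group the first record is the maximum, which is all the reducer ever looks at.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;

public class GroupMaxSketch {

    //simulate shuffle sort + GroupingComparator: returns itemid -> max amount
    public static LinkedHashMap<String, Double> maxPerOrder(List<String[]> records) {
        //sort the way OrderBean.compareTo does: itemid ascending, then amount descending
        records.sort(Comparator.comparing((String[] r) -> r[0])
                .thenComparing(r -> Double.parseDouble(r[1]), Comparator.reverseOrder()));
        //grouping: within each itemid the first record is the largest; ignore the rest
        LinkedHashMap<String, Double> max = new LinkedHashMap<>();
        for (String[] r : records) {
            max.putIfAbsent(r[0], Double.parseDouble(r[1]));
        }
        return max;
    }

    public static void main(String[] args) {
        List<String[]> data = new ArrayList<>();
        for (String line : new String[]{"order1,200", "order1,300",
                "order2,1000", "order2,300", "order2,900"}) {
            data.add(line.split(","));
        }
        System.out.println(maxPerOrder(data)); // {order1=300.0, order2=1000.0}
    }
}
```

In the real job the sort happens in the shuffle and the "first record per group" rule is exactly what the GroupingComparator gives the reducer.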


5.3: The map code

 

The map code is straightforward:

 

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
        
        OrderBean bean = new OrderBean();
        
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = StringUtils.split(line, ",");
            
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
            
            context.write(bean, NullWritable.get());
            
        }
        
    }

 

It simply writes the bean as the key with a NullWritable value.


5.4: The reduce code

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
        
        
        //By the time reduce runs, all beans with the same id count as one group, with the largest amount first, so the key we receive is the maximum
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

As explained above, only one bean per order reaches reduce, and that bean carries the maximum amount, so we can write it out directly.


5.5: The launcher

 

The launcher differs slightly from the earlier ones:

    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(SecondarySort.class);
        
        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);
        
        
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        
        FileInputFormat.setInputPaths(job, new Path("D:\\mr\\groupcompatrator\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\mr\\groupcompatrator\\out1"));
        
        //set the custom GroupingComparator class here
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        
        job.waitForCompletion(true);
        
    }

Run it and the output looks like this:


6: Customizing the output destination

Click here to download the source code

So far output has always gone to the file system, and MapReduce has handled the writing for us. Could we send it somewhere else, such as a relational database, a cache, or Hive? Yes, we can.

 

6.1: A custom FileOutputFormat

The main method of our launchers has always contained a line like this:

 

FileOutputFormat.setOutputPath(job, new Path("D:\\mr\\wordcount\\out1"));

 

This line specifies the output location. You might guess that it involves FileOutputFormat or one of its subclasses, and that is correct. So let's extend it; it is an abstract class.

package com.zxj.hadoop.demo.mapreduce.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

/**
 * @Author 朱小傑
 * Date 2017-08-26 19:08
 * When MapReduce writes data, it first calls this class's getRecordWriter() method
 * to obtain a RecordWriter object, then calls that object's write method.
 */
public class MyOutputFormat extends FileOutputFormat<Text, LongWritable> {
    @Override
    public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new MyRecordWriter();
    }

    /**
     * The custom RecordWriter
     */
    static class MyRecordWriter extends RecordWriter<Text, LongWritable> {
        private final BufferedWriter writer;

        public MyRecordWriter() throws IOException {
            writer = new BufferedWriter(new FileWriter("d:/myFileFormat"));
        }

        @Override
        public void write(Text key, LongWritable value) throws IOException, InterruptedException {
            writer.write(key.toString() + " " + value.toString());
            writer.newLine();
            writer.flush();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            writer.close();
        }
    }
}

In the code above we defined a custom OutputFormat that writes its file to the D: drive. By the same token, to write to a relational database, a cache, or any other store, we can extend this class freely instead of being tied to the file system.

 

Wiring this class in takes only one line:

        Job job = Job.getInstance(conf);


        // set the custom OutputFormat
        job.setOutputFormatClass(MyOutputFormat.class);

Here we set the output format. Although our custom format hard-codes its output to the D: drive, the input and output paths must still be supplied, i.e. these two lines:

        // specify the input location
        FileInputFormat.setInputPaths(job, new Path("D:\\mr\\wordcount\\input"));
        // specify the output location
        FileOutputFormat.setOutputPath(job, new Path("D:\\mr\\wordcount\\out1"));

You may wonder: specifying the input makes sense, but why must the output still be specified? Because we extend FileOutputFormat, an output directory is mandatory. Files are still written there, but not data files — only the marker file that records whether the job succeeded. The location configured inside our custom format is where the data actually goes.

 

Here is the complete driver. The custom output format does not affect the map or reduce code, so those are omitted:

 public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // the default is already "local", so this line is optional
        conf.set("mapreduce.framework.name", "local");
        // when running an mr program locally, input/output may live on local disk or in hdfs,
        // depending on the following setting
        // the default is already the local file system, so this can be left out
        //conf.set("fs.defaultFS","file:///");
        //conf.set("fs.defaultFS","hdfs://server1:9000/");

        Job job = Job.getInstance(conf);

        // lets hadoop locate the jar containing this class
        job.setJarByClass(Driver.class);

        // set the custom OutputFormat
        job.setOutputFormatClass(MyOutputFormat.class);

        // the Mapper class
        job.setMapperClass(WordCountMapper.class);
        // the Reducer class
        job.setReducerClass(WordCountReduce.class);

        // the Mapper's output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // specify the input location
        FileInputFormat.setInputPaths(job, new Path("D:\\mr\\wordcount\\input"));
        // specify the output location (only marker files land here with our custom format)
        FileOutputFormat.setOutputPath(job, new Path("D:\\mr\\wordcount\\out1"));

        // submit to the configured runtime without waiting
        //job.submit();
        try {
            // 'true' makes the job print progress while it runs
            job.waitForCompletion(true);
        } catch (ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }

The only code affected is the job.setOutputFormatClass() line. Run any simple wordcount with this driver and look first at the directory passed to FileOutputFormat.setOutputPath(): it holds only the job's marker files recording whether the run succeeded — no data. Then look at the D: drive: opening d:/myFileFormat shows the final output.

That is all for custom output. By implementing this class we can freely choose where results are stored.

7: Custom input data

To be added...
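While this section is pending, the general shape is worth noting: custom input mirrors custom output — you extend FileInputFormat, override createRecordReader(), and return a RecordReader whose nextKeyValue()/getCurrentKey()/getCurrentValue() methods the framework calls once per map invocation. That iteration contract can be sketched without any Hadoop dependency (the class below is illustrative, not Hadoop's API):

```java
import java.util.List;

public class FakeRecordReader {
    private final List<String> lines;  // stands in for the records of one input split
    private int pos = -1;

    public FakeRecordReader(List<String> lines) { this.lines = lines; }

    // advance to the next record; false tells the framework the split is exhausted
    public boolean nextKeyValue() { return ++pos < lines.size(); }

    public int getCurrentKey() { return pos; }                 // e.g. a record offset
    public String getCurrentValue() { return lines.get(pos); }

    public static void main(String[] args) {
        FakeRecordReader reader = new FakeRecordReader(List.of("hello world", "hello hadoop"));
        // this loop is what the MapReduce framework runs, calling map() once per record
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
        }
    }
}
```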

8: Global counters

While running MapReduce we sometimes need counters — for example, to know how many records were processed and how many malformed records were discarded.

 

public class MultiOutputs {
    // define custom counters via an enum
    enum MyCounter {MALFORMED, NORMAL}

    static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] words = value.toString().split(",");

            for (String word : words) {
                context.write(new Text(word), new LongWritable(1));
            }
            // increment the enum-defined counter by 1
            context.getCounter(MyCounter.MALFORMED).increment(1);
            // increment a dynamically named counter by 1
            context.getCounter("counterGroupa", "countera").increment(1);
            // set a counter to an absolute value
            context.getCounter("counterGroupb", "counterb").setValue(10);
        }
    }
}
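Counter semantics are simple to picture: each enum constant (or group/name pair) identifies one job-wide long that tasks increment, and the framework aggregates and prints the totals when the job ends. A plain-Java sketch of those semantics (the class and record format here are illustrative, not Hadoop's Counter API):

```java
import java.util.EnumMap;

public class CounterDemo {
    enum MyCounter { MALFORMED, NORMAL }

    // one running total per enum constant, like Hadoop's job-wide counters
    static final EnumMap<MyCounter, Long> counters = new EnumMap<>(MyCounter.class);

    static void increment(MyCounter c) { counters.merge(c, 1L, Long::sum); }

    public static void main(String[] args) {
        String[] records = {"a,b", "malformed", "c,d"};
        for (String record : records) {
            // pretend a record without a comma is malformed and gets skipped
            if (record.contains(",")) increment(MyCounter.NORMAL);
            else increment(MyCounter.MALFORMED);
        }
        // the framework would print these totals at job completion
        System.out.println("NORMAL=" + counters.get(MyCounter.NORMAL)
                + " MALFORMED=" + counters.get(MyCounter.MALFORMED));
    }
}
```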

9: Chaining multiple jobs and defining execution order

Remember the earlier programs that computed QQ friends and counted which word occurred most often in a novel? Each used two MapReduce jobs, the second building on the output of the first.

Back then we waited for the first job to finish and launched the second by hand. That step can be automated by linking multiple jobs together:

 

        Job job1 = ...; // create the first job
        Job job2 = ...; // create the second job
        Job job3 = ...; // create the third job

        ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
        ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
        ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());

        cJob1.setJob(job1);
        cJob2.setJob(job2);
        cJob3.setJob(job3);

        // declare the dependencies between the jobs
        cJob2.addDependingJob(cJob1); // the second depends on the first
        cJob3.addDependingJob(cJob2); // the third depends on the second

        JobControl jobControl = new JobControl("RecommendationJob");
        jobControl.addJob(cJob1);
        jobControl.addJob(cJob2);
        jobControl.addJob(cJob3);

        // run the jobs added to the JobControl on a separate thread and wait for completion
        Thread jobControlThread = new Thread(jobControl);
        jobControlThread.start();
        while (!jobControl.allFinished()) {
            Thread.sleep(500);
        }
        jobControl.stop();

10: Tuning MapReduce parameters

10.1: Resource-related parameters

// the following parameters take effect when set in your own MR application
(1) mapreduce.map.memory.mb: upper memory limit (in MB) for one Map Task, default 1024. A Map Task that exceeds it is forcibly killed.
(2) mapreduce.reduce.memory.mb: upper memory limit (in MB) for one Reduce Task, default 1024. A Reduce Task that exceeds it is forcibly killed.
(3) mapreduce.map.java.opts: JVM options for Map Tasks, where you can set the java heap size etc., e.g.
"-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@ is replaced by the framework with the actual task id). Default: ""
(4) mapreduce.reduce.java.opts: JVM options for Reduce Tasks, e.g.
"-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc". Default: ""
(5) mapreduce.map.cpu.vcores: maximum number of cpu cores per Map Task, default 1
(6) mapreduce.reduce.cpu.vcores: maximum number of cpu cores per Reduce Task, default 1

// the following must be set in the server configuration before yarn starts
(7) yarn.scheduler.minimum-allocation-mb      1024   minimum memory allocated to an application container
(8) yarn.scheduler.maximum-allocation-mb      8192   maximum memory allocated to an application container
(9) yarn.scheduler.minimum-allocation-vcores    1
(10) yarn.scheduler.maximum-allocation-vcores   32
(11) yarn.nodemanager.resource.memory-mb      8192

// key parameters for shuffle performance; set before yarn starts
(12) mapreduce.task.io.sort.mb   100    // size of the shuffle's circular buffer, default 100 MB
(13) mapreduce.map.sort.spill.percent   0.8   // spill threshold of the circular buffer, default 80%

10.2: Fault-tolerance parameters

(1) mapreduce.map.maxattempts: maximum retries per Map Task; beyond this the Map Task is considered failed. Default: 4.
(2) mapreduce.reduce.maxattempts: maximum retries per Reduce Task; beyond this the Reduce Task is considered failed. Default: 4.
(3) mapreduce.map.failures.maxpercent: the job fails once the fraction of failed Map Tasks exceeds this value; default 0. If your application can tolerate dropping some input, set it above 0 — e.g. 5 means the job still succeeds as long as fewer than 5% of Map Tasks fail (a Map Task counts as failed once it exceeds mapreduce.map.maxattempts retries, and its input then produces no output).
(4) mapreduce.reduce.failures.maxpercent: the job fails once the fraction of failed Reduce Tasks exceeds this value; default 0.
(5) mapreduce.task.timeout: task timeout, a parameter that often needs adjusting. If a task makes no progress within this time — reading no new data and producing no output — it is considered blocked, possibly forever, so the framework forcibly fails it after this many milliseconds; default 300000. If each input record takes long to process (database access, pulling data over the network, etc.), raise this value. A too-small value typically shows up as "AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secs. Container killed by the ApplicationMaster.".

10.3: Running MapReduce jobs locally

mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=file:///

10.4: Efficiency and stability parameters

(1) mapreduce.map.speculative: whether speculative execution is enabled for Map Tasks; the Hadoop 2.x default is true
(2) mapreduce.reduce.speculative: whether speculative execution is enabled for Reduce Tasks; the Hadoop 2.x default is true
(3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence: when the same class appears in both the user jar and the hadoop jars, which one wins; default false, meaning the class from the hadoop jars is used.
(4) mapreduce.input.fileinputformat.split.minsize: minimum split size used by FileInputFormat when computing splits
(5) mapreduce.input.fileinputformat.split.maxsize: maximum split size used by FileInputFormat (by default a split equals the block size, i.e. 134217728)
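Parameters (4) and (5) feed the split-size computation in FileInputFormat, which in the Hadoop 2.x source is splitSize = max(minSize, min(maxSize, blockSize)). A quick plain-Java check of how the three values interact:

```java
public class SplitSizeDemo {
    // mirrors FileInputFormat.computeSplitSize(): max(minSize, min(maxSize, blockSize))
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long block = 134217728L;  // 128 MB default block size
        // with the defaults (minSize=1, maxSize=Long.MAX_VALUE) a split equals the block
        System.out.println(computeSplitSize(block, 1L, Long.MAX_VALUE));
        // shrinking maxSize below the block size produces more, smaller splits
        System.out.println(computeSplitSize(block, 1L, 32 * 1024 * 1024L));
        // raising minSize above the block size produces fewer, larger splits
        System.out.println(computeSplitSize(block, 256 * 1024 * 1024L, Long.MAX_VALUE));
    }
}
```

So lowering the max size is the knob for more map parallelism, and raising the min size is the knob for fewer, larger map tasks.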

 

