Hadoop 系列(三)Java API


Hadoop 系列(三)Java API

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.9.2</version>
</dependency>

一、HDFS 操作

@Test
public void upload() throws Exception {

    Configuration conf = new Configuration();  // (1) 
    //conf.set("fs.defaultFS", "hdfs://master:9000/");

    Path dst = new Path("hdfs://master:9000/upload/MPSetup4.log");
    FileSystem fs = FileSystem.get(new URI("hdfs://master:9000/"), conf, "hadoop"); // (2)
    FSDataOutputStream os = fs.create(dst);
    FileInputStream is = new FileInputStream("c:/MPSetup.log");

    IOUtils.copy(is, os);
}
  1. Configuration 配置文件默認讀取 resources 目錄下的 core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml 文件。可以將 Hadoop 安裝目錄下的這些配制文件直接拷貝過來,也可以直接 conf.set() 設置參數。

  2. FileSystem.get() 必須要以 hadoop 的身份運行,否則會出現權限不足的問題。可以配置 -DHADOOP_USER_NAME=hadoop 參數。

下面提供一個 HdfsUtil 工具類:

public class HdfsUtil {
    FileSystem fs = null;

    @Before
    public void init() throws Exception{
        System.setProperty("hadoop.home.dir", "D:/Program_Files/apache/hadoop-common-bin/");
        //1. 讀取classpath下的xxx-site.xml 配置文件,並解析其內容,封裝到conf對象中
        Configuration conf = new Configuration();

        //2. 也可以在代碼中對conf中的配置信息進行手動設置,會覆蓋掉配置文件中的讀取的值
        conf.set("fs.defaultFS", "hdfs://master:9000/");

        //3. 根據配置信息,去獲取一個具體文件系統的客戶端操作實例對象
        fs = FileSystem.get(new URI("hdfs://master:9000/"), conf, "hadoop");
    }

    /** 上傳文件,封裝好的寫法 */
    @Test
    public void upload2() throws Exception, IOException{
        fs.copyFromLocalFile(new Path("c:/MPSetup.log"),
                new Path("hdfs://master:9000/aaa/bbb/ccc/MPSetup.log"));
    }


    /** 下載文件 */
    @Test
    public void download() throws Exception {
        fs.copyToLocalFile(new Path("hdfs://master:9000/aaa/bbb/ccc/MPSetup.log"),
                new Path("d:/MPSetup2.txt"));

    }

    /** 查看文件信息 */
    @Test
    public void listFiles() throws FileNotFoundException, IllegalArgumentException, IOException {

        // listFiles列出的是文件信息,而且提供遞歸遍歷
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true);

        while(files.hasNext()) {
            LocatedFileStatus file = files.next();
            Path filePath = file.getPath();
            String fileName = filePath.getName();
            System.out.println(fileName);
        }

        System.out.println("---------------------------------");

        //listStatus 可以列出文件和文件夾的信息,但是不提供自帶的遞歸遍歷
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for(FileStatus status: listStatus){
            String name = status.getPath().getName();
            System.out.println(name + (status.isDirectory()?" is dir":" is file"));
        }
    }

    /** 創建文件夾 */
    @Test
    public void mkdir() throws IllegalArgumentException, Exception {
        fs.mkdirs(new Path("/aaa/bbb/ccc"));
    }

    /** 刪除文件或文件夾 */
    @Test
    public void rm() throws IllegalArgumentException, IOException {
        fs.delete(new Path("/aa"), true);
    }
}

二、RPC 調用

(1) LoginServiceInterface 接口

package com.github.binarylei.hadoop.rpc;

public interface LoginServiceInterface {
    
    public static final long versionID = 1L;
    public String login(String username, String password);

}

public class LoginServiceImpl implements LoginServiceInterface {

    @Override
    public String login(String username, String password) {       
        return username + " login in successfully!";
    }
}

(2) RPCServer

// 目前只能上傳到 Linux 上運行 ??????
public class RPCServer {

    private static String host = "master";
    private static int port = 10001;

    public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000/");
        Builder builder = new Builder(conf);
        
        builder.setBindAddress("master")
                .setPort(port)
                .setProtocol(LoginServiceInterface.class)
                .setInstance(new LoginServiceImpl());
        
        Server server = builder.build();
        
        server.start();
    }
}
  1. 將打包后的 hadoop-api-1.0.0.jar 上傳到 Linux,啟動 RPC 服務,執行

    hadoop jar hadoop-api-1.0.0.jar com.github.binarylei.hadoop.rpc.RPCServer

    2018-05-13 18:20:16,606 INFO ipc.CallQueueManager: Using callQueue: class java.util.concurrent.LinkedBlockingQueue queueCapacity: 100 scheduler: class org.apache.hadoop.ipc.DefaultRpcScheduler
    2018-05-13 18:20:17,631 INFO ipc.Server: Starting Socket Reader #1 for port 10001
    2018-05-13 18:20:19,613 INFO ipc.Server: IPC Server Responder: starting
    2018-05-13 18:20:19,618 INFO ipc.Server: IPC Server listener on 10001: starting

(3) RPCClient

public class RPCClient {

    private static String host = "master";
    private static int port = 10001;

    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "D:/Program_Files/apache/hadoop-common-bin/");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000/");

        LoginServiceInterface proxy = RPC.getProxy(
                LoginServiceInterface.class,
                1L,
                new InetSocketAddress(host, port),
                conf);
        
        String result = proxy.login("hadoop-test", "test");
        
        System.out.println(result);
    }
}
  1. 直接在 Windows 上運行,結果如下:

    hadoop-test login in successfully!

三、MapReduce

下面模仿 wordcount,寫一個 MapReduce

(1) WCMapper

//4個泛型中,前兩個是指定mapper輸入數據的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的類型
//map 和 reduce 的數據輸入輸出都是以 key-value對的形式封裝的
//默認情況下,框架傳遞給我們的mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,這一行的內容作為value
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    //mapreduce框架每讀一行數據就調用一次該方法
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        //具體業務邏輯就寫在這個方法體中,而且我們業務要處理的數據已經被框架傳遞進來,在方法的參數中 key-value
        //key 是這一行數據的起始偏移量     value 是這一行的文本內容

        //將這一行的內容轉換成string類型
        String line = value.toString();

        //對這一行的文本按特定分隔符切分
        String[] words = StringUtils.split(line, " ");

        //遍歷這個單詞數組輸出為kv形式  k:單詞   v : 1
        for(String word : words){
            context.write(new Text(word), new LongWritable(1));
        }
    }
}

(2) WCReducer

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    //框架在map處理完成之后,將所有kv對緩存起來,進行分組,然后傳遞一個組<key,valus{}>,調用一次reduce方法
    //<hello,{1,1,1,1,1,1.....}>
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context)
            throws IOException, InterruptedException {

        long count = 0;
        //遍歷value的list,進行累加求和
        for(LongWritable value:values){
            count += value.get();
        }

        //輸出這一個單詞的統計結果

        context.write(key, new LongWritable(count));
    }
}

(3) WCReducer

/**
 * 用來描述一個特定的作業
 * 比如,該作業使用哪個類作為邏輯處理中的map,哪個作為reduce
 * 還可以指定該作業要處理的數據所在的路徑
 * 還可以指定改作業輸出的結果放到哪個路徑
 * ....
 * @author duanhaitao@itcast.cn
 */
public class WCRunner {

    public static void main(String[] args) throws Exception {

        //System.setProperty("hadoop.home.dir", "D:/Program_Files/apache/hadoop-common-bin/");
        Configuration conf = new Configuration();
        Job wcjob = Job.getInstance(conf);

        //設置整個job所用的那些類在哪個jar包
        wcjob.setJarByClass(WCRunner.class);

        //本job使用的mapper和reducer的類
        wcjob.setMapperClass(WCMapper.class);
        wcjob.setReducerClass(WCReducer.class);

        //指定reduce的輸出數據kv類型
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(LongWritable.class);

        //指定mapper的輸出數據kv類型
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(LongWritable.class);

        //指定要處理的輸入數據存放路徑
        FileInputFormat.setInputPaths(wcjob, new Path("hdfs://master:9000/wc/input/"));

        //指定處理結果的輸出數據存放路徑
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://master:9000/wc/output5/"));

        //將job提交給集群運行
        wcjob.waitForCompletion(true);
    }
}

四、Hadoop 運行(Windows)

問題 1:缺少 winutils.exe 和 hadoop.dll

# 缺少 winutils.exe
Could not locate executable null \bin\winutils.exe in the hadoop binaries
# 缺少 hadoop.dll
Unable to load native-hadoop library for your platform… using builtin-Java classes where applicable

解決辦法:

  1. 下載地址:https://github.com/srccodes/hadoop-common-2.2.0-bin
  2. 解壓后將 hadoop-common-2.2.0-bin/bin 目錄下的文件全部拷貝到 HADOOP_HOME/bin 目錄下,並配置 HADOOP_HOME 環境變量。
  3. 將 hadoop-common-2.2.0-bin/bin/hadoop.dll 拷貝到 C:\Windows\System32 目錄下。

問題 2:Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

解決辦法:

  1. 首先確保 C:\Windows\System32 目錄下已經有 hadoop.dll 文件

  2. 在自己的工程中拷貝一份 org.apache.hadoop.io.nativeio.NativeIO 類,修改如下:

    public static boolean access(String path, AccessRight desiredAccess)
                    throws IOException {
        return true;
        //return access0(path, desiredAccess.accessRight());
    }
    

參考:

  1. 《Hadoop 運行問題》:https://blog.csdn.net/congcong68/article/details/42043093
  2. 《winutils.exe 下載地址》:https://github.com/srccodes/hadoop-common-2.2.0-bin

每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM