HDFS簡單測試


使用Hadoop的Java客戶端API操作分布式文件系統
#獲取文件系統實現
//hdfs://master01:9000/
FileSystem get(URI uri[,Configuration conf[,String user]])
//fs.defaultFS
FileSystem newInstance(URI uri[,Configuration conf[,String user]])
#從配置中獲取默認URI路徑
URI getDefaultUri(Configuration conf)
#打開一個指向給定HDFS文件的輸入流
FSDataInputStream open(Path path[,int bufferSize])
#移動(不同的目錄參數時)或重命名(相同的目錄參數時)一個HDFS文件,它相當於mv命令
boolean rename(Path src,Path dest)
#創建符號鏈接,第一個參數是被鏈接的目標文件,第二個參數是符號連接,第三個參數表示符號連接是否允許創建父級目錄,
//此方法調用之前必須首先調用FileSystem.enableSymlinks()以保證符號連接可用
createSymlink(Path src, Path link, boolean createParent)
#本地文件的內容直接追加到HDFS文件中
FSDataOutputStream append(Path path[,int bufferSize])
#創建一個指向給定HDFS文件的輸出流,使用該輸出流可以向該文件中寫入內容
FSDataOutputStream create(Path path[,boolean override[,int bufferSize]])
#刪除指定的HDFS文件或文件夾,第二個參數表示是否可以遞歸刪除,方法返回是否刪除成功的邏輯類型
boolean delete(Path path[,boolean recursive])
#判斷給定的HDFS文件或路徑是否存在
boolean exists(Path path)
#獲取給定HDFS文件塊的實際塊尺寸(字節)
long getBlockSize(Path path)
#獲取HDFS文件塊的默認尺寸(字節)
long getDefaultBlockSize()
#獲取文件系統的默認端口
int getDefaultPort()
#獲取給定HDFS文件的尺寸
long getLength(Path path)
#獲取給定HDFS文件的文件狀態信息
FileStatus getFileStatus(Path path)
#獲取HDFS文件的實際副本數量
int getReplication()
#設置給定文件的副本數量
boolean setReplication(Path src,short replication)
#判斷給定的文件是否是文件/目錄
boolean isFile(Path path)/isDirectory(Path path)
#獲取HDFS文件的默認副本數量
int getDefaultReplication()
#創建指定路徑的HDFS目錄
boolean mkdirs(Path path[,FsPermission permission])
#獲取文件系統的使用量和容量的狀態
FileStatus getStatus()
#列出給定目錄中的文件和文件夾
FileStatus[] listStatus(Path path[,PathFilter pf])
#第二個參數表示是否遞歸,如果第一個參數是文件則返回文件的狀態信息,如果第一個參數是目錄則表示是否遞歸列出該目錄中的文件(不含文件夾)
RemoteIterator<LocatedFileStatus> listFiles(Path path,boolean recursive)
#獲取給定HDFS文件塊所在的位置
BlockLocation[] getFileBlockLocations(Path path,int start,long len)
BlockLocation[] getFileBlockLocations(FileStatus file,int start,long len)
#返回文件系統中所有文件的總尺寸
long getUsed()
#將本地單個文件拷貝到HDFS文件系統中,前兩個參數表示是否需要刪除源文件,是否需要覆蓋目標文件
copyFromLocalFile([boolean delSrc[,boolean override]]Path src,Path dst)
#重載,將本地多個文件拷貝到HDFS文件系統中
copyFromLocalFile(boolean delSrc,boolean override,Path[] srcs,Path dst)
#將HDFS文件拷貝到本地文件系統中,第1個參數表示是否需要刪除源文件
copyToLocalFile([boolean delSrc,]Path src,Path dst)
#將HDFS文件或文件夾從一個路徑拷貝到另一個路徑
FileUtil.copy(FileSystem srcFS,Path srcPath,FileSystem dstFS,Path dstPath,boolean delSource,boolean override,Configuration conf)

 

測試代碼

測試說明:

全都切換到hadoop用戶下;在主機上啟動hdfs集群;使用jps查看啟動是否成功;

主機:master01

節點:slave01、slave02、slave03

本地機器的eclipse上的代碼:(導入hadoop-2.7.3-All.jar包

package com.mmzs.hdfs.main;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

/**
 * @author hadoop
 *
 */
public class HdfsMain {

    private static FileSystem fs;
    private static Configuration conf;
    static {
        String uri = "hdfs://master01:9000/";
        conf = new Configuration();
        try {
            fs = FileSystem.get(new URI(uri), conf, "hadoop");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }
    
    
    public static void main(String[] args) {
//        mkdir("/data");
//        uploadFile("/home/hadoop/test/1.txt");//本地文件路徑
//        downloadFile("/data/1.txt");//HDFS上的文件路徑
//        openFile("/data/2.txt");
        try {
//            writeFile("/data/1.txt");
//            rename("/data/1.txt", "/data02/1-run.txt");
//            rename("/data02/1-run.txt", "/data02/1copy.txt");
            createLink("/data/1.txt", "/data/info");
//            deleteFile("/data/2.txt");
//            listFiles("/data");
//            listFiles02("/data");
//            copyFile("/data/1.txt", "/data/test/");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 實現集群文件的拷貝
     * @param src
     * @param path
     * @throws IOException 
     */
    public static void copyFile(String src, String path) throws IOException {
        Path srcpath = new Path(src);
        Path dstpath = new Path(path);
        
        Boolean flag = FileUtil.copy(fs, srcpath, fs, dstpath, false, false, conf);
        if (flag) {
            System.out.println("拷貝文件成功");
        } else {
            System.out.println("拷貝文件失敗");
        }
    }
    
    /**
     * 創建文件夾
     * @param path
     */
    public static void mkdir(String path) {
        Path newpath = new Path(path);
        try {
            Boolean flag = fs.mkdirs(newpath);
            if (flag) {
                System.out.println("文件夾創建成功");
            } else {
                System.out.println("文件夾創建失敗");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 上傳本地文件到HDFS集群
     * @param path
     */
    public static void uploadFile(String path) {
        Path localFile = new Path(path);
        Path clusterPath = new Path("/data/");
        
        try {
            fs.copyFromLocalFile(false, localFile, clusterPath);
            System.out.println(" 上傳成功!");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    /**
     * 下載HDFS集群中的文件到本地系統中
     * @param clusterPath
     */
    public static void downloadFile(String clusterPath) {
        Path clusterFile = new Path(clusterPath);
        Path localPath = new Path("/home/hadoop/test/");
        
        try {
            fs.copyToLocalFile(false, clusterFile, localPath);
            System.out.println("下載成功!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 打開集群文件並輸出文件內容
     * @param clusterFile
     */
    public static void openFile(String clusterFile) {
        Path clusterPath = new Path(clusterFile);
        FSDataInputStream fis = null;
        
        try {
            fis = fs.open(clusterPath,1024);
            while (fis.available()>0) {//available表示測試是否可以讀
//                String line = fis.readUTF();
                String line = fis.readLine();
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
                try {
                    if(null != fis)
                    fis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
        }
    }
    
    /**
     * 寫一個文件到HDFS集群
     * @param clusterFile
     * @throws IOException
     */
    public static void writeFile(String clusterFile) throws IOException {
        Path destFile = new Path(clusterFile);
        FSDataOutputStream fos = null;
        if (fs.exists(destFile) && fs.isDirectory(destFile)) {
            System.out.println("參數路徑指向一個目錄,無法寫入數據");
            return;
        }
        //如果文件存在則以追加的方式打開,否則直接創建文件並寫入內容
        if (fs.exists(destFile) && fs.isFile(destFile)) {
            fos = fs.append(destFile);
        }else {
            fos = fs.create(destFile, true);
        }
        try{
            fos.writeUTF("he is ligang\n");
            fos.writeUTF("he is 65kg\n");
            fos.writeUTF("he is eat\n");
            fos.writeUTF("he is running\n");
            fos.flush();//一定要flush啊!!!!!!!!
            System.out.println("寫入完成!");
        }finally{
            if(null != fs) fs.close();
        }
    }
    
    /**
     * 移動(在不同目錄下)或重命名文件(同一個目錄下);
     * @param path1 源文件路徑
     * @param path2 移動或重命名后的文件路徑
     * @throws IOException
     */
    public static void rename(String path1, String path2) throws IOException {
        Path file01 = new Path(path1);
        Path file02 = new Path(path2);
        if (!fs.exists(file01)) return;
        
        int index = path2.lastIndexOf('/');
        String dir = path2.substring(0, index);
        Path dstpath = new Path(dir);
        
        Boolean flag = null;
        if (!fs.exists(dstpath)) {
            flag = fs.mkdirs(dstpath);
        }
        
//        if (flag) {
//            return;
//        }
        
        fs.rename(file01, file02);
    }
    
    /**
     * 創建集群文件快捷方式
     * @param src
     * @param link
     * @throws IOException
     */
    public static void createLink(String src, String link) throws IOException {
        Path srcPath = new Path(src);
        Path linkPath = new Path(link);
        
        //如果創建快捷方式的源文件不存在或者是一個目錄就理它
        if (!fs.exists(srcPath) || fs.isDirectory(srcPath)) return;
        fs.createSymlink(srcPath, linkPath, true);
        System.out.println("創建完成!");
    }
    
    /**
     * 刪除集群文件或者目錄
     * @param path
     * @throws IOException
     */
    public static void deleteFile(String path) throws IOException {
        Path dstPath = new Path(path);
        Boolean flag = fs.delete(dstPath, true);
        if (flag) {
            System.out.println("刪除成功");
        }else {
            System.out.println("刪除失敗");
        }
    }
    
    /**
     * 查看給定參數集群路徑下的文件或者文件夾
     * @param path
     * @throws IOException 
     */
    public static void listFiles(String path) throws IOException {
        Path dstPath = new Path(path);
        if(!fs.exists(dstPath) ||fs.isFile(dstPath)) {
            return;
        }
        FileStatus[] filestatus = fs.listStatus(dstPath);
        for(FileStatus fstatus:filestatus) {
            if(fstatus.isFile()) {
                //如果是文件打印出文件的大小尺寸(單位:byte)
                long filelen = fstatus.getLen();
                System.out.println(filelen);
                //獲取文件名
                System.out.println("文件名:"+fstatus.getPath().getName());
                //文件的路徑
                System.out.println("文件路徑:"+fstatus.getPath().toString());
                //獲取文件塊的信息
                BlockLocation[] bls = fs.getFileBlockLocations(fstatus, 0, filelen);
                //循環打印每一個塊的信息
                for(BlockLocation bl:bls) {
                    String[] hosts = bl.getHosts() ;
                    System.out.print("主機列表:"+"\n"+"hosts:");
                    for (int i = 0; i < hosts.length; i++) {
                        System.out.print(hosts[i]+"---");
                    }
                    System.out.println();
                    Long blockSize = bl.getLength();
                    System.out.println("blockSize:"+blockSize);
                    String[] blockPaths = bl.getTopologyPaths();
                    System.out.println("塊路徑列表:");
                    for (int i = 0; i < blockPaths.length; i++) {
                        System.out.println(blockPaths[i]+"---");
                    }
                }
            }else {
                //獲取文件名
                String fileName = fstatus.getPath().getName();
                //獲取目錄名
                String filePath = fstatus.getPath().toString();
                System.out.println("目錄名字:"+fileName+"目錄路徑:"+filePath);
            }
        }
    }
    
    /**
     * 遍歷指定路徑下的問夾(不含文件夾)
     * @param path
     * @throws IOException 
     */
    public static void listFiles02(String path) throws IOException {
        Path dstPath = new Path(path);
        if(!fs.exists(dstPath) ||fs.isFile(dstPath)) {
            return;
        }
        RemoteIterator<LocatedFileStatus> fss = fs.listFiles(dstPath, true);
        while ( fss.hasNext() ) {
            LocatedFileStatus fstatus = fss.next();
            if(fstatus.isFile()) {
                //如果是文件打印出文件的大小尺寸(單位:byte)
                long filelen = fstatus.getLen();
                System.out.println("文件大小尺寸 :"+filelen);
                //獲取文件名
                System.out.println("文件名:"+fstatus.getPath().getName());
                //文件的路徑
                System.out.println("文件路徑:"+fstatus.getPath().toString());
                //獲取文件塊的信息
                BlockLocation[] bls = fs.getFileBlockLocations(fstatus, 0, filelen);
                //循環打印每一個塊的信息
                for(BlockLocation bl:bls) {
                    String[] hosts = bl.getHosts() ;
                    System.out.print("主機列表:"+"\n"+"hosts:");
                    for (int i = 0; i < hosts.length; i++) {
                        System.out.print(hosts[i]+"---");
                    }
                    System.out.println();
                    Long blockSize = bl.getLength();
                    System.out.println("blockSize:"+blockSize);
                    String[] blockPaths = bl.getTopologyPaths();
                    System.out.println("塊路徑列表:");
                    for (int i = 0; i < blockPaths.length; i++) {
                        System.out.println(blockPaths[i]+"---");
                    }
                }
            }else {
                //獲取文件名
                String fileName = fstatus.getPath().getName();
                //獲取目錄名
                String filePath = fstatus.getPath().toString();
                System.out.println("目錄名字:"+fileName+"目錄路徑:"+filePath);
            }
        }
        
    }
}
HdfsMain

 然后在eclipse中直接誒運行即可。

然后使用:“hdfs dfs -ls /集群文件路徑”   等操作來查看即可。

 

運行中可以查看運行狀態:hdfs dfs admin -report

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM