HDFS Lab Exercises



I worked through the course 《大數據技術原理與應用》 (Principles and Applications of Big Data Technology) on the Chinese MOOC platform, then moved on to its lab exercises. As an introductory course it works well: most topics get only a brief overview of the architecture and how things work, which is enough at this stage, since anything deeper would be hard to follow anyway. Having finished it, though, the field feels bottomless; I still only half understand much of what was covered, so my theory probably needs more work.
That said, the open database lab site run by Xiamen University (not my school) is excellent and well worth studying from.

The Exercises

Exercises 1 through 11 each require both a shell solution and a Java solution. Some of the earlier code may be a bit clumsy, since I wrote it while still learning; feel free to improve on it.

1. Upload any text file to HDFS. If the target file already exists in HDFS, let the user choose whether to append to the end of the existing file or overwrite it;

shell:

#!/bin/bash
# $1 is the file name; $2 = 0 means append, anything else means overwrite
hdfs dfs -test -e $1
if [ $? -eq 0 ] ;then
        if [ $2 -eq 0 ] ;then # append to the end of the existing file
                hdfs dfs -get $1 temp.txt
                cat temp.txt $1 > temp.txt.template # HDFS content first, then the local file
                hdfs dfs -rm $1
                hdfs dfs -put temp.txt.template $1
                rm temp.txt.template
                rm temp.txt
        else # overwrite
                hdfs dfs -rm $1
                hdfs dfs -put ./$1 $1
        fi
else
        hdfs dfs -put ./$1 $1
fi
A simple example.
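Incidentally, if your Hadoop version supports it, the shell command hdfs dfs -appendToFile &lt;localsrc&gt; &lt;dst&gt; appends a local file to an HDFS file directly, which avoids the get/cat/put round trip in the script above.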

java:

Appending does not require calling create first and then append: create writes a new file, open reads a file, and append adds to the end of an existing one.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Prac1 {

	public static void main(String[] args) {
		Scanner input = new Scanner(System.in);
		System.out.println("Input the file name and, if it already exists in HDFS, Cover or Append");
		String filename = input.next();
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://localhost:9000");
		conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
		// Allow appends and relax the pipeline-recovery policy for a single-node setup
		conf.setBoolean("dfs.support.append", true);
		conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
		conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enabled", true);

		try {
			FileSystem fs = FileSystem.get(conf);
			FSDataOutputStream fos;
			FileInputStream is = new FileInputStream(
					"/home/hadoop/Desktop/HPractice/HDFSPractice/"+filename);
			Path path = new Path(filename);
			
			String cmd = input.next();
			
			if((fs.exists(path))&&(cmd.equals("Append"))) 
				fos = fs.append(path);
			else
				fos = fs.create(path);
			
			byte[] bytes = new byte[1024];
			int len;
			while((len=is.read(bytes))!=-1) 
				fos.write(bytes,0,len);
			System.out.println("Upload succeeded");
			
			is.close();
			fos.close();
			fs.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}
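As a side note, since Java 7 the stream handling above can be written with try-with-resources, which guarantees the streams are closed even if the copy fails midway. A minimal sketch of just the copy step, assuming the same fs, path, and filename as in Prac1:

try (FileInputStream in = new FileInputStream(
		"/home/hadoop/Desktop/HPractice/HDFSPractice/" + filename);
     FSDataOutputStream out = fs.create(path)) {
	byte[] buf = new byte[1024];
	int len;
	while ((len = in.read(buf)) != -1)
		out.write(buf, 0, len);
}   // both streams are closed here automatically, even on error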

2. Download a specified file from HDFS; if a local file with the same name already exists, automatically rename the downloaded file

shell:

#!/bin/bash
# rename the downloaded copy if a local file with the same name already exists
test -e $1
if [ $? -eq 0 ]
then
        hdfs dfs -get $1 ${1}.template
else
        hdfs dfs -get $1 $1
fi


java:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Prac2 {

	public static void main(String[] args) {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://localhost:9000");
		conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem" );
		
		try {
			FileSystem fs = FileSystem.get(conf);
			FSDataInputStream fis;
			FileOutputStream os ;
			System.out.println("請輸入下載的文件名:");
			Scanner input = new Scanner(System.in);
			String filename = input.next();
			fis = fs.open(new Path(filename));
			
			File localFile = new File("/home/hadoop/Desktop/HPractice/HDFSPractice/"+filename);
			if(localFile.exists()) 
				os = new FileOutputStream(localFile+".template");
			else 
				os = new FileOutputStream(localFile);
			
			byte[] bytes = new byte[1024];
			int len;
			while((len=fis.read(bytes))!=-1) 
				os.write(bytes,0,len);
			
			System.out.println("下載成功");
			fis.close();
			fs.close();
			os.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}

}
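For reference, FileSystem also has a built-in download helper that replaces the manual byte loop. A sketch under the same setup as Prac2 (the rename-on-collision logic would still have to be done by hand first):

Path remote = new Path(filename);
Path local = new Path("/home/hadoop/Desktop/HPractice/HDFSPractice/" + filename);
fs.copyToLocalFile(remote, local);   // copies the HDFS file to the local filesystem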

3. Print the contents of a specified HDFS file to the terminal;

shell:

hdfs dfs -cat data.txt


java:

import java.io.IOException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Prac3 {

	public static void main(String[] args) {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://localhost:9000");
		conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
		
		try {
			FileSystem fs = FileSystem.get(conf);
			Scanner input = new Scanner(System.in);
			System.out.print("請輸入HDFS中的文件:");
			String filename = input.next();
			FSDataInputStream fis;
			if(!fs.exists(new Path(filename))) {
				System.out.println("不存在該文件");
				System.exit(0);
			}
			fis = fs.open(new Path(filename));
			
			byte[] bytes = new byte[1024];
			int len;
			while((len=fis.read(bytes))!=-1) 
				System.out.print(new String(bytes,0,len)); // convert only the bytes actually read
			System.out.println("\nEnd of transfer"); // the newline also flushes the last partial line
			fis.close();
			fs.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}

}
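Hadoop also ships a small utility that performs exactly this copy loop. A sketch of what the body of Prac3 could look like with it (the final false tells it not to close System.out):

import org.apache.hadoop.io.IOUtils;

// replacing the while loop above, after fis = fs.open(...):
IOUtils.copyBytes(fis, System.out, 4096, false);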

The shared skeleton:

From here on, each exercise is just a function called from inside the switch statement; I won't repeat the surrounding boilerplate.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Scanner;

import org.apache.commons.lang.StringUtils; // used by Prac6 below; commons-lang ships with Hadoop
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class Prac4_last {
	static Configuration conf = new Configuration();
	static FileSystem fs;
	static FSDataInputStream fis;
	static FSDataOutputStream fos;
	static FileInputStream is;
	static FileOutputStream os;
	static Scanner input;
	static Path filename;
	
	public static void main(String[] args) {
		try {
			conf.set("fs.defaultFS", "hdfs://localhost:9000");
			conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
			fs = FileSystem.get(conf);
			int cmd=4;
			switch(cmd)
			{
			// ... one case per exercise, e.g. case 4: Prac4(); break;
			}
			fs.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

4. Display the read/write permissions, size, creation time, path, and other information of a specified HDFS file

shell:

hdfs dfs -ls -h /user/hadoop


java:

	private static void Prac4() throws IOException
	{
		System.out.print("Enter the HDFS file name: ");
		input = new Scanner(System.in);
		filename = new Path(input.next());
		FileStatus[] fileStatus = null;
		if(fs.exists(filename)) 
			fileStatus = fs.listStatus(filename);
		else {
			System.out.println("The file does not exist in HDFS");
			System.exit(1);
		}
		FileStatus status = fileStatus[0];
		System.out.println(status.getPermission()+"\t"
		+status.getLen()+"\t"              // file size in bytes (getBlockSize() would return the block size instead)
		+status.getModificationTime()+"\t" // HDFS keeps no creation time; modification time is the usual stand-in
		+status.getPath()+"\t");
	}
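getModificationTime() returns epoch milliseconds, which print as a raw number. A small sketch for turning it into a readable timestamp with the standard JDK (nothing HDFS-specific):

import java.text.SimpleDateFormat;
import java.util.Date;

String ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
		.format(new Date(status.getModificationTime()));
System.out.println(ts);   // e.g. 2019-05-01 10:23:45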

5. Given a directory in HDFS, output the read/write permissions, size, creation time, and path of every file under it; if an entry is itself a directory, recursively output the same information for all files under it

shell:

hdfs dfs -ls -R /user/hadoop


java:

	private static void Prac5() throws IOException
	{
		System.out.print("Enter the HDFS directory name: ");
		input = new Scanner(System.in);
		filename = new Path(input.next());
		
		if(!fs.exists(filename) || !fs.isDirectory(filename)) { // was &&, which let existing non-directories through
			System.out.println("Error: please check that the input is an existing directory name");
			System.exit(1);
		}
		RemoteIterator<LocatedFileStatus> fileStatus = fs.listFiles(filename,true);
		// listFiles(path, true) iterates over the files and recurses into subdirectories
		while(fileStatus.hasNext()) {
			FileStatus status = fileStatus.next();
			System.out.println(status.getPermission()+"\t"
					+status.getLen()+"\t"
					+status.getModificationTime()+"\t"
					+status.getPath()+"\t");
		}
	}
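Note that listFiles only returns files; the directory entries themselves never appear in the output. If they should, one option is to recurse manually with listStatus. A sketch, assuming the same static fs as above:

private static void listAll(Path dir) throws IOException {
	for (FileStatus status : fs.listStatus(dir)) {
		System.out.println(status.getPermission() + "\t" + status.getLen()
				+ "\t" + status.getModificationTime() + "\t" + status.getPath());
		if (status.isDirectory())   // print the directory entry itself, then descend into it
			listAll(status.getPath());
	}
}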

6. Given the path of a file inside HDFS, create and delete that file. If the directory the file belongs to does not exist, create the directory automatically

shell:

#!/bin/bash
# $1 is the directory (with a trailing slash), $2 is the file name
hdfs dfs -test -e $1
if [ $? -eq 0 ];then
        echo "Directory exists"
else
        hdfs dfs -mkdir $1
        echo "Create the directory"
fi

path=$1$2
hdfs dfs -test -e $path
if [ $? -eq 0 ];then
        echo -n "File exists,delete or not (y):"
        read ans
        if [ "$ans" = "y" ];then
                hdfs dfs -rm $path
        fi
else
        echo -n "File doesn't exist,create or not(y):"
        read ans
        if [ "$ans" = "y" ];then
                touch $2
                hdfs dfs -put $2 $path
                rm $2
        fi
fi

java:

	private static void Prac6() throws IOException
	{
		System.out.println("Enter the full file path, and whether to keep the file (y):");
		Scanner input = new Scanner(System.in);
		String file = input.next();
		String[] p = file.split("/");
		String cmd = input.next();
		
		// "y" means the file should exist: an existing file answered with anything
		// other than "y" is deleted; a missing file answered "y" is created
		
		// everything before the last "/" is the parent directory
		String[] temp_dir = new String[p.length-1];
		for(int i=0;i<p.length-1;i++) 
			temp_dir[i]=p[i];
		
		filename = new Path(file);
		Path dir = new Path(StringUtils.join(temp_dir,"/"));
		
		if(!fs.exists(dir)) {
			fs.mkdirs(dir);
			System.out.println("Directory created");
		}
		
		if(fs.exists(filename)) {
			if(!cmd.equals("y")) {
				fs.delete(filename,true);
				System.out.println("File deleted");}}
		else {
			if(cmd.equals("y")) {
				fs.create(filename).close(); // close the stream opened for the new empty file
				System.out.println("File created");}}
		
		System.out.println("Done");
	}
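The split/join dance for finding the parent directory is not really necessary, because Path already knows its parent. A sketch of the same idea:

Path filePath = new Path(file);
Path parent = filePath.getParent();   // e.g. /user/hadoop/files for /user/hadoop/files/a.txt
if (parent != null && !fs.exists(parent))
	fs.mkdirs(parent);                // mkdirs also creates missing intermediate directories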

7. Given the path of an HDFS directory, create and delete that directory. When creating, automatically create any missing parent directories; when deleting, if the directory is not empty, let the user decide whether to delete it anyway

shell:

#!/bin/bash
  
# $1 is the directory, $2 is the operation (create or delete)

if [ "$2" = "create" ];then
        hdfs dfs -test -e $1
        if [ $? -eq 0 ];then
                echo "Directory exists"
        else
                echo "Directory doesn't exists"
                hdfs dfs -mkdir $1
                echo "Create the "$1" directory"
        fi
elif [ "$2"="delete" ];then
        if [ `hdfs dfs -ls $1 | wc -l` -gt 0 ];then
                echo "Not empty directory"
                echo "Input option:"
                read name
                case $name in
                        d) hdfs dfs -rm -r $1;;
                        n) echo "Cancel option";;
                        *) echo "Error input";;
                esac
        else
                echo "Empty directory"
                hdfs dfs -rm -R $1
        fi
else
        echo "Error choice"
fi


java:

	private static void Prac7() throws IOException
	{
		System.out.print("Enter the directory name and the operation (Create or Delete): ");
		input = new Scanner(System.in);
		filename = new Path(input.next());
		String cmd = input.next();
		
		if(cmd.equals("Create")) 
		{
			if(!fs.exists(filename)) 
			{
				fs.mkdirs(filename);
				System.out.println("Directory created");
			}
		}else if(cmd.equals("Delete")) 
		{
			if(fs.exists(filename)) 
			{
				if(fs.listFiles(filename, true).hasNext()) 
				{
					System.out.print("The directory is not empty; force deletion? (y): ");
					input = new Scanner(System.in);
					cmd = input.next();
					if(cmd.equals("y"))
						if(fs.delete(filename,true))
							System.out.println("Directory deleted");
				}
				else
					if(fs.delete(filename,true))
						System.out.println("Directory deleted");
			}else
				System.out.println("No such directory");
		}else
			System.out.println("Unknown command");
	}
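One API detail worth knowing here: fs.delete(path, false) refuses to delete a non-empty directory and throws an IOException instead, which is why the code above checks for emptiness itself and only ever calls delete with recursive set to true.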

8. Append content to a specified HDFS file, with the user choosing whether the content goes at the beginning or the end of the original file

shell:

#!/bin/bash

# $1 is the target file in HDFS

hdfs dfs -test -e $1
if [ $? -eq 0 ];then
        echo "Input appended content"
        cat > temp.txt
        hdfs dfs -get $1 $1
        echo -n "Head or Tail:"
        read cmd
        if [ "$cmd" = "Head" ];then
                cat temp.txt $1 > ${1}.template
                hdfs dfs -rm $1
                hdfs dfs -put ${1}.template $1
        elif [ "$cmd" = "Tail" ];then
                cat $1 temp.txt > ${1}.template
                hdfs dfs -rm $1
                hdfs dfs -put ${1}.template $1
        else
                echo "Wrong Commands"
        fi
        rm ${1}.template temp.txt $1
else
        echo "File doesn't exist"
fi


java:
When the program is rerun with the Tail option too soon, it may fail with org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.RecoveryInProgressException): Failed to APPEND_FILE /user/hadoop/files/data.txt for DFSClient_NONMAPREDUCE_262443574_1 on 127.0.0.1 because lease recovery is in progress. Try again later.
Waiting a little while and running it again is enough.
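If you would rather not rerun by hand, one hedge is to catch the exception and retry after a short pause. A sketch only; the retry count and delay are arbitrary:

FSDataOutputStream fos = null;
for (int attempt = 0; attempt < 5 && fos == null; attempt++) {
	try {
		fos = fs.append(filename);   // fails while lease recovery is still in progress
	} catch (IOException e) {
		try { Thread.sleep(2000); }  // wait two seconds, then try again
		catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; }
	}
}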

	private static void Prac8() throws IOException
	{
		System.out.print("Enter the target file and where to add the content (Head or Tail): ");
		input = new Scanner(System.in);
		filename = new Path(input.next());
		String cmd = input.next();
		System.out.println("Enter the content to add (finish with a line containing only q):");
		input = new Scanner(System.in);
		String cont="";
		String temp;
		while(input.hasNextLine())
		{
			temp = input.nextLine();
			if(temp.equals("q"))
				break;
			cont=cont+temp+"\n";
		}
		
		if(cmd.equals("Tail"))
		{
			// Append-related settings; note they normally have to be in place
			// before FileSystem.get(conf) to be sure they take effect
			conf.setBoolean("dfs.support.append", true);
			conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
			conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enabled", true);
			FSDataOutputStream fos = fs.append(filename);
			byte[] bytes = cont.getBytes();
			fos.write(bytes,0,bytes.length);
			fos.close();
		}
		else if(cmd.equals("Head"))
		{
			if(!fs.exists(filename)) {
				System.out.println("File does not exist");
				return;
			}
			// To prepend: move the file to local disk, recreate it empty,
			// write the new content first, then copy the old content back
			Path localPath = new Path("/home/hadoop/temp");
			fs.moveToLocalFile(filename, localPath);
			FileInputStream is = new FileInputStream("/home/hadoop/temp");
			FSDataOutputStream fos = fs.create(filename);
			fos.write(cont.getBytes());
			byte[] b = new byte[1024];
			int len;
			while((len=is.read(b))!=-1)
				fos.write(b,0,len);
			is.close();
			File f = new File("/home/hadoop/temp");
			f.delete();
			fos.close();
		}else
			System.out.println("Unknown command");
		System.out.println("Done");
	}

9. Delete a specified file from HDFS

shell:

hdfs dfs -rm text.txt


java:

	private static void Prac9() throws IOException
	{
		System.out.print("Enter the file name: ");
		input = new Scanner(System.in);
		filename = new Path(input.next());
		if(!fs.exists(filename)) {
			System.out.println("File does not exist");
			return;
		}
		if(fs.delete(filename,true))
			System.out.println("File deleted");
	}

10. Delete a specified HDFS directory; if the directory contains files, let the user decide whether to delete it anyway

shell:

hdfs dfs -rmdir emptyDirectory # removes an empty directory; fails if the directory is not empty
hdfs dfs -rm -r files # removes a directory whether it is empty or not


java:

	private static void Prac10() throws IOException
	{
		System.out.print("Enter the directory name, and whether to force deletion (y): ");
		input = new Scanner(System.in);
		filename = new Path(input.next());
		String cmd = input.next();
		
		if(!fs.exists(filename)) {
			System.out.println("No such directory");
			return;
		}
		if(fs.listFiles(filename, true).hasNext())
		{
			System.out.println("The directory contains files");
			if(cmd.equals("y")) 
			{
				if(fs.delete(filename,true))
					System.out.println("Directory deleted");
				else
					System.out.println("Deletion failed");
			}else
				System.out.println("Keeping the directory");
		}else
		{
			if(fs.delete(filename,true))
				System.out.println("Directory deleted");
			else
				System.out.println("Deletion failed");
		}
	}

11. Move a file from a source path to a destination path within HDFS

shell:

hdfs dfs -mv data.txt /data.txt


java:

	private static void Prac11() throws IOException
	{
		System.out.print("Enter the source path and the destination path: ");
		input = new Scanner(System.in);
		Path src = new Path(input.next());
		Path tar = new Path(input.next());
		if(!fs.exists(src))
		{
			System.out.println("Source file does not exist");
			return ;
		}
		if(fs.rename(src, tar))
			System.out.println("Move succeeded");
		else
			System.out.println("Move failed");
	}

12. Implement a class "MyFSDataInputStream" that extends "org.apache.hadoop.fs.FSDataInputStream", with a method "readLine()" that reads the specified HDFS file line by line: it returns null at the end of the file, and otherwise returns one line of text.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FSDataInputStream;

public class MyFSDataInputStream extends FSDataInputStream{

	private static MyFSDataInputStream my;
	private static BufferedReader bfr;
	
	public MyFSDataInputStream(InputStream in) {
		super(in);
		// Wrap the stream once, so successive readLine() calls keep their position;
		// the original version closed the stream after the first line, breaking later calls
		bfr = new BufferedReader(new InputStreamReader(in));
	}
	
	 public static MyFSDataInputStream getInstance(InputStream inputStream){
	        if (null == my){
	            synchronized (MyFSDataInputStream.class){
	                if (null == my){
	                    my = new MyFSDataInputStream(inputStream);
	                }
	            }
	        }
	        return my;
	    }

	// Returns the next line of the file, or null at the end of the file
	public static String readLine()
	{
		try {
			return bfr.readLine(); // BufferedReader already returns null at EOF
		} catch (IOException e) {
			e.printStackTrace();
		}
		return null;
	}
}
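A quick usage sketch, assuming the same hdfs://localhost:9000 setup as the earlier exercises and a hypothetical data.txt in HDFS:

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fs = FileSystem.get(conf);

FSDataInputStream in = fs.open(new Path("data.txt"));   // hypothetical file
MyFSDataInputStream.getInstance(in);

String line;
while ((line = MyFSDataInputStream.readLine()) != null)
	System.out.println(line);   // prints lines until readLine() returns null

in.close();
fs.close();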

13. Consult the Java documentation or other resources, and use "java.net.URL" together with "org.apache.hadoop.fs.FsURLStreamHandlerFactory" to print the text of a specified HDFS file to the terminal

The crux of this exercise seems to be how to reach HDFS through a URL. I adapted this from examples found online, and it appears to be just that (though quite possibly I am missing some subtlety 🐕).
Note that the full path must be entered here, otherwise the program reports an error.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;

public class last {

	private static Path filename;
	private static FileSystem fs;
	
	
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://localhost:9000");
		conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
		fs = FileSystem.get(conf);
		System.out.print("輸入文件名稱: ");
		Scanner input = new Scanner(System.in);
		filename = new Path(input.next());
		if(!fs.exists(filename)) {
			System.out.println("文件不存在");
			System.exit(1);
		}
		show();
	}
	
	public static void show()
	{
		try {
			URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); // may only be set once per JVM
			InputStream is = new URL("hdfs","localhost",9000,filename.toString()).openStream();
			BufferedReader bfr = new BufferedReader(new InputStreamReader(is));
			String line = null;
			while((line = bfr.readLine())!=null)
				System.out.println(line);
		}catch(IOException e) {
			e.printStackTrace();
		}
	}

}

At this point in life: absolute optimism.

