[Big Data] Hadoop Lab Report



Experiment 1: Getting Familiar with Common Linux and Hadoop Operations

1. Objective

Hadoop runs on Linux, so it is necessary to learn and practice some common Linux commands. This experiment aims to build familiarity with common Linux and Hadoop operations and to lay the groundwork for the experiments that follow.

2. Platform

  • Operating system: Linux;
  • Hadoop version: 2.7.1.

3. Tasks and Requirements

(I) Common Linux operations
Practice the following basic Linux commands on the machine as required.

cd command: change directory

(1) Change to the directory /usr/local

(2) Change to the parent directory of the current directory

(3) Change to the home directory of the user currently logged in to the Linux system
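For example, these three steps can be completed with commands along the following lines:

$ cd /usr/local
$ cd ..
$ cd ~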

ls command: list files and directories

(4) List all of the files under the directory /usr
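For example:

$ ls -a /usr    # -a also lists hidden files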

mkdir command: create new directories

(5) Enter the /tmp directory, create a directory named "a", and check which directories already exist under /tmp

(6) Enter the /tmp directory and create the directory "a1/a2/a3/a4"
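These two steps can be done with, for example:

$ cd /tmp
$ mkdir a
$ ls -l
$ mkdir -p a1/a2/a3/a4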

rmdir command: remove empty directories
(7) Delete the directory a created above (under /tmp)

(8) Delete the directory "a1/a2/a3/a4" created above (under /tmp), then check which directories remain under /tmp
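For example:

$ rmdir /tmp/a
$ cd /tmp
$ rmdir -p a1/a2/a3/a4
$ ls -l /tmp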

cp command: copy files or directories

(9) Copy the file .bashrc from the current user's home directory to the directory /usr and rename it bashrc1

(10) Create a new directory test under /tmp, then copy this directory into the /usr directory
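For example (writing into /usr normally requires sudo):

$ sudo cp ~/.bashrc /usr/bashrc1
$ mkdir /tmp/test
$ sudo cp -r /tmp/test /usr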

mv command: move or rename files and directories

(11) Move the file bashrc1 from the /usr directory into the /usr/test directory

(12) Rename the test directory under /usr to test2
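For example:

$ sudo mv /usr/bashrc1 /usr/test
$ sudo mv /usr/test /usr/test2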

rm command: remove files or directories

(13) Delete the file bashrc1 under the /usr/test2 directory
$ sudo rm /usr/test2/bashrc1

(14) Delete the test2 directory under /usr
$ sudo rm -r /usr/test2

cat command: view file contents

(15) View the contents of the .bashrc file in the current user's home directory
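For example:

$ cat ~/.bashrc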

tac command: view file contents in reverse order

(16) View the contents of the .bashrc file in the current user's home directory in reverse line order
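For example:

$ tac ~/.bashrc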

more command: view a file page by page

(17) Page through the contents of the .bashrc file in the current user's home directory
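For example:

$ more ~/.bashrc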

head command: show the first lines of a file

(18) View the first 20 lines of the .bashrc file in the current user's home directory

(19) View the .bashrc file in the current user's home directory, hiding the last 50 lines and showing only the lines before them
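For example (the -n -50 form assumes GNU head, as shipped with Ubuntu):

$ head -n 20 ~/.bashrc
$ head -n -50 ~/.bashrc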

tail command: show the last lines of a file

(20) View the last 20 lines of the .bashrc file in the current user's home directory

(21) View the .bashrc file in the current user's home directory, listing only the data from line 50 onward
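For example:

$ tail -n 20 ~/.bashrc
$ tail -n +50 ~/.bashrc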

touch command: change file timestamps or create new files

(22) Create an empty file hello under the /tmp directory and check the file's timestamp

(23) Modify the hello file so that its timestamp is set to 5 days ago

chown command: change the owner of a file

(24) Change the owner of the hello file to the root account and check the file's attributes
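For example (touch -d with a relative date assumes GNU touch):

$ cd /tmp
$ touch hello
$ ls -l hello
$ touch -d "5 days ago" hello
$ sudo chown root /tmp/hello
$ ls -l /tmp/hello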

find command: search for files

(25) Find the file named .bashrc in the home directory
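For example:

$ find ~ -name .bashrc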

tar command: archive and compress files

(26) Create a new folder test under the root directory "/", then pack it into test.tar.gz under "/"
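For example:

$ sudo mkdir /test
$ cd /
$ sudo tar -zcv -f /test.tar.gz test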

(27) Extract the test.tar.gz archive above into the /tmp directory
$ sudo tar -zxv -f /test.tar.gz -C /tmp

grep command: search for strings

(28) Search for the string 'examples' in the "~/.bashrc" file

(29) Configure the Java environment variables in "~/.bashrc"

(30) View the value of the JAVA_HOME variable
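For example (the JDK path in the export line is only an example; substitute the path of the JDK actually installed on your system):

$ grep -n 'examples' ~/.bashrc
$ vim ~/.bashrc          # add a line such as: export JAVA_HOME=/usr/lib/jvm/default-java
$ source ~/.bashrc
$ echo $JAVA_HOME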

(II) Common Hadoop operations
(31) Log in to the Linux system as the hadoop user, start Hadoop (Hadoop is installed under "/usr/local/hadoop"), and create the user directory "/user/hadoop" for the hadoop user in HDFS

(32) Then, under the HDFS directory "/user/hadoop", create a test folder and view the file listing

(33) Upload the local Linux file "~/.bashrc" into the test folder in HDFS, and view the contents of test

(34) Copy the HDFS folder test to the "/usr/local/hadoop" directory of the local Linux file system
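Assuming the installation layout described above (Hadoop under /usr/local/hadoop), steps (31) to (34) can be carried out with commands along these lines:

$ cd /usr/local/hadoop
$ ./sbin/start-dfs.sh
$ ./bin/hdfs dfs -mkdir -p /user/hadoop
$ ./bin/hdfs dfs -mkdir /user/hadoop/test
$ ./bin/hdfs dfs -ls /user/hadoop
$ ./bin/hdfs dfs -put ~/.bashrc /user/hadoop/test
$ ./bin/hdfs dfs -ls /user/hadoop/test
$ ./bin/hdfs dfs -get /user/hadoop/test /usr/local/hadoop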

Experiment 2: Getting Familiar with Common HDFS Operations

1. Objectives

  • Understand the role of HDFS in the Hadoop architecture;
  • Become proficient with the common Shell commands for operating HDFS;

2. Platform

  • Operating system: Linux (Ubuntu 16.04 recommended);
  • Hadoop version: 2.7.1;
  • JDK version: 1.7 or later;
  • Java IDE: Eclipse.

3. Procedure

(I) Implement the following functions in a program, and complete the same tasks with the Shell commands provided by Hadoop:
(1) Upload any text file to HDFS; if the specified file already exists in HDFS, let the user choose whether to append to the end of the existing file or to overwrite it;

Shell commands:
Append to the end of the file:

hadoop fs -appendToFile /usr/local/hadoop/test.txt /user/text.txt

Overwrite the original file:

hadoop fs -copyFromLocal -f /usr/local/hadoop/test.txt /user/text.txt

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /* Check whether a path exists in HDFS */
    public static boolean test(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /* Copy a local file to HDFS, overwriting any existing file */
    public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        fs.copyFromLocalFile(false, true, localPath, remotePath);
        fs.close();
    }

    /* Append the contents of a local file to a file in HDFS */
    public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FileInputStream in = new FileInputStream(localFilePath);
        FSDataOutputStream out = fs.append(remotePath);
        byte[] data = new byte[1024];
        int read = -1;
        while ((read = in.read(data)) > 0) {
            out.write(data, 0, read);
        }
        out.close();
        in.close();
        fs.close();
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String localFilePath = "/home/hadoop/text.txt";
        String remoteFilePath = "/user/hadoop/text.txt";
        String choice = "append";    // or "overwrite"
        try {
            boolean fileExists = false;
            if (HDFSApi.test(conf, remoteFilePath)) {
                fileExists = true;
                System.out.println(remoteFilePath + " already exists.");
            } else {
                System.out.println(remoteFilePath + " does not exist.");
            }
            if (!fileExists) { // the file does not exist, so upload it
                HDFSApi.copyFromLocalFile(conf, localFilePath, remoteFilePath);
                System.out.println(localFilePath + " has been uploaded to " + remoteFilePath);
            } else if (choice.equals("overwrite")) { // overwrite
                HDFSApi.copyFromLocalFile(conf, localFilePath, remoteFilePath);
                System.out.println(localFilePath + " has overwritten " + remoteFilePath);
            } else if (choice.equals("append")) { // append
                HDFSApi.appendToFile(conf, localFilePath, remoteFilePath);
                System.out.println(localFilePath + " has been appended to " + remoteFilePath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(2) Download a specified file from HDFS; if a local file with the same name already exists, automatically rename the downloaded file;

Shell commands:

if $(hadoop fs -test -e file:///usr/local/hadoop/test.txt);
then $(hadoop fs -copyToLocal /user/test.txt /usr/local/hadoop/test2.txt);
else $(hadoop fs -copyToLocal /user/test.txt /usr/local/hadoop/test.txt);
fi

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /* Download a file from HDFS; if a local file with the same name exists, pick a new name */
    public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        File f = new File(localFilePath);
        if (f.exists()) { // the local file already exists: append _0, _1, ... until the name is free
            System.out.println(localFilePath + " already exists.");
            Integer i = 0;
            while (true) {
                f = new File(localFilePath + "_" + i.toString());
                if (!f.exists()) {
                    localFilePath = localFilePath + "_" + i.toString();
                    break;
                }
                i++;
            }
            System.out.println("The file will be renamed to: " + localFilePath);
        }
        Path localPath = new Path(localFilePath);
        fs.copyToLocalFile(remotePath, localPath);
        fs.close();
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String localFilePath = "/home/hadoop/text.txt";
        String remoteFilePath = "/user/hadoop/text.txt";
        try {
            HDFSApi.copyToLocal(conf, remoteFilePath, localFilePath);
            System.out.println("Download complete");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(3) Output the contents of a specified file in HDFS to the terminal;

Shell command:

hadoop fs -cat text.txt

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /**
     * Read the contents of a file and print it to the terminal
     */
    public static void cat(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FSDataInputStream in = fs.open(remotePath);
        BufferedReader d = new BufferedReader(new InputStreamReader(in));
        String line = null;
        while ((line = d.readLine()) != null) {
            System.out.println(line);
        }
        d.close();
        in.close();
        fs.close();
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String remoteFilePath = "/user/local/hadoop/text.txt";    // HDFS path
        try {
            System.out.println("Reading file: " + remoteFilePath);
            HDFSApi.cat(conf, remoteFilePath);
            System.out.println("\nDone reading");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(4) Display information such as the read/write permissions, size, creation time, and path of a specified file in HDFS;

Shell command:

hadoop fs -ls -h /user/test.txt

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.text.SimpleDateFormat;

public class HDFSApi {
    /* Display the permissions, size, modification time, and path of a file */
    public static void ls(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FileStatus[] fileStatuses = fs.listStatus(remotePath);
        for (FileStatus s : fileStatuses) {
            System.out.println("Path: " + s.getPath().toString());
            System.out.println("Permissions: " + s.getPermission().toString());
            System.out.println("Size: " + s.getLen());
            /* getModificationTime() returns a timestamp; format it as a date */
            Long timeStamp = s.getModificationTime();
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String date = format.format(timeStamp);
            System.out.println("Time: " + date);
        }
        fs.close();
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/text.txt";
        try {
            System.out.println("Reading file information: " + remoteFilePath);
            HDFSApi.ls(conf, remoteFilePath);
            System.out.println("\nDone");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(5) Given a directory in HDFS, output the read/write permissions, size, creation time, path, and other information of all files under it; if a file is itself a directory, recursively output the information of all files under that directory;

Shell command:

hadoop fs -ls -R -h /user

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.text.SimpleDateFormat;

public class HDFSApi {
    /**
     * Display the information of all files under a directory (recursively)
     */
    public static void lsDir(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        /* Recursively list all files under the directory */
        RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true);
        /* Print the information of each file */
        while (remoteIterator.hasNext()) {
            FileStatus s = remoteIterator.next();
            System.out.println("Path: " + s.getPath().toString());
            System.out.println("Permissions: " + s.getPermission().toString());
            System.out.println("Size: " + s.getLen());
            /* getModificationTime() returns a timestamp; format it as a date */
            Long timeStamp = s.getModificationTime();
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String date = format.format(timeStamp);
            System.out.println("Time: " + date);
            System.out.println();
        }
        fs.close();
    }

    /**
     * Main
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String remoteDir = "/user/hadoop";    // HDFS path
        try {
            System.out.println("(Recursively) reading the information of all files under: " + remoteDir);
            HDFSApi.lsDir(conf, remoteDir);
            System.out.println("Done");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(6) Given the path of a file in HDFS, create and delete that file. If the directory the file belongs to does not exist, create the directory automatically;

Shell commands:

if $(hadoop fs -test -d /dir1/dir2);
then $(hadoop fs -touchz /dir1/dir2/filename);
else $(hadoop fs -mkdir -p /dir1/dir2 && hadoop fs -touchz /dir1/dir2/filename);
fi

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /* Check whether a path exists */
    public static boolean test(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /* Create a directory */
    public static boolean mkdir(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        boolean result = fs.mkdirs(dirPath);
        fs.close();
        return result;
    }

    /* Create an empty file */
    public static void touchz(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FSDataOutputStream outputStream = fs.create(remotePath);
        outputStream.close();
        fs.close();
    }

    /* Delete a file */
    public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        boolean result = fs.delete(remotePath, false);
        fs.close();
        return result;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/input/text.txt";    // HDFS file path
        String remoteDir = "/user/hadoop/input";    // directory containing the file
        try {
            /* If the path exists, delete it; otherwise create it */
            if (HDFSApi.test(conf, remoteFilePath)) {
                HDFSApi.rm(conf, remoteFilePath); // delete
                System.out.println("Deleted path: " + remoteFilePath);
            } else {
                if (!HDFSApi.test(conf, remoteDir)) { // create the directory if it does not exist
                    HDFSApi.mkdir(conf, remoteDir);
                    System.out.println("Created directory: " + remoteDir);
                }
                HDFSApi.touchz(conf, remoteFilePath);
                System.out.println("Created path: " + remoteFilePath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(7) Given the path of an HDFS directory, create and delete that directory. When creating the directory, automatically create its parent directories if they do not exist; when deleting, let the user specify whether the directory should still be deleted when it is not empty;

Shell commands:
Create a directory:

$ ./bin/hdfs dfs -mkdir -p dir1/dir2

Delete an empty directory:

$ ./bin/hdfs dfs -rmdir dir1/dir2

Force-delete a non-empty directory:

$ ./bin/hdfs dfs -rm -R dir1/dir2

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /* Check whether a path exists */
    public static boolean test(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /* Check whether a directory is empty (true means empty) */
    public static boolean isDirEmpty(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true);
        return !remoteIterator.hasNext();
    }

    /* Create a directory */
    public static boolean mkdir(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        boolean result = fs.mkdirs(dirPath);
        fs.close();
        return result;
    }

    /* Delete a directory (recursively) */
    public static boolean rmDir(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        boolean result = fs.delete(dirPath, true);
        fs.close();
        return result;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String remoteDir = "/user/hadoop/input";    // HDFS directory
        Boolean forceDelete = false;    // whether to force-delete a non-empty directory
        try {
            /* If the directory does not exist, create it; otherwise delete it */
            if (!HDFSApi.test(conf, remoteDir)) {
                HDFSApi.mkdir(conf, remoteDir); // create the directory
                System.out.println("Created directory: " + remoteDir);
            } else {
                if (HDFSApi.isDirEmpty(conf, remoteDir) || forceDelete) {
                    HDFSApi.rmDir(conf, remoteDir);
                    System.out.println("Deleted directory: " + remoteDir);
                } else { // the directory is not empty
                    System.out.println("Directory is not empty, not deleted: " + remoteDir);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(8) Append content to a specified file in HDFS, with the user choosing whether the content is appended to the beginning or the end of the original file;

Shell commands:
Append to the end of the file:

hadoop fs -appendToFile /usr/local/hadoop/test.txt /user/test.txt

Append to the beginning of the file (download the file, concatenate it after the new content in a local file, then upload the local file back, overwriting the original):

hadoop fs -get text.txt
cat text.txt >> local.txt
hadoop fs -copyFromLocal -f local.txt text.txt

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /* Check whether a path exists */
    public static boolean test(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /* Append a string to the end of a file */
    public static void appendContentToFile(Configuration conf, String content, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FSDataOutputStream out = fs.append(remotePath);
        out.write(content.getBytes());
        out.close();
        fs.close();
    }

    /* Append the contents of a local file to the end of a file in HDFS */
    public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FileInputStream in = new FileInputStream(localFilePath);
        FSDataOutputStream out = fs.append(remotePath);
        byte[] data = new byte[1024];
        int read = -1;
        while ((read = in.read(data)) > 0) {
            out.write(data, 0, read);
        }
        out.close();
        in.close();
        fs.close();
    }

    /* Move a file from HDFS to the local file system */
    public static void moveToLocalFile(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        Path localPath = new Path(localFilePath);
        fs.moveToLocalFile(remotePath, localPath);
    }

    /* Create an empty file */
    public static void touchz(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FSDataOutputStream outputStream = fs.create(remotePath);
        outputStream.close();
        fs.close();
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS file
        String content = "New content to append\n";
        String choice = "after";    // or "before"
        try {
            if (!HDFSApi.test(conf, remoteFilePath)) {
                System.out.println("File does not exist: " + remoteFilePath);
            } else {
                if (choice.equals("after")) { // append to the end of the file
                    HDFSApi.appendContentToFile(conf, content, remoteFilePath);
                    System.out.println("Appended content to the end of " + remoteFilePath);
                } else if (choice.equals("before")) {
                    /* No direct API for prepending: move the file to a temporary location,
                       recreate it, write the new content first, then append the original contents */
                    String localTmpPath = "/user/hadoop/tmp.txt";
                    HDFSApi.moveToLocalFile(conf, remoteFilePath, localTmpPath);
                    HDFSApi.touchz(conf, remoteFilePath);
                    HDFSApi.appendContentToFile(conf, content, remoteFilePath);
                    HDFSApi.appendToFile(conf, localTmpPath, remoteFilePath);
                    System.out.println("Appended content to the beginning of " + remoteFilePath);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(9) Delete a specified file in HDFS;
Shell command:

hadoop fs -rm -R /user/test.txt

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /* Delete a file */
    public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        boolean result = fs.delete(remotePath, false);
        fs.close();
        return result;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS file
        try {
            if (HDFSApi.rm(conf, remoteFilePath)) {
                System.out.println("Deleted file: " + remoteFilePath);
            } else {
                System.out.println("Operation failed (the file does not exist or could not be deleted)");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(10) Move a file in HDFS from a source path to a destination path.

Shell command:

hadoop fs -mv /user/hadoop/test/test.txt /user

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /* Move (rename) a file within HDFS */
    public static boolean mv(Configuration conf, String remoteFilePath, String remoteToFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path srcPath = new Path(remoteFilePath);
        Path dstPath = new Path(remoteToFilePath);
        boolean result = fs.rename(srcPath, dstPath);
        fs.close();
        return result;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String remoteFilePath = "hdfs:///user/hadoop/text.txt";    // source HDFS path
        String remoteToFilePath = "hdfs:///user/hadoop/new.txt";    // destination HDFS path
        try {
            if (HDFSApi.mv(conf, remoteFilePath, remoteToFilePath)) {
                System.out.println("Moved " + remoteFilePath + " to " + remoteToFilePath);
            } else {
                System.out.println("Operation failed (the source file does not exist or could not be moved)");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Experiment 3: Getting Familiar with Common HBase Operations

1. Objectives

  • Understand the role of HBase in the Hadoop architecture;
  • Become proficient with the common Shell commands for operating HBase;

2. Platform

  • Operating system: Linux (Ubuntu 16.04 recommended);
  • Hadoop version: 2.7.1;
  • HBase version: 1.1.5;
  • JDK version: 1.7 or later;
  • Java IDE: Eclipse.

3. Procedure

(I) Implement the following functions in a program, and complete the same tasks with the HBase Shell commands:
(1) List information about all tables in HBase, such as the table names;

Shell command:

list

Java code:

public static void listTables() throws IOException {
    init(); // establish the connection
    HTableDescriptor hTableDescriptors[] = admin.listTables();
    for (HTableDescriptor hTableDescriptor : hTableDescriptors) {
        System.out.println("Table name: " + hTableDescriptor.getNameAsString());
    }
    close(); // close the connection
}

(2) Print all records of a specified table to the terminal;

Shell command:

scan 's1'

Java code:

public static void getData(String tableName) throws IOException {
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
        printRecoder(result);
    }
    close();
}

// Print the details of one record
public static void printRecoder(Result result) throws IOException {
    for (Cell cell : result.rawCells()) {
        System.out.print("Row key: " + new String(CellUtil.cloneRow(cell)));
        System.out.print(" Column family: " + new String(CellUtil.cloneFamily(cell)));
        System.out.print(" Column: " + new String(CellUtil.cloneQualifier(cell)));
        System.out.print(" Value: " + new String(CellUtil.cloneValue(cell)));
        System.out.println(" Timestamp: " + cell.getTimestamp());
    }
}

(3) Add and delete a specified column family or column in an existing table;

Shell command:

put 's1','zhangsan','Score:Math','69'

Java code:

public static void insertRow(String tableName, String rowKey, String colFamily, String col, String val) throws IOException {
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Put put = new Put(rowKey.getBytes());
    put.addColumn(colFamily.getBytes(), col.getBytes(), val.getBytes());
    table.put(put);
    table.close();
    close();
}

// Example call: insertRow("s1", "zhangsan", "Score", "Math", "69");

public static void deleteRow(String tableName, String rowKey, String colFamily, String col) throws IOException {
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Delete delete = new Delete(rowKey.getBytes());
    // delete.addFamily(Bytes.toBytes(colFamily));    // use this instead to delete the whole column family
    delete.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));    // delete a single column
    table.delete(delete);
    table.close();
    close();
}

// Example call: deleteRow("s1", "zhangsan", "Score", "Math");

(4) Delete all record data from a specified table;

Shell command:

truncate 's1'

Java code:

public static void clearRows(String tableName) throws IOException {
    init();
    TableName tablename = TableName.valueOf(tableName);
    // Disable and delete the table, then recreate an empty table with the same name
    admin.disableTable(tablename);
    admin.deleteTable(tablename);
    HTableDescriptor hTableDescriptor = new HTableDescriptor(tablename);
    admin.createTable(hTableDescriptor);
    close();
}

(5) Count the number of rows in a table.

Shell command:

count 's1'

Java code:

public static void countRows(String tableName) throws IOException {
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();
    ResultScanner scanner = table.getScanner(scan);
    int num = 0;
    for (Result result = scanner.next(); result != null; result = scanner.next()) {
        num++;
    }
    System.out.println("Row count: " + num);
    scanner.close();
    close();
}

Experiment 4: Elementary MapReduce/Spark Programming Practice

1. Objectives

  1. Master basic MapReduce/Spark programming techniques through hands-on practice;
  2. Learn to use MapReduce/Spark to solve common data-processing problems, including deduplication, sorting, and data mining.

2. Platform

  • Operating system: Linux (Ubuntu 16.04 recommended)
  • Hadoop version: 2.7.1
  • Spark version: 2.0 or above

3. Procedure

(I) Implement file merging and deduplication

Given two input files, A and B, write a MapReduce program that merges the two files and removes duplicate content, producing a new output file C. Samples of the input and output files are given below for reference.

A sample of input file A:

	20150101     x
	20150102     y
	20150103     x
	20150104     y
	20150105     z
	20150106     x

A sample of input file B:
	20150101      y
	20150102      y
	20150103      x
	20150104      z
	20150105      y

The output file C obtained by merging input files A and B:
	20150101      x
	20150101      y
	20150102      y
	20150103      x
	20150104      y
	20150104      z
	20150105      y
	20150105      z
	20150106      x

Java code:

package com.Merge;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Merge {

    /**
     * @param args
     * Merge files A and B, remove duplicate content, and produce a new output file C
     */
    // Override the map function: copy the input value directly to the key of the output
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static Text text = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            text = value;
            context.write(text, new Text(""));
        }
    }

    // Override the reduce function: copy the input key directly to the key of the output
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input", "output"}; /* set the input and output paths directly */
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in><out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Merge and duplicate removal");
        job.setJarByClass(Merge.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

(II) Write a program to sort the input files

There are several input files, and every line of each file is an integer. The program should read the integers from all of the files, sort them in ascending order, and write the result to a new file. Each line of the output contains two integers: the first is the rank of the second integer in the sorted order, and the second is the original integer. Samples of the input and output files are given below for reference.

A sample of input file 1:
33
37
12
40

A sample of input file 2:
4
16
39
5

A sample of input file 3:
1
45
25

The output file obtained from input files 1, 2, and 3:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45

The Java code is as follows:

package com.MergeSort;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeSort {

    // Map: parse each line into an integer and emit it as the key
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String text = value.toString();
            data.set(Integer.parseInt(text));
            context.write(data, new IntWritable(1));
        }
    }

    // Reduce: keys arrive in sorted order; emit (rank, value) for each occurrence
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable line_num = new IntWritable(1);
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(line_num, key);
                line_num = new IntWritable(line_num.get() + 1);
            }
        }
    }

    // Custom partitioner: split the key range evenly across reducers so the global order is preserved
    public static class Partition extends Partitioner<IntWritable, IntWritable> {
        public int getPartition(IntWritable key, IntWritable value, int num_Partition) {
            int Maxnumber = 65223;
            int bound = Maxnumber / num_Partition + 1;
            int keynumber = key.get();
            for (int i = 0; i < num_Partition; i++) {
                if (keynumber < bound * (i + 1) && keynumber >= bound * i) {
                    return i;
                }
            }
            return -1;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input", "output"};
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in><out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Merge and sort");
        job.setJarByClass(MergeSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(Partition.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

(III) Mine information from a given table

A child-parent table is given below. The task is to mine the parent-child relationships in it and produce a table of grandchild-grandparent relationships.

The contents of the input file are as follows:
	child          parent
	Steven        Lucy
	Steven        Jack
	Jone         Lucy
	Jone         Jack
	Lucy         Mary
	Lucy         Frank
	Jack         Alice
	Jack         Jesse
	David       Alice
	David       Jesse
	Philip       David
	Philip       Alma
	Mark       David
	Mark       Alma

The contents of the output file are as follows:
	grandchild       grandparent
	Steven          Alice
	Steven          Jesse
	Jone            Alice
	Jone            Jesse
	Steven          Mary
	Steven          Frank
	Jone            Mary
	Jone            Frank
	Philip           Alice
	Philip           Jesse
	Mark           Alice
	Mark           Jesse

Java code:

package com.simple_data_mining;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class simple_data_mining {
    public static int time = 0;

    // Map: emit each record twice, keyed once by the parent (left table) and once by the child (right table)
    public static class Map extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String child_name = new String();
            String parent_name = new String();
            String relation_type = new String();
            String line = value.toString();
            int i = 0;
            while (line.charAt(i) != ' ') {
                i++;
            }
            String[] values = {line.substring(0, i), line.substring(i + 1)};
            if (values[0].compareTo("child") != 0) { // skip the header line
                child_name = values[0];
                parent_name = values[1];
                relation_type = "1"; // "1": the key is the parent of this record
                context.write(new Text(values[1]), new Text(relation_type + "+" + child_name + "+" + parent_name));
                relation_type = "2"; // "2": the key is the child of this record
                context.write(new Text(values[0]), new Text(relation_type + "+" + child_name + "+" + parent_name));
            }
        }
    }

    // Reduce: join the two tables on the same key to produce grandchild-grandparent pairs
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            if (time == 0) { // write the header only once
                context.write(new Text("grand_child"), new Text("grand_parent"));
                time++;
            }
            int grand_child_num = 0;
            String grand_child[] = new String[10];
            int grand_parent_num = 0;
            String grand_parent[] = new String[10];
            Iterator ite = values.iterator();
            while (ite.hasNext()) {
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if (len == 0) continue;
                char relation_type = record.charAt(0);
                String child_name = new String();
                String parent_name = new String();
                // parse "relation_type+child_name+parent_name"
                while (record.charAt(i) != '+') {
                    child_name = child_name + record.charAt(i);
                    i++;
                }
                i = i + 1;
                while (i < len) {
                    parent_name = parent_name + record.charAt(i);
                    i++;
                }
                if (relation_type == '1') {
                    grand_child[grand_child_num] = child_name;
                    grand_child_num++;
                } else {
                    grand_parent[grand_parent_num] = parent_name;
                    grand_parent_num++;
                }
            }
            // Cartesian product of the grandchildren and grandparents collected under the same key
            if (grand_parent_num != 0 && grand_child_num != 0) {
                for (int m = 0; m < grand_child_num; m++) {
                    for (int n = 0; n < grand_parent_num; n++) {
                        context.write(new Text(grand_child[m]), new Text(grand_parent[n]));
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input", "output"};
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in><out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Single table join");
        job.setJarByClass(simple_data_mining.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

   

If there are any shortcomings, corrections are welcome.



