Hadoop文件系統操作之讀取寫入數據


一.從hadoop文件系統hdfs讀取文件

  讀取hdfs文件有兩種方法:

  1.使用java.net.URL對象打開數據流,從中讀取代碼

import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import org.apache.commons.compress.utils.IOUtils; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; public class URLCat { static 
  { URL.setURLStreamHandlerFactory(
new FsUrlStreamHandlerFactory()); } public static void main(String [] args) throws MalformedURLException, IOException { try(InputStream in = new URL(args[0]).openStream();) { IOUtils.copy(in, System.out); } } }

  要想讓java程序能識別Hadoop的hdfs URL必須通過FsUrlStreamHandlerFactory實例調用java.net.URL對象的setURLStreamHandlerFactory方法,然而每個java虛擬機只能調用一次這個方法,這意味着如果如果程序的其他組件已經聲明了一個URLStreamHandlerFactory對象,將無法用這種方法從hadoop讀取文件

  2.通過調用FileSystem API 讀取數據

import java.io.*; import java.net.URI; import org.apache.commons.compress.utils.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class FileSystemCat { public static void main(String [] args) throws IOException { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri),conf); try(InputStream in = fs.open(new Path(uri))) { IOUtils.copy(in,System.out); //((FSDataInputStream) in).seek(0); //IOUtils.copy(in, System.out);
 } } }

  

  Configuration 對象封裝了客戶端或者服務端的配置

  public static FileSystem.get(URI uri,Configuration conf)通過給定的URL來確定要使用的文件系統

  Public FSDateInputStream open(Path f) throws IOException用來獲取文件的輸入流

  FSDateInputStream類繼承了Seekable接口和PositionedReadable接口,因此可以從流的隨意位置讀取數據,Seelable接口提供的seek()類是一個相對高開銷的操作,需要謹慎使用

 

二.向hadoop文件系統hdfs寫入數據

 

import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.commons.compress.utils.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Progressable; public class FileCopyWithProgress { public static void main(String[] args) throws IOException { String localsrc = args[0]; String dst = args[1]; InputStream in = new BufferedInputStream(new FileInputStream(localsrc)); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dst),conf); try (OutputStream out = fs.create(new Path(dst),new Progressable(){ public void progress() { System.out.print(".");//用於顯示文件復制進度 } })) { IOUtils.copy(in, out); } } }

 

使用運行參數:"/home/hadoop/Desktop/1.txt"     "hdfs://localhost/user/hadoop/output/1.txt"

程序在運行時可能會出現錯誤:

Exception in thread "main" java.net.ConnectException: Call From thewolf/127.0.1.1 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused

原因:是沒有指定訪問分布式文件系統的端口號:即配置文件里面設定的文件系統端口,執行程序會根據默認的8020端口去訪問文件系統,而訪問不到出現上述錯誤。

解決方法:指定訪問hdfs的端口號(自己配置的端口號,我配置的是9000):更改運行參數為:"/home/hadoop/Desktop/1.txt"     "hdfs://localhost:9000/user/hadoop/output/1.txt"

 

  public FSDataOutputStream create(Path f) throws IOException 能夠為需要寫入切當前不存在的文件創建父文件目錄,可先用exists()方法檢查父目錄是否存在;append()方法用於在一個已經存在的文件末尾追加數據

  FSDataInputStream繼承了借口Syncable,可用getPos()方法查詢文件當前位置,但是FSDataInputStream類不允許在文件中定位,因為HDFS只允許對一個已打開文件順序寫入,或者在現有文件后面追加數據。

  Progressable用於傳遞回調接口,每次Hadoop將64KB數據寫入datanode管線后都會調用Progressable接口的progress

public interface Progressable {   public void progress(); }

      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM