總結了一下三種方法:HDFS 自帶的方法、按字節複製、按行複製(Java IO 裡還有按字符複製,暫且不提)。
因為 HDFS 自帶的方法不知道為什麼在有些場合不能用,每次能下載的文件個數還不固定,所以就考慮自己按照 Java 的方式來複製,於是就有了第 2、3 種方法。
有時間要好好研究一下 IO,比如針對某些特殊文件,複製後會出現文件大小不一樣的情況。這裡先給出下載的代碼:
// void downloadFromHdfs(String hdfsSrc , String localDst) // String hdfsDst = "hdfs://54.0.88.53:8020/user/flume/SyslogNetwork/"; // String localDir = "D://flume//"; //下載單個文件 public static boolean downloadFromHdfs(String hdfsSrc, String localDst) { Configuration conf = new Configuration(); Path dst = new Path(hdfsSrc); try { Path Src = new Path(hdfsSrc); String Filename = Src.getName().toString(); String local = localDst + Filename; Path Dst = new Path(local); FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf); FSDataInputStream in = fs.open(Src); OutputStream output = new FileOutputStream(new File(local)); IOUtils.copyBytes(in, output, 4096, true); System.out.print(" download successed."); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.print(" download failed."); return false; } return true; } //下載目錄下所有文件,方法1: IOUtils.copyBytes或者copyToLocal public static boolean downFromHdfsDir(String hdfsSrc, String localDst) throws IOException { Configuration conf = new Configuration(); Path dstpath = new Path(hdfsSrc); int i = 1; FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf); try { String subPath = ""; FileStatus[] fList = fs.listStatus(dstpath); for (FileStatus f : fList) { if (null != f) { subPath = new StringBuffer() .append(f.getPath().getParent()).append("/") .append(f.getPath().getName()).toString(); if (f.isDir()) { downFromHdfsDir(subPath, localDst); } else { System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/ Path dst = new Path(subPath); i++; FSDataInputStream in = null; OutputStream output = null; try { Path Src = new Path(subPath); String Filename = Src.getName().toString(); String local = localDst + Filename; Path Dst = new Path(local); FileSystem hdfs = FileSystem.get(URI .create(subPath), conf); in = hdfs.open(Src); output = new FileOutputStream(new File(local)); // true-是否關閉數據流,如果是false則在finally里關閉 // IOUtils.copyBytes(in, output, 4096, false); IOUtils.copyBytes(in, output, conf); 
output.flush(); System.out.print(" download successed."); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.print(" download failed."); } finally { IOUtils.closeStream(in); IOUtils.closeStream(output); } } } } } catch (Exception e) { } finally { System.out.println("the number of files is :" + i); } return true; } //下載目錄下所有文件,方法2: 按字節復制 public static boolean downFromHdfsDir2(String hdfsSrc, String localDst) throws IOException { Configuration conf = new Configuration(); Path dstpath = new Path(hdfsSrc); int i = 1; FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf); try { String subPath = ""; FileStatus[] fList = fs.listStatus(dstpath); for (FileStatus f : fList) { if (null != f) { subPath = new StringBuffer() .append(f.getPath().getParent()).append("/") .append(f.getPath().getName()).toString(); if (f.isDir()) { downFromHdfsDir(subPath, localDst); } else { System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/ Path dst = new Path(subPath); i++; try { Path Src = new Path(subPath); String Filename = Src.getName().toString(); String local = localDst + Filename; Path Dst = new Path(local); FileSystem localFS = FileSystem.getLocal(conf); FileSystem hdfs = FileSystem.get(URI .create(subPath), conf); FSDataInputStream in = hdfs.open(Src); FSDataOutputStream output = localFS.create(Dst); byte[] buf = new byte[1024]; int readbytes = 0; while ((readbytes = in.read(buf)) > 0) { output.write(buf, 0, readbytes); } in.close(); output.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.print(" download failed."); } finally { } } } } } catch (Exception e) { } finally { System.out.println("the number of files is :" + i); } return true; } //下載目錄下所有文件,方法2: 按行復制 public static boolean downFromHdfsDir3(String hdfsSrc, String localDst) throws IOException { Configuration conf = new Configuration(); Path dstpath = new Path(hdfsSrc); int i = 1; FileSystem fs = 
FileSystem.get(URI.create(hdfsSrc), conf); try { String subPath = ""; FileStatus[] fList = fs.listStatus(dstpath); for (FileStatus f : fList) { if (null != f) { subPath = new StringBuffer() .append(f.getPath().getParent()).append("/") .append(f.getPath().getName()).toString(); if (f.isDir()) { downFromHdfsDir(subPath, localDst); } else { System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/ Path dst = new Path(subPath); i++; try { Path Src = new Path(subPath); String Filename = Src.getName().toString(); String local = localDst + Filename; Path Dst = new Path(local); FileSystem localFS = FileSystem.getLocal(conf); FileSystem hdfs = FileSystem.get(URI .create(subPath), conf); FSDataInputStream in = hdfs.open(Src); BufferedReader read = new BufferedReader(new InputStreamReader(in)); BufferedWriter output=new BufferedWriter(new FileWriter(local)); String line = null; while ((line = read.readLine()) != null) { output.append(line); output.newLine(); output.flush(); } in.close(); read.close(); output.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.print(" download failed."); } finally { } } } } } catch (Exception e) { } finally { System.out.println("the number of files is :" + i); } return true; }
一次讀取整個文件
OutputStream:(一次讀入整個文件) 字節 private static String readHdfsFile2(FileSystem fs, Path path, String charset) throws IOException { FSDataInputStream hdfsInStream = fs.open(path); ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] ioBuffer = new byte[1024]; int readLen = hdfsInStream.read(ioBuffer); while (-1 != readLen) { bos.write(ioBuffer, 0, readLen); readLen = hdfsInStream.read(ioBuffer); } hdfsInStream.close(); return new String(bos.toByteArray(), charset); }
// Alternative: size the buffer from the file length and fill it with one positioned read.
// Assumes `fs`, `Src` and an FSDataInputStream `in` opened from `Src` are already in
// scope — TODO confirm against the surrounding method.
FileStatus status = fs.getFileStatus(Src);
long length = status.getLen();
if (length > Integer.MAX_VALUE) {
    throw new IOException("File too large to buffer in memory: " + length + " bytes");
}
byte[] buffer = new byte[(int) length];
try {
    in.readFully(0, buffer);
} finally {
    in.close();
    // NOTE(review): FileSystem.get normally hands out a shared, cached instance, so
    // closing it here may affect other users of the same file system — confirm.
    fs.close();
}
// Decode the bytes; calling toString() on the array would only print its identity.
// UTF-8 is an assumption — use the file's real encoding.
System.out.println(new String(buffer, "UTF-8"));