總結了一下三個方法:hdfs自帶 按字節復制 按行復制 (在java io里還有字符復制,暫且不提)
因為hdfs自帶的,不知道為什么有些場合不能用,每次能下載的個數還不一定,所以就考慮自己按照java的方式來復制,就出現第2、3種方法。
有時間好好研究一下IO,比如針對特殊文件,文件復制會出現大小不一樣的情況。這里
// void downloadFromHdfs(String hdfsSrc , String localDst)
// String hdfsDst = "hdfs://54.0.88.53:8020/user/flume/SyslogNetwork/";
// String localDir = "D://flume//";
//下載單個文件
public static boolean downloadFromHdfs(String hdfsSrc, String localDst) {
Configuration conf = new Configuration();
Path dst = new Path(hdfsSrc);
try {
Path Src = new Path(hdfsSrc);
String Filename = Src.getName().toString();
String local = localDst + Filename;
Path Dst = new Path(local);
FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf);
FSDataInputStream in = fs.open(Src);
OutputStream output = new FileOutputStream(new File(local));
IOUtils.copyBytes(in, output, 4096, true);
System.out.print(" download successed.");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.print(" download failed.");
return false;
}
return true;
}
//下載目錄下所有文件,方法1: IOUtils.copyBytes或者copyToLocal
public static boolean downFromHdfsDir(String hdfsSrc, String localDst)
throws IOException {
Configuration conf = new Configuration();
Path dstpath = new Path(hdfsSrc);
int i = 1;
FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf);
try {
String subPath = "";
FileStatus[] fList = fs.listStatus(dstpath);
for (FileStatus f : fList) {
if (null != f) {
subPath = new StringBuffer()
.append(f.getPath().getParent()).append("/")
.append(f.getPath().getName()).toString();
if (f.isDir()) {
downFromHdfsDir(subPath, localDst);
} else {
System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/
Path dst = new Path(subPath);
i++;
FSDataInputStream in = null;
OutputStream output = null;
try {
Path Src = new Path(subPath);
String Filename = Src.getName().toString();
String local = localDst + Filename;
Path Dst = new Path(local);
FileSystem hdfs = FileSystem.get(URI
.create(subPath), conf);
in = hdfs.open(Src);
output = new FileOutputStream(new File(local));
// true-是否關閉數據流,如果是false則在finally里關閉
// IOUtils.copyBytes(in, output, 4096, false);
IOUtils.copyBytes(in, output, conf);
output.flush();
System.out.print(" download successed.");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.print(" download failed.");
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(output);
}
}
}
}
} catch (Exception e) {
} finally {
System.out.println("the number of files is :" + i);
}
return true;
}
//下載目錄下所有文件,方法2: 按字節復制
public static boolean downFromHdfsDir2(String hdfsSrc, String localDst)
throws IOException {
Configuration conf = new Configuration();
Path dstpath = new Path(hdfsSrc);
int i = 1;
FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf);
try {
String subPath = "";
FileStatus[] fList = fs.listStatus(dstpath);
for (FileStatus f : fList) {
if (null != f) {
subPath = new StringBuffer()
.append(f.getPath().getParent()).append("/")
.append(f.getPath().getName()).toString();
if (f.isDir()) {
downFromHdfsDir(subPath, localDst);
} else {
System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/
Path dst = new Path(subPath);
i++;
try {
Path Src = new Path(subPath);
String Filename = Src.getName().toString();
String local = localDst + Filename;
Path Dst = new Path(local);
FileSystem localFS = FileSystem.getLocal(conf);
FileSystem hdfs = FileSystem.get(URI
.create(subPath), conf);
FSDataInputStream in = hdfs.open(Src);
FSDataOutputStream output = localFS.create(Dst);
byte[] buf = new byte[1024];
int readbytes = 0;
while ((readbytes = in.read(buf)) > 0) {
output.write(buf, 0, readbytes);
}
in.close();
output.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.print(" download failed.");
} finally {
}
}
}
}
} catch (Exception e) {
} finally {
System.out.println("the number of files is :" + i);
}
return true;
}
//下載目錄下所有文件,方法2: 按行復制
public static boolean downFromHdfsDir3(String hdfsSrc, String localDst)
throws IOException {
Configuration conf = new Configuration();
Path dstpath = new Path(hdfsSrc);
int i = 1;
FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf);
try {
String subPath = "";
FileStatus[] fList = fs.listStatus(dstpath);
for (FileStatus f : fList) {
if (null != f) {
subPath = new StringBuffer()
.append(f.getPath().getParent()).append("/")
.append(f.getPath().getName()).toString();
if (f.isDir()) {
downFromHdfsDir(subPath, localDst);
} else {
System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/
Path dst = new Path(subPath);
i++;
try {
Path Src = new Path(subPath);
String Filename = Src.getName().toString();
String local = localDst + Filename;
Path Dst = new Path(local);
FileSystem localFS = FileSystem.getLocal(conf);
FileSystem hdfs = FileSystem.get(URI
.create(subPath), conf);
FSDataInputStream in = hdfs.open(Src);
BufferedReader read = new BufferedReader(new InputStreamReader(in));
BufferedWriter output=new BufferedWriter(new FileWriter(local));
String line = null;
while ((line = read.readLine()) != null) {
output.append(line);
output.newLine();
output.flush();
}
in.close();
read.close();
output.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.print(" download failed.");
} finally {
}
}
}
}
} catch (Exception e) {
} finally {
System.out.println("the number of files is :" + i);
}
return true;
}
一次讀取整個文件
OutputStream:(一次讀入整個文件) 字節
private static String readHdfsFile2(FileSystem fs, Path path, String charset)
throws IOException {
FSDataInputStream hdfsInStream = fs.open(path);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
byte[] ioBuffer = new byte[1024];
int readLen = hdfsInStream.read(ioBuffer);
while (-1 != readLen) {
bos.write(ioBuffer, 0, readLen);
readLen = hdfsInStream.read(ioBuffer);
}
hdfsInStream.close();
return new String(bos.toByteArray(), charset);
}
或者 FileStatus status = fs.getFileStatus(Src); byte[] buffer = new byte[Integer.parseInt(String.valueOf(status.getLen()))]; in.readFully(0, buffer); is.close(); fs.close(); System.out.println(buffer.toString());
