FTP文件上傳到HDFS上


在做測試數據時,往往會有ftp數據上傳到hdfs的需求,一般需要手動操作,這樣做太費事,於是有了下邊代碼實現的方式:

ftp數據上傳到hdfs函數:

import java.io.InputStream;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Created by Administrator on 11/10/2017.
 */
public class FtpUtil {
    /**
     * loadFromFtpToHdfs:將數據從ftp上傳到hdfs上. <br/>
     *
     * @param ip
     * @param username
     * @param password
     * @param filePath
     * @param outputPath
     * @param conf
     * @return
     * @author qiyongkang
     * @since JDK 1.8
     */
    public static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath, String outputPath, Configuration conf) {
        FTPClient ftp = new FTPClient();
        InputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        boolean flag = true;
        try {
            ftp.connect(ip);
            ftp.login(username, password);
            ftp.setFileType(FTP.BINARY_FILE_TYPE);
            ftp.setControlEncoding("UTF-8");
            int reply = ftp.getReplyCode();
            if (!FTPReply.isPositiveCompletion(reply)) {
                ftp.disconnect();
            }
            FTPFile[] files = ftp.listFiles(filePath);
            FileSystem hdfs = FileSystem.get(conf);
            for (FTPFile file : files) {
                if (!(file.getName().equals(".") || file.getName().equals(".."))) {
                    inputStream = ftp.retrieveFileStream(filePath + file.getName());
                    outputStream = hdfs.create(new Path(outputPath + file.getName()));
                    IOUtils.copyBytes(inputStream, outputStream, conf, false);
                    if (inputStream != null) {
                        inputStream.close();
                        ftp.completePendingCommand();
                    }
                }
            }
            ftp.disconnect();
        } catch (Exception e) {
            flag = false;
            e.printStackTrace();
        }
        return flag;
    }
}

main調用函數:

import org.apache.hadoop.conf.Configuration

/**
  * Created by Administrator on 11/10/2017.
  */
object FtpDownToHdfsMain {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    FtpUtil.loadFromFtpToHdfs("192.168.1.23", "test", "abc123", "/www/input/", "/user/jr/dt/fblib/", conf)
  }
}

使用yarn jar提交:

yarn jar myapp.jar

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM