HDFS -- Hadoop FileSystem and Flink HadoopFileSystem examples


Recent test conclusions

When Flink operates on HDFS files in a different cluster, the file operations cannot be done in the main method; they have to be done inside an operator.
Within the same cluster, the operations work anywhere (in the main method or inside an operator).
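
The cross-cluster case can be illustrated with a sketch (hypothetical host name and operator; this is an illustration I am adding, not the original test code, and it is not runnable without a Flink/Hadoop classpath and a live cluster). Creating the HDFS client inside the operator's open() means it executes on the TaskManager, rather than on the client machine that runs main():

```java
// Sketch: build the cross-cluster HDFS client inside a rich operator,
// where open() runs on the TaskManager, instead of in main().
DataStream<String> result = input.map(new RichMapFunction<String, String>() {
    private transient org.apache.hadoop.fs.FileSystem fs;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        // Executed on the TaskManager for each parallel task.
        fs = org.apache.hadoop.fs.FileSystem.get(
                new java.net.URI("hdfs://other-cluster-nn:8020"), // hypothetical remote NameNode
                new org.apache.hadoop.conf.Configuration(), "root");
    }

    @Override
    public String map(String value) throws Exception {
        // ... use fs to read from / write to the remote cluster's HDFS ...
        return value;
    }
});
```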

Example: operating on HDFS with Hadoop's FileSystem

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;

public class FileSystemTest {
    public static void main(String[] args) throws Exception {
        //Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://node01:8020/");
        //FileSystem fileSystem = FileSystem.get(conf);

        // Option 1: add extra resource files to the configuration
        //Configuration conf = new Configuration();
        //conf.addResource("hbase-site.xml");
        //FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), conf, "root");

        // Option 2: pass the NameNode URI and the acting user directly
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root");
        // Recursively list every file under /test/
        RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(new Path("/test/"), true);
        Path outPath = new Path("/fileSystemData02/");
        FSDataOutputStream out = fileSystem.create(outPath);
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, "utf-8"));
        FSDataInputStream in;
        while (itr.hasNext()) {
            LocatedFileStatus next = itr.next();
            Path path = next.getPath();
            in = fileSystem.open(path);
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, "utf-8"));
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
            }
            writer.flush();
            reader.close(); // also closes the underlying FSDataInputStream
        }
        writer.close(); // flushes and closes the FSDataOutputStream
    }
}

Result: the files under /test/ are merged into /fileSystemData02/ (screenshots omitted)

Conclusion: whether or not core-site.xml and hdfs-site.xml are present under Resources has no effect on operating on HDFS with FileSystem.


Example: operating on HDFS with Flink's HadoopFileSystem

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;

public class HadoopFileSystemTest {
    public static void main(String[] args) throws Exception {
        // Wrap a Hadoop FileSystem in Flink's HadoopFileSystem adapter
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root");
        HadoopFileSystem hadoopFileSystem = new HadoopFileSystem(fileSystem);
        FileStatus[] fileStatuses = hadoopFileSystem.listStatus(new Path("/test/buketingsink/20200608/"));
        Path outPath = new Path("/hadoopFileSystemData02/");
        // NO_OVERWRITE: create() fails if the output path already exists
        HadoopDataOutputStream out = hadoopFileSystem.create(outPath, org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE);
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, "utf-8"));
        HadoopDataInputStream in;
        for (FileStatus fileStatus : fileStatuses) {
            Path filePath = fileStatus.getPath();
            in = hadoopFileSystem.open(filePath);
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, "utf-8"));
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
            }
            writer.flush();
            reader.close(); // also closes the underlying HadoopDataInputStream
        }
        writer.close();
    }
}

Result: the files are copied successfully (screenshots omitted)

Conclusion: whether or not core-site.xml and hdfs-site.xml are present under Resources likewise has no effect on operating on HDFS with HadoopFileSystem.


Exceptions encountered along the way

1.

Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=Q, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
The exception above was raised when using Flink 1.10's FileSystem:

FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"));
org.apache.flink.core.fs.FileStatus[] fileStatuses = fileSystem
        .listStatus(new org.apache.flink.core.fs.Path("/test/buketingsink/20200608/"));
Path outPath = new org.apache.flink.core.fs.Path("/flinkFileSystemData01/");

Cause: the client user (here "Q") has no write permission on the target directory, so the request must be made as a user that does (user impersonation).


https://blog.csdn.net/llwy1428/article/details/85598770
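
One common workaround, assuming the standard behavior of Hadoop's UserGroupInformation in a non-Kerberos setup (this is not shown in the original post), is to set HADOOP_USER_NAME, either as an environment variable or as a JVM system property, before the first FileSystem instance is created; the Hadoop client then acts as that user:

```java
public class HadoopUserNameDemo {
    public static void main(String[] args) {
        // Must be set before the first FileSystem.get(...) call, because the
        // Hadoop client caches the resolved user. In a secure (Kerberos)
        // cluster this property is ignored.
        System.setProperty("HADOOP_USER_NAME", "root");

        // From here on, a plain FileSystem.get(URI.create("hdfs://node01:8020"), conf)
        // would act as "root" (host name taken from the post's examples).
        System.out.println(System.getProperty("HADOOP_USER_NAME")); // prints "root"
    }
}
```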

File Systems

fs.default-scheme: sets the default file-system scheme. The default is file:///, i.e. the root of the local file system. If hdfs://localhost:9000/ is specified, then a path such as /user/USERNAME/in.txt given in a program resolves to hdfs://localhost:9000/user/USERNAME/in.txt. This value only takes effect when no other scheme is specified. In Hadoop, fs.default.name is usually configured in core-site.xml.
fs.overwrite-files: whether to overwrite files that already exist when writing output to the file system. Default: false.
fs.output.always-create-directory: whether file output creates a separate subdirectory. Default: false, i.e. files are written directly into the target directory.
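
For reference, these options would appear in flink-conf.yaml roughly as below (the values are illustrative, not taken from the original post):

```yaml
# Scheme applied to paths that do not specify one
fs.default-scheme: hdfs://localhost:9000/
# Whether output files may overwrite existing ones
fs.overwrite-files: false
# Whether sinks create a subdirectory per output
fs.output.always-create-directory: false
```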

HDFS-related configuration used by Flink

fs.hdfs.hadoopconf: path to the Hadoop configuration directory. Flink looks for core-site.xml and hdfs-site.xml under this path. Default: null. This value must be set if HDFS is used, e.g. $HADOOP_HOME/etc/hadoop.
fs.hdfs.hdfsdefault: path to hdfs-default.xml. Default: null. Not needed if the first value is set.
fs.hdfs.hdfssite: path to hdfs-site.xml. Default: null. Not needed if the first value is set.
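
In flink-conf.yaml this is normally just the first entry, pointing at the Hadoop configuration directory (the path below is an illustrative example):

```yaml
# Directory that contains core-site.xml and hdfs-site.xml
fs.hdfs.hadoopconf: /opt/hadoop/etc/hadoop
```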



Flink streaming + reading/writing HDFS with Flink's HadoopFileSystem

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.*;
import java.net.URI;

public class TestStreamingFileSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataStreamSource<String> inputStream = env.addSource(new RichSourceFunction<String>() {
            // Custom source: repeatedly scans an HDFS directory, emits every line
            // downstream, and mirrors each line into one HDFS output file.
            boolean flag = true;
            Path outPath;
            FileSystem fileSystem;
            HadoopFileSystem hadoopFileSystem;
            HadoopDataInputStream in;
            HadoopDataOutputStream out;
            BufferedReader reader;
            BufferedWriter writer;

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                // Runs on the TaskManager: connect to HDFS as user "root" and
                // create the output file (NO_OVERWRITE fails if it already exists).
                fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root");
                hadoopFileSystem = new HadoopFileSystem(fileSystem);
                outPath = new Path("/hadoopFileSystemData75/");
                out = hadoopFileSystem.create(outPath, org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE);
                writer = new BufferedWriter(new OutputStreamWriter(out, "utf-8"));
            }

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Unbounded: keeps rescanning the directory until cancel() is called.
                while (flag) {
                    FileStatus[] fileStatuses = hadoopFileSystem.listStatus(new Path("/test/buketingsink/20200608/"));
                    for (FileStatus fileStatus : fileStatuses) {
                        Path filePath = fileStatus.getPath();
                        in = hadoopFileSystem.open(filePath);
                        reader = new BufferedReader(new InputStreamReader(in, "utf-8"));
                        String line;
                        while ((line = reader.readLine()) != null) {
                            ctx.collect(line);
                            writer.write(line);
                            writer.newLine();
                        }
                        reader.close(); // also closes the underlying input stream
                    }
                    writer.flush();
                }
            }

            @Override
            public void cancel() {
                flag = false;
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

        inputStream.printToErr();
        //inputStream.addSink(BaseTask.getSink("test", ".txt","/streamFileSink66", "yyyy-MM-dd"));

        env.execute();
    }
}

Result: OK (screenshot omitted)

Note, however: when hadoopFileSystem writes to HDFS with NO_OVERWRITE, the output path must not already exist, otherwise create() fails.
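
When NO_OVERWRITE is required but the job may be re-run, a defensive check can avoid the failed create. This fragment is my addition, assuming the variables from the example above and the standard exists/delete methods on Flink's FileSystem:

```java
// Fragment: remove a stale output path before a NO_OVERWRITE create.
if (hadoopFileSystem.exists(outPath)) {
    hadoopFileSystem.delete(outPath, true); // recursive delete of the old output
}
out = hadoopFileSystem.create(outPath, org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE);
```

Alternatively, choose a fresh output path per run instead of deleting.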

Open question: StreamingFileSink also needs to impersonate a user when writing to HDFS, and it is unclear how to configure that.

