Flink知识点


1. Flink、Storm、Sparkstreaming对比

Storm只支持流处理任务,数据是一条一条的源源不断地处理,而MapReduce、spark只支持批处理任务,spark-streaming本质上是一个批处理,采用micro-batch的方式,将数据流切分成细粒度的batch进行处理。Flink同时支持流处理和批处理,一条数据被处理完以后,序列化到缓存后,以固定的缓存块为单位进行网络数据传输,缓存块设为0为流处理,缓存块设为较大值为批处理。
storm------ ---------At-least-once----Record Acks-------无状态管理-------- 低延迟-----------高吞吐
sparkstreaming-----Exactly-once-------RDD Checkpoint-----基于DStream-----中等延迟-----高吞吐
Flink-----------------Exactly-once------ Checkpoint-----------基于操作----------低延迟-------高吞吐

2. Flink DataStream API基本算子

map:做一些清洗转换;
flatMap:输入一个元素,返回一个或者多个元素;
filter:符合条件的数据留下;
keyBy:key相同的数据进入同一个分区;
reduce:当前元素与上一次reduce返回值进行聚合操作;
Union:合并多个流,但是所有的流类型必须一致
Connect:合并两个流,两个流的类型可以不同
CoMapCoFlatMap:对于ConnectedStream使用这俩函数,对两个流进行不同的处理;
split:根据规则吧一个数据流切分为多个流;
Select:配合split,选择切分后的流;

3. Flink On Yarn

Flink on yarn上的好处:
  • 提高集群机器的利用率;
  • 一套集群,可以同时执行MR任务,Spark任务,Flink任务。
Flink on yarn的两种方式:
  • 一开始在Yarn上初始化一个集群;yarn-session.sh【开辟资源】+flink run【提交任务】
  • 每个Flink job都申请一个集群,互不影响,任务执行之后资源会被释放掉。flink run -m yarn-cluster【开辟资源+提交任务】 

4. FlinkKafkaConnector(FlinkKafkaConsumer+FlinkKafkaProducer)

阅读过FlinkKafkaConnector源码后对Kafka偏移量的存储机制有了一个全新的认识,这里有两个坑,简单记下:

  • FlinkKafkaComsumer08用的Kafka老版本Consumer,偏移量提交Zookeeper,由Zookeeper保管,而FlinkKafkaComsumer09以后,偏移量默认存在Kafka内部的Topic中,不再向Zookeeper提交;
  • 使用FlinkKafkaProducer08写Kafka1.10(其他高版本没试过)存在超时

5. DataStream API之partition

  • 随机分区
1 dataStream.shuffle();
  • 重分区,消除数据倾斜
1 dataStream.rebalance();
  • 自定义分区
1 dataStream.partitionCustom(partiitoner,"somekey");
  • 广播分区:把元素广播所有分区,会被重复消费
1 DataStream.broadcast();

6. Flink Distributed Cache(分布式缓存)

  • 原理
类似Hadoop,可以在并行函数比如map中读取本地文件或则HDFS文件,Flink自动将文件复制到所有taskmanager节点的本地文件系统。
  • 用法

用法一:注册一个文件

1 env.registerCachedFile(“hdfs:///path/to/yout/file","hdfsfile");

用法二:访问数据

1 File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

7. State

为了实现at least once和exactly once,flink引入了state和checkpoint;state一般将数据保存在 堆内存中,而checkpoint是每隔一段时把state数据 持久化存储了,当失败时可以恢复。

8. Checkpoint

依赖checkpoint机制,只能保证Flink系统内的exactly once。
checkpoint默认是disabled的,开启之后默认是exactly once,这种模式对大多数应用合适,而at least once在某些低延迟的场景中比较合适(例如几毫秒)。例如,
1 env.enableCheckpointing(1000);                                            //每隔1000ms设置一个检查点【检查点周期】             
2 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//设置模式为exactly once(默认值)                 
3 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//检查点之间至少有500ms的间隔【检查点最小间隔】        
4 env.getCheckpointConfig().setCheckpointTimeout(60000); //检查点必须在一分钟之内完成,否则丢弃【检查点的超时时间】
5 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //同一时间点只允许一个检查点                        
6 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
7 //Flink程序被取消后,会保留检查点数据

9. Flink重启策略

Flink中常用重启策略:
固定时间(Fixed delay);
失败率(Failure rate)【checkpoint机制默认策略】;
无重启(No start)【 默认

10. Window(窗口)

Window将一个stream拆分成有限大小的"桶(buckets)",在这些桶上做计算。Flink中的窗口与可以分为时间驱动【Time Window】和数据驱动【Count Window】,每一个可以细分为:
  • 滚动窗口【没有重叠】tumbling windows
1 .timeWindow(Time.minutes(1));
2 .countWindow(100);
  • 滑动窗口【有重叠】sliding windows
1 .timeWindow(Time.minutes(1),Time.seconds(30))//每隔30秒,统计一分钟的数据
2 .countWindow(100,10)//每隔10条数据,统计100条的数据
  • 会话窗口 session windows
定义了窗口之后,会存在不同的聚合:
  • 增量聚合(窗口中每进入一条数据,就进行一次计算),例如reduce、aggregate、sum、min、max;
  • 全量聚合(等窗口所有数据到齐,才开始计算,可以进行排序等)例如apply、process

当然也可以分为keyed window和non-keyed window,分组的stream调用keyBy(...)和window(...),非分组的stream中window(...)换成了windowAll(...)

11. Time

针对Stream数据流中的时间,可以分为以下三种:
  • Event Time:日志产生的时间;
  • Ingestion Time:事件从kafka等取出来,进入Flink的时间;
  • Processing Time:事件被处理的时间,例如达到窗口处理的时间等。【默认】 

12. Flink并行度

  • Flink中每个TaskManager为集群提供slot,slot数量与每个节点的可用CPU核数成比例,slot上启动进程,进程内有多个线程。如果任务管理器有n个槽,它会为每个槽分配 1/n 的内存,这里没有对 CPU 进行隔离;目前任务槽仅仅用于划分任务的内存

配置一个TaskManager有多少个并发的slot数有两种配置方式:

  • taskmanager.numberOfTaskSlots。在conf/flink-conf.yaml中更改,默认值为1,表示默认一个TaskManager只有1个task slot.
  • 提交作业时通过参数配置。--yarnslots 1,表示TaskManager的slot数为1.

⚠️注意:slot不能搞太多,几十个就行,你想啊假如你机器不多,TaskManager不多,搞那么多slot,每个slot分到的内存小的可怜,容易OOM啊

  • 并行度的设置有多个地方:操作算子层面、执行环境层面、客户端层面、系统层面,具体可以参考:FLINK并行度
算子层面具体情况具体分析,例如KafkaSource的并行度最好跟Kafka Topic的分区数成比例,比如:
1 val wordCounts = text
2    .flatMap{ _.split(" ") map { (_, 1) } }
3    .keyBy(0)
4    .timeWindow(Time.seconds(5))
5    .sum(1).setParallelism(5)

执行环境层面比如:

1 env.setParallelism(5)

提交任务的时候,在客户端侧flink可以通过-p参数来设置并行度。例如:

1 ./bin/flink run -p 5 ../examples/*WordCount-java*.jar

13. TaskManager数量

TaskManager的数量是在提交作业时根据并行度动态计算。先根据设定的 operator的最大并行度计算,例如,如果作业中operator的最大并行度为10,则 Parallelism/numberOfTaskSlots为向YARN申请的TaskManager数。

14. Flink传递参数给函数

参数可以使用构造函数或withParameters(Configuration)方法传递,参数将会作为函数对象的一部分被序列化并传递到task实例中。
  • 使用构造函数方式

 1 DataSet toFilter = env.fromElements(1, 2, 3);
 2 toFilter.filter(new MyFilter(2));
 3 private static class MyFilter implements FilterFunction {
 4   private final int limit;
 5   public MyFilter(int limit) {
 6     this.limit = limit;
 7   }
 8   @Override
 9   public boolean filter(Integer value) throws Exception {
10     return value > limit;
11   }
12 }
(2)withParameters(Configuration)方式
这个方法将携带一个Configuration对象作为参数,参数将会传递给Rich Function的open方法(关于Rich Function参见:rich function)。Configuration对象是一个Map,存储Key/Value键值对.
 1 DataSet toFilter = env.fromElements(1, 2, 3);
 2 Configuration config = new Configuration();
 3 config.setInteger("limit", 2);
 4 toFilter.filter(new RichFilterFunction() {
 5     private int limit;
 6     @Override
 7     public void open(Configuration parameters) throws Exception {
 8       limit = parameters.getInteger("limit", 0);
 9     }
10     @Override
11     public boolean filter(Integer value) throws Exception {
12       return value > limit;
13     }
14 }).withParameters(config);
(3)使用全局的the ExecutionConfig方式
参数可以被所有的rich function获得
 1 Configuration conf = new Configuration();
 2 conf.setString("mykey","myvalue");
 3 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 4 env.getConfig().setGlobalJobParameters(conf);
 5 public static final class Tokenizer extends RichFlatMapFunction> {
 6     private String mykey;
 7     @Override
 8     public void open(Configuration parameters) throws Exception {
 9       super.open(parameters);
10       ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
11       Configuration globConf = (Configuration) globalParams;
12       mykey = globConf.getString("mykey", null);
13     }
14     // ... more here ...

15. Flink中的Metrics

Flink中有丰富的Metrics指标,当然我们也可以使用它的Reporter自定义,具体参见官网,示例如下:
 1 import org.apache.flink.api.common.accumulators.LongCounter;
 2 import org.apache.flink.api.common.functions.RichMapFunction;
 3 import org.apache.flink.api.java.utils.ParameterTool;
 4 import org.apache.flink.configuration.Configuration;
 5 import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
 6 import org.apache.flink.metrics.Counter;
 7 import org.apache.flink.metrics.Meter;
 8 import org.apache.flink.metrics.MetricGroup;
 9 
10 
11 public class Map extends RichMapFunction<String, String> {
12   private transient Meter logTotalMeter;
13   private transient Counter logTotalConter;
14   private LongCounter logAcc = new LongCounter();
15 
16   @Override
17   public void open(Configuration parameters) throws Exception {
18     MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
19     logTotalMeter = metricGroup.meter("logTotalMeter",
20         new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
21     logTotalConter = metricGroup.counter("logTotalConter");
22     getRuntimeContext().addAccumulator("logAcc", this.logAcc);
23   }
24 
25 
26   @Override
27   public String map(String log) throws Exception {
28     logTotalConter.inc();
29     logTotalMeter.markEvent();
30     logAcc.add(NumberUtil.LONG_ONE);
31     logMeter.mark();
32     return log;
33   }
34 }

16. Flink中的table API

听了阿里军长的讲解后有所了解,简单做下笔记,以免忘记。table API和SQL有很多相同的优良特性,比如:
声明式-用户只关心做什么,不用关心怎么做;
高性能-支持查询优化,可以获取更好的执行性能;
流批统一-相同的统计逻辑,既可以流模式也可以批模式;
标准稳定-语义遵循SQL标准,不易变动
易理解-所见即所得
比如:
1 // table API
2 tab.groupBy("word").select("word,count(1) as cnt")
3 
4 // SQL
5 SELECT COUNT(1) AS cnt
6 FROM tab
7 GROUP BY word

总的来说,SQL有的功能table API都有,如下图所示

 

 环境配置如下

三种注册表的方式

三种发射表的方式

table API对列操作比较方便,比如

再比如有一张100列的表,选择1到10列怎么操作?

另外,table API的map函数扩展起来也很方便

 也可参考:Table API & SQLFlink SQL-Client

17. Flink中的内存管理

写这个也是有点心累的,搞了好久,也算对Flink Web UI上的几个内存指标大致了解了,老规矩记录下。所有程序都是绝对受人控制的,在提交任务那一刻,我们可以指定程序中使用的内存,例如:
1 ./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar

红色参数的具体含义是(参见Flink官网):

-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) -ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)

 先上两个小图(这俩图我没找到原版出处,一直觉得这玩意坑爹,大概率上这里的JVM Heap包括了on-heap和off-heap,on-heap和off-heap下边有介绍)

 看图说话,TaskManager 的堆内存主要被分成了三个部分:

  • Network Buffers: 一定数量的32KB大小的 buffer,主要用于数据的网络传输。在 TaskManager 启动的时候就会分配。默认数量是 2048 个,可以通过 taskmanager.network.numberOfBuffers来配置。
  • Memory Manager Pool: 这是一个由 MemoryManager 管理的,由众多MemorySegment组成的超大集合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认情况下,池子占了堆内存的 70% 的大小。
  • Remaining (Free) Heap: 这部分的内存是留给用户代码以及 TaskManager 的数据结构使用的。因为这些数据结构一般都很小,所以基本上这些内存都是给用户代码使用的。从GC的角度来看,可以把这里看成的新生代,也就是说这里主要都是由用户代码生成的短期对象。

⚠️注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,该池子不会预分配内存,也不会向该池子请求内存块。也就是说该部分的内存都是可以给用户代码使用的。不过社区是打算在 Streaming 模式下也能将该池子利用起来。

从堆的角度来说,Flink当前的内存支持堆内(on-heap)和堆外(off-heap)管理,用户想去申请什么类型的内存,有相关的参数去配置。Flink off-heap的内存管理相对于on-heap的优点主要在于:

  • 启动分配了大内存(例如100G)的JVM很耗费时间,垃圾回收也很慢。如果采用off-heap,剩下的Network buffer和Remaining heap都会很小,垃圾回收也不用考虑MemorySegment中的Java对象了,节省了GC时间;
  • 有效防止OOM,MemorySegment大小固定,操作高效。如果MemorySegment不足写到磁盘,内存中的数据不多,一般不会发生OOM;
  • 更有效率的IO操作。在off-heap下,将MemorySegment写到磁盘或是网络可以支持zeor-copy技术,而on-heap的话则至少需要一次内存拷贝;
  • off-heap上的数据可以和其他程序共享。

好了概念讲了一通,看下任务TaskManager日志

 1 2019-07-23 07:27:50,035 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Starting YARN TaskExecutor runner (Version: 1.7.1, Rev:<unknown>, Date:<unknown>)
 2 2019-07-23 07:27:50,035 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  OS current user: yarn
 3 2019-07-23 07:27:50,436 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Current Hadoop/Kerberos user: worker
 4 2019-07-23 07:27:50,436 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.112-b15
 5 2019-07-23 07:27:50,436 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Maximum heap size: 345 MiBytes
 6 2019-07-23 07:27:50,437 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  JAVA_HOME: /usr/local/jdk/
 7 2019-07-23 07:27:50,438 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Hadoop version: 2.6.0-cdh5.5.0
 8 2019-07-23 07:27:50,438 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  JVM Options:
 9 2019-07-23 07:27:50,438 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -Xms360m
10 2019-07-23 07:27:50,439 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -Xmx360m
11 2019-07-23 07:27:50,439 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -XX:MaxDirectMemorySize=664m

红色的地方有几个值,Flink源码中这几个值的计算在TaskManagerServices.calculateHeapSizeMB(计算堆内内存大小)和calculateNetworkBufferMemory(计算堆外内存大小),下边我们来看默认情况下这些值怎么计算的(仅限Flink 1.7.1版本,其他版本系数可能有变化)。

1. JVM预留内存,总内存的25%,最小预留,600M:

1024MB - (1024 * 0.25 < 600MB) -> 600MB = 424MB (cutoff)

2. 剩下的内存的10%作为networkBuffer的内存,最小64M:

424MB - (424MB * 0.1 < 64MB) -> 64MB(networkbuffer) = 360MB 

3. 批作业的话剩下内存30%设为堆内内存,总内存减去堆内内存设为directMemory,流作业全部都是堆内内存了,用于netty和rocksDB和networkBuffer以及JVM自身内存。

这样360M和664M都明了了,那为啥一开始日志打印“Maximum heap size: 345 MiBytes”呢,这里还有一个小坑,先看Flink怎么计算的,

 1     /**
 2      * The maximum JVM heap size, in bytes.
 3      * 
 4      * <p>This method uses the <i>-Xmx</i> value of the JVM, if set. If not set, it returns (as
 5      * a heuristic) 1/4th of the physical memory size.
 6      * 
 7      * @return The maximum JVM heap size, in bytes.
 8      */
 9     public static long getMaxJvmHeapMemory() {
10         final long maxMemory = Runtime.getRuntime().maxMemory();
11         if (maxMemory != Long.MAX_VALUE) {
12             // we have the proper max memory
13             return maxMemory;
14         } else {
15             // max JVM heap size is not set - use the heuristic to use 1/4th of the physical memory
16             final long physicalMemory = Hardware.getSizeOfPhysicalMemory();
17             if (physicalMemory != -1) {
18                 // got proper value for physical memory
19                 return physicalMemory / 4;
20             } else {
21                 throw new RuntimeException("Could not determine the amount of free memory.\n" +
22                         "Please set the maximum memory for the JVM, e.g. -Xmx512M for 512 megabytes.");
23             }
24         }
25     }

它用了Java lang的方法,这就有问题了,问题在于JVM使用的GC算法都会有一些内存丢失,比如Survivor有两个,但只有1个会用到,另一个一直闲置,总有一块Survivor区是不被计算到可用内存中的。

到底是不是呢,我在Flink程序的Map函数中加了这么一段代码:

1 logger.info("Runtime max: " + mb(Runtime.getRuntime().maxMemory()));
2 MemoryMXBean m = ManagementFactory.getMemoryMXBean();
3 
4 logger.info("Non-heap: " + mb(m.getNonHeapMemoryUsage().getMax()));
5 logger.info("Heap: " + mb(m.getHeapMemoryUsage().getMax()));
6 
7 for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
8   logger.info("Pool: " + mp.getName() + " (type " + mp.getType() + ")" + " = " + mb(mp.getUsage().getMax()));
9 }

打印日志出来

 1 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Heap: 361758720 (345.00 M)
 2 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: PS Survivor Space (type Heap memory) = 15728640 (15.00 M)
 3 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: Code Cache (type Non-heap memory) = 251658240 (240.00 M)
 4 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: PS Old Gen (type Heap memory) = 251658240 (240.00 M)
 5 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: Metaspace (type Non-heap memory) = -1 (-0.00 M)
 6 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: Compressed Class Space (type Non-heap memory) = 1073741824 (1024.00 M)
 7 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: PS Eden Space (type Heap memory) = 94371840 (90.00 M)
 8 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: PS Survivor Space (type Heap memory) = 15728640 (15.00 M)
 9 2019-07-20 09:03:16,345 INFO  data.demo.core.example                             - Pool: PS Old Gen (type Heap memory) = 251658240 (240.00 M)
10 2019-07-20 09:03:16,345 INFO  data.demo.core.example                             - Runtime max: 361758720 (345.00 M)

眼见为实,实锤了。

再看Flink TaskManager WebUI

有一些概念还要说下,比如JVM管理两种类型的内存:堆(heap)和非堆(Nonheap),堆就是Java代码可及的内存,所有类实例和数组的内存都是在堆上分配。非堆就是JVM留给自己用的,方法区、栈、每个类结构(如运行时常数池、字段和方法数据)以及方法和构造方法的代码都在非堆内存中。

再比如Direct和Mapped,这俩是JVM缓冲池,主要是JNI使用。

 UI上的这些参数我们都可以通过Flink的Rest API拿到下面这些指标值,比如通过访问本地localhost:23799/taskmanagers/container_e37_1563420494990_0370_01_000009(具体参见Flink官网):

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.Memory Memory.Heap.Used 当前使用的堆内存大小.
Heap.Committed 保证JVM可用的堆内存大小.
Heap.Max 可用于内存管理的堆内存最大值.
NonHeap.Used 当前使用的非堆内存大小.
NonHeap.Committed 保证JVM可用的非堆内存大小.
NonHeap.Max 可用于内存管理的非堆内存最大值.
Direct.Count 直接缓冲池中的缓冲区数量.
Direct.MemoryUsed JVM中用于直接缓冲池的内存大小.
Direct.TotalCapacity 直接缓冲池中所有缓冲区的总容量.
Mapped.Count 映射缓冲池中缓冲区的数量.
Mapped.MemoryUsed JVM中用于映射缓冲池的内存大小.
Mapped.TotalCapacity 映射缓冲池中缓冲区的数量.

 

18. Flink中依赖配置

    每个Flink应用程序都依赖一组Flink库,至少应用程序依赖于Flink API。许多应用程序还依赖于某些连接器库(如Kafka,Cassandra等)。所以阅读了下 官网依赖配置这一节的内容,简单记一下,Flink中有两大类依赖项和库:
  • Flink核心依赖:Flink本身由运行系统所需的一组类和依赖项组成,例如协调,网络,检查点,故障转移,API,操作(如窗口),资源管理等。所有这些这些类和依赖项构成了Flink运行时的核心,在启动Flink应用程序时必须存在。这些核心类和依赖项打包在flink-dist.jar中。它们是Flink lib文件夹的一部分。这部分不包含任何连接器或库(CEP,SQL,ML等),以避免默认情况下在类路径中具有过多的依赖项和类,保持默认的类路径较小并避免依赖性冲突。Maven(和其他构建工具)将依赖项打包时一般将核心依赖设为provided,如果它们未设置为provided,可能使生成的JAR包过大,还可能出现添加到应用程序的jar文件的Flink核心依赖项与用户自己的一些依赖版本冲突(可以通过反向类加载来避免);如果在IntelliJ IDEA中调试,则将scope设置为comiple,否则失败报NoClassDefFountError错;
  • 用户应用程序依赖: connectors, formats, or libraries(CEP, SQL, ML),用户应用程序通常打包到应用程序jar中。

     另外,当Flink程序读写HDFS时需要添加Hadoop依赖,不要把Hadoop依赖直接添加到Flink application,而是: export HADOOP_CLASSPATH=`hadoop classpath`,Flink组件启动时会使用该环境变量,这样做是因为:

  • 一些Hadoop交互发生在Flink的核心,可能在用户应用程序启动之前,例如为检查点设置HDFS,通过Hadoop的Kerberos令牌进行身份验证或在YARN上部署。
  • Flink的反向类加载方法隐藏了核心依赖关系中的许多传递依赖关系,应用程序可以使用相同依赖项的不同版本,而不会遇到依赖项冲突。

19. Flink单元测试指南

    参考: 指南文档翻译

20. Flink读HDFS

    简单点说,读数据
1 DataSet<String> hdfslines=env.readTextFile("your hdfs path")

写数据

1 hdfslines.writeAsText("your hdfs path")

以上会根据你的默认的线程数来生成多少个分区文件,如果你想最后生成一个文件的话,可以在后面使用setParallelism(1),这样最后就只会生成一个文件了。具体可以这么整

 1    try {
 2       String topic = args[0];
 3       String path = args[1];
 4       //读取配置文件
 5       Configuration conf = new Configuration();
 6       //获取文件系统
 7       FileSystem fs = FileSystem.get(URI.create("/"), conf);
 8 
 9       System.out.println(fs.getUri());
10       if (!fs.getUri().toString().contains("hdfs")) {
11         path = "hdfs://localhost:8020" + path;
12       }
13 
14       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
15       DataSet<String> union = env.readTextFile(fs.getUri() + path + "/").setParallelism(20);
16 
17       union.rebalance().setParallelism(15).map(new MapFunction<String, String>() {
18         private static final long serialVersionUID = 1033071381217373267L;
19 
20         @Override
21         public String map(String rawLog) throws Exception {
22           return rawLog;
23         }
24       }).output(new DiscardingOutputFormat<>())
25           .setParallelism(20);
26       try {
27         env.execute();
28       } catch (Exception e) {
29         e.printStackTrace();
30       }
31 
32       //关闭文件系统
33       fs.close();
34     } catch (Exception e) {
35       logger.error("task submit process error, due to {}.", e.getMessage());
36       e.printStackTrace();
37     }
View Code

 path是传入的目录,比如/user/rawlog/hourly/2019-09-15/09

 

 

 

 

配置

 

21. 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM