Flink知識點


1. Flink、Storm、Sparkstreaming對比

Storm只支持流處理任務,數據是一條一條的源源不斷地處理,而MapReduce、spark只支持批處理任務,spark-streaming本質上是一個批處理,采用micro-batch的方式,將數據流切分成細粒度的batch進行處理。Flink同時支持流處理和批處理,一條數據被處理完以后,序列化到緩存后,以固定的緩存塊為單位進行網絡數據傳輸,緩存塊設為0為流處理,緩存塊設為較大值為批處理。
storm------ ---------At-least-once----Record Acks-------無狀態管理-------- 低延遲-----------高吞吐
sparkstreaming-----Exactly-once-------RDD Checkpoint-----基於DStream-----中等延遲-----高吞吐
Flink-----------------Exactly-once------ Checkpoint-----------基於操作----------低延遲-------高吞吐

2. Flink DataStream API基本算子

map:做一些清洗轉換;
flatMap:輸入一個元素,返回一個或者多個元素;
filter:符合條件的數據留下;
keyBy:key相同的數據進入同一個分區;
reduce:當前元素與上一次reduce返回值進行聚合操作;
Union:合並多個流,但是所有的流類型必須一致
Connect:合並兩個流,兩個流的類型可以不同
CoMapCoFlatMap:對於ConnectedStream使用這倆函數,對兩個流進行不同的處理;
split:根據規則吧一個數據流切分為多個流;
Select:配合split,選擇切分后的流;

3. Flink On Yarn

Flink on yarn上的好處:
  • 提高集群機器的利用率;
  • 一套集群,可以同時執行MR任務,Spark任務,Flink任務。
Flink on yarn的兩種方式:
  • 一開始在Yarn上初始化一個集群;yarn-session.sh【開辟資源】+flink run【提交任務】
  • 每個Flink job都申請一個集群,互不影響,任務執行之后資源會被釋放掉。flink run -m yarn-cluster【開辟資源+提交任務】 

4. FlinkKafkaConnector(FlinkKafkaConsumer+FlinkKafkaProducer)

閱讀過FlinkKafkaConnector源碼后對Kafka偏移量的存儲機制有了一個全新的認識,這里有兩個坑,簡單記下:

  • FlinkKafkaComsumer08用的Kafka老版本Consumer,偏移量提交Zookeeper,由Zookeeper保管,而FlinkKafkaComsumer09以后,偏移量默認存在Kafka內部的Topic中,不再向Zookeeper提交;
  • 使用FlinkKafkaProducer08寫Kafka1.10(其他高版本沒試過)存在超時

5. DataStream API之partition

  • 隨機分區
1 dataStream.shuffle();
  • 重分區,消除數據傾斜
1 dataStream.rebalance();
  • 自定義分區
1 dataStream.partitionCustom(partiitoner,"somekey");
  • 廣播分區:把元素廣播所有分區,會被重復消費
1 DataStream.broadcast();

6. Flink Distributed Cache(分布式緩存)

  • 原理
類似Hadoop,可以在並行函數比如map中讀取本地文件或則HDFS文件,Flink自動將文件復制到所有taskmanager節點的本地文件系統。
  • 用法

用法一:注冊一個文件

1 env.registerCachedFile(“hdfs:///path/to/yout/file","hdfsfile");

用法二:訪問數據

1 File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

7. State

為了實現at least once和exactly once,flink引入了state和checkpoint;state一般將數據保存在 堆內存中,而checkpoint是每隔一段時把state數據 持久化存儲了,當失敗時可以恢復。

8. Checkpoint

依賴checkpoint機制,只能保證Flink系統內的exactly once。
checkpoint默認是disabled的,開啟之后默認是exactly once,這種模式對大多數應用合適,而at least once在某些低延遲的場景中比較合適(例如幾毫秒)。例如,
1 env.enableCheckpointing(1000);                                            //每隔1000ms設置一個檢查點【檢查點周期】             
2 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//設置模式為exactly once(默認值)                 
3 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//檢查點之間至少有500ms的間隔【檢查點最小間隔】        
4 env.getCheckpointConfig().setCheckpointTimeout(60000); //檢查點必須在一分鍾之內完成,否則丟棄【檢查點的超時時間】
5 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //同一時間點只允許一個檢查點                        
6 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
7 //Flink程序被取消后,會保留檢查點數據

9. Flink重啟策略

Flink中常用重啟策略:
固定時間(Fixed delay);
失敗率(Failure rate)【checkpoint機制默認策略】;
無重啟(No start)【 默認

10. Window(窗口)

Window將一個stream拆分成有限大小的"桶(buckets)",在這些桶上做計算。Flink中的窗口與可以分為時間驅動【Time Window】和數據驅動【Count Window】,每一個可以細分為:
  • 滾動窗口【沒有重疊】tumbling windows
1 .timeWindow(Time.minutes(1));
2 .countWindow(100);
  • 滑動窗口【有重疊】sliding windows
1 .timeWindow(Time.minutes(1),Time.seconds(30))//每隔30秒,統計一分鍾的數據
2 .countWindow(100,10)//每隔10條數據,統計100條的數據
  • 會話窗口 session windows
定義了窗口之后,會存在不同的聚合:
  • 增量聚合(窗口中每進入一條數據,就進行一次計算),例如reduce、aggregate、sum、min、max;
  • 全量聚合(等窗口所有數據到齊,才開始計算,可以進行排序等)例如apply、process

當然也可以分為keyed window和non-keyed window,分組的stream調用keyBy(...)和window(...),非分組的stream中window(...)換成了windowAll(...)

11. Time

針對Stream數據流中的時間,可以分為以下三種:
  • Event Time:日志產生的時間;
  • Ingestion Time:事件從kafka等取出來,進入Flink的時間;
  • Processing Time:事件被處理的時間,例如達到窗口處理的時間等。【默認】 

12. Flink並行度

  • Flink中每個TaskManager為集群提供slot,slot數量與每個節點的可用CPU核數成比例,slot上啟動進程,進程內有多個線程。如果任務管理器有n個槽,它會為每個槽分配 1/n 的內存,這里沒有對 CPU 進行隔離;目前任務槽僅僅用於划分任務的內存

配置一個TaskManager有多少個並發的slot數有兩種配置方式:

  • taskmanager.numberOfTaskSlots。在conf/flink-conf.yaml中更改,默認值為1,表示默認一個TaskManager只有1個task slot.
  • 提交作業時通過參數配置。--yarnslots 1,表示TaskManager的slot數為1.

⚠️注意:slot不能搞太多,幾十個就行,你想啊假如你機器不多,TaskManager不多,搞那么多slot,每個slot分到的內存小的可憐,容易OOM啊

  • 並行度的設置有多個地方:操作算子層面、執行環境層面、客戶端層面、系統層面,具體可以參考:FLINK並行度
算子層面具體情況具體分析,例如KafkaSource的並行度最好跟Kafka Topic的分區數成比例,比如:
1 val wordCounts = text
2    .flatMap{ _.split(" ") map { (_, 1) } }
3    .keyBy(0)
4    .timeWindow(Time.seconds(5))
5    .sum(1).setParallelism(5)

執行環境層面比如:

1 env.setParallelism(5)

提交任務的時候,在客戶端側flink可以通過-p參數來設置並行度。例如:

1 ./bin/flink run -p 5 ../examples/*WordCount-java*.jar

13. TaskManager數量

TaskManager的數量是在提交作業時根據並行度動態計算。先根據設定的 operator的最大並行度計算,例如,如果作業中operator的最大並行度為10,則 Parallelism/numberOfTaskSlots為向YARN申請的TaskManager數。

14. Flink傳遞參數給函數

參數可以使用構造函數或withParameters(Configuration)方法傳遞,參數將會作為函數對象的一部分被序列化並傳遞到task實例中。
  • 使用構造函數方式

 1 DataSet toFilter = env.fromElements(1, 2, 3);
 2 toFilter.filter(new MyFilter(2));
 3 private static class MyFilter implements FilterFunction {
 4   private final int limit;
 5   public MyFilter(int limit) {
 6     this.limit = limit;
 7   }
 8   @Override
 9   public boolean filter(Integer value) throws Exception {
10     return value > limit;
11   }
12 }
(2)withParameters(Configuration)方式
這個方法將攜帶一個Configuration對象作為參數,參數將會傳遞給Rich Function的open方法(關於Rich Function參見:rich function)。Configuration對象是一個Map,存儲Key/Value鍵值對.
 1 DataSet toFilter = env.fromElements(1, 2, 3);
 2 Configuration config = new Configuration();
 3 config.setInteger("limit", 2);
 4 toFilter.filter(new RichFilterFunction() {
 5     private int limit;
 6     @Override
 7     public void open(Configuration parameters) throws Exception {
 8       limit = parameters.getInteger("limit", 0);
 9     }
10     @Override
11     public boolean filter(Integer value) throws Exception {
12       return value > limit;
13     }
14 }).withParameters(config);
(3)使用全局的the ExecutionConfig方式
參數可以被所有的rich function獲得
 1 Configuration conf = new Configuration();
 2 conf.setString("mykey","myvalue");
 3 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 4 env.getConfig().setGlobalJobParameters(conf);
 5 public static final class Tokenizer extends RichFlatMapFunction> {
 6     private String mykey;
 7     @Override
 8     public void open(Configuration parameters) throws Exception {
 9       super.open(parameters);
10       ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
11       Configuration globConf = (Configuration) globalParams;
12       mykey = globConf.getString("mykey", null);
13     }
14     // ... more here ...

15. Flink中的Metrics

Flink中有豐富的Metrics指標,當然我們也可以使用它的Reporter自定義,具體參見官網,示例如下:
 1 import org.apache.flink.api.common.accumulators.LongCounter;
 2 import org.apache.flink.api.common.functions.RichMapFunction;
 3 import org.apache.flink.api.java.utils.ParameterTool;
 4 import org.apache.flink.configuration.Configuration;
 5 import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
 6 import org.apache.flink.metrics.Counter;
 7 import org.apache.flink.metrics.Meter;
 8 import org.apache.flink.metrics.MetricGroup;
 9 
10 
11 public class Map extends RichMapFunction<String, String> {
12   private transient Meter logTotalMeter;
13   private transient Counter logTotalConter;
14   private LongCounter logAcc = new LongCounter();
15 
16   @Override
17   public void open(Configuration parameters) throws Exception {
18     MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
19     logTotalMeter = metricGroup.meter("logTotalMeter",
20         new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
21     logTotalConter = metricGroup.counter("logTotalConter");
22     getRuntimeContext().addAccumulator("logAcc", this.logAcc);
23   }
24 
25 
26   @Override
27   public String map(String log) throws Exception {
28     logTotalConter.inc();
29     logTotalMeter.markEvent();
30     logAcc.add(NumberUtil.LONG_ONE);
31     logMeter.mark();
32     return log;
33   }
34 }

16. Flink中的table API

聽了阿里軍長的講解后有所了解,簡單做下筆記,以免忘記。table API和SQL有很多相同的優良特性,比如:
聲明式-用戶只關心做什么,不用關心怎么做;
高性能-支持查詢優化,可以獲取更好的執行性能;
流批統一-相同的統計邏輯,既可以流模式也可以批模式;
標准穩定-語義遵循SQL標准,不易變動
易理解-所見即所得
比如:
1 // table API
2 tab.groupBy("word").select("word,count(1) as cnt")
3 
4 // SQL
5 SELECT COUNT(1) AS cnt
6 FROM tab
7 GROUP BY word

總的來說,SQL有的功能table API都有,如下圖所示

 

 環境配置如下

三種注冊表的方式

三種發射表的方式

table API對列操作比較方便,比如

再比如有一張100列的表,選擇1到10列怎么操作?

另外,table API的map函數擴展起來也很方便

 也可參考:Table API & SQLFlink SQL-Client

17. Flink中的內存管理

寫這個也是有點心累的,搞了好久,也算對Flink Web UI上的幾個內存指標大致了解了,老規矩記錄下。所有程序都是絕對受人控制的,在提交任務那一刻,我們可以指定程序中使用的內存,例如:
1 ./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar

紅色參數的具體含義是(參見Flink官網):

-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) -ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)

 先上兩個小圖(這倆圖我沒找到原版出處,一直覺得這玩意坑爹,大概率上這里的JVM Heap包括了on-heap和off-heap,on-heap和off-heap下邊有介紹)

 看圖說話,TaskManager 的堆內存主要被分成了三個部分:

  • Network Buffers: 一定數量的32KB大小的 buffer,主要用於數據的網絡傳輸。在 TaskManager 啟動的時候就會分配。默認數量是 2048 個,可以通過 taskmanager.network.numberOfBuffers來配置。
  • Memory Manager Pool: 這是一個由 MemoryManager 管理的,由眾多MemorySegment組成的超大集合。Flink 中的算法(如 sort/shuffle/join)會向這個內存池申請 MemorySegment,將序列化后的數據存於其中,使用完后釋放回內存池。默認情況下,池子占了堆內存的 70% 的大小。
  • Remaining (Free) Heap: 這部分的內存是留給用戶代碼以及 TaskManager 的數據結構使用的。因為這些數據結構一般都很小,所以基本上這些內存都是給用戶代碼使用的。從GC的角度來看,可以把這里看成的新生代,也就是說這里主要都是由用戶代碼生成的短期對象。

⚠️注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,該池子不會預分配內存,也不會向該池子請求內存塊。也就是說該部分的內存都是可以給用戶代碼使用的。不過社區是打算在 Streaming 模式下也能將該池子利用起來。

從堆的角度來說,Flink當前的內存支持堆內(on-heap)和堆外(off-heap)管理,用戶想去申請什么類型的內存,有相關的參數去配置。Flink off-heap的內存管理相對於on-heap的優點主要在於:

  • 啟動分配了大內存(例如100G)的JVM很耗費時間,垃圾回收也很慢。如果采用off-heap,剩下的Network buffer和Remaining heap都會很小,垃圾回收也不用考慮MemorySegment中的Java對象了,節省了GC時間;
  • 有效防止OOM,MemorySegment大小固定,操作高效。如果MemorySegment不足寫到磁盤,內存中的數據不多,一般不會發生OOM;
  • 更有效率的IO操作。在off-heap下,將MemorySegment寫到磁盤或是網絡可以支持zeor-copy技術,而on-heap的話則至少需要一次內存拷貝;
  • off-heap上的數據可以和其他程序共享。

好了概念講了一通,看下任務TaskManager日志

 1 2019-07-23 07:27:50,035 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Starting YARN TaskExecutor runner (Version: 1.7.1, Rev:<unknown>, Date:<unknown>)
 2 2019-07-23 07:27:50,035 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  OS current user: yarn
 3 2019-07-23 07:27:50,436 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Current Hadoop/Kerberos user: worker
 4 2019-07-23 07:27:50,436 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.112-b15
 5 2019-07-23 07:27:50,436 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Maximum heap size: 345 MiBytes
 6 2019-07-23 07:27:50,437 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  JAVA_HOME: /usr/local/jdk/
 7 2019-07-23 07:27:50,438 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Hadoop version: 2.6.0-cdh5.5.0
 8 2019-07-23 07:27:50,438 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  JVM Options:
 9 2019-07-23 07:27:50,438 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -Xms360m
10 2019-07-23 07:27:50,439 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -Xmx360m
11 2019-07-23 07:27:50,439 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -XX:MaxDirectMemorySize=664m

紅色的地方有幾個值,Flink源碼中這幾個值的計算在TaskManagerServices.calculateHeapSizeMB(計算堆內內存大小)和calculateNetworkBufferMemory(計算堆外內存大小),下邊我們來看默認情況下這些值怎么計算的(僅限Flink 1.7.1版本,其他版本系數可能有變化)。

1. JVM預留內存,總內存的25%,最小預留,600M:

1024MB - (1024 * 0.25 < 600MB) -> 600MB = 424MB (cutoff)

2. 剩下的內存的10%作為networkBuffer的內存,最小64M:

424MB - (424MB * 0.1 < 64MB) -> 64MB(networkbuffer) = 360MB 

3. 批作業的話剩下內存30%設為堆內內存,總內存減去堆內內存設為directMemory,流作業全部都是堆內內存了,用於netty和rocksDB和networkBuffer以及JVM自身內存。

這樣360M和664M都明了了,那為啥一開始日志打印“Maximum heap size: 345 MiBytes”呢,這里還有一個小坑,先看Flink怎么計算的,

 1     /**
 2      * The maximum JVM heap size, in bytes.
 3      * 
 4      * <p>This method uses the <i>-Xmx</i> value of the JVM, if set. If not set, it returns (as
 5      * a heuristic) 1/4th of the physical memory size.
 6      * 
 7      * @return The maximum JVM heap size, in bytes.
 8      */
 9     public static long getMaxJvmHeapMemory() {
10         final long maxMemory = Runtime.getRuntime().maxMemory();
11         if (maxMemory != Long.MAX_VALUE) {
12             // we have the proper max memory
13             return maxMemory;
14         } else {
15             // max JVM heap size is not set - use the heuristic to use 1/4th of the physical memory
16             final long physicalMemory = Hardware.getSizeOfPhysicalMemory();
17             if (physicalMemory != -1) {
18                 // got proper value for physical memory
19                 return physicalMemory / 4;
20             } else {
21                 throw new RuntimeException("Could not determine the amount of free memory.\n" +
22                         "Please set the maximum memory for the JVM, e.g. -Xmx512M for 512 megabytes.");
23             }
24         }
25     }

它用了Java lang的方法,這就有問題了,問題在於JVM使用的GC算法都會有一些內存丟失,比如Survivor有兩個,但只有1個會用到,另一個一直閑置,總有一塊Survivor區是不被計算到可用內存中的。

到底是不是呢,我在Flink程序的Map函數中加了這么一段代碼:

1 logger.info("Runtime max: " + mb(Runtime.getRuntime().maxMemory()));
2 MemoryMXBean m = ManagementFactory.getMemoryMXBean();
3 
4 logger.info("Non-heap: " + mb(m.getNonHeapMemoryUsage().getMax()));
5 logger.info("Heap: " + mb(m.getHeapMemoryUsage().getMax()));
6 
7 for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
8   logger.info("Pool: " + mp.getName() + " (type " + mp.getType() + ")" + " = " + mb(mp.getUsage().getMax()));
9 }

打印日志出來

 1 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Heap: 361758720 (345.00 M)
 2 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: PS Survivor Space (type Heap memory) = 15728640 (15.00 M)
 3 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: Code Cache (type Non-heap memory) = 251658240 (240.00 M)
 4 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: PS Old Gen (type Heap memory) = 251658240 (240.00 M)
 5 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: Metaspace (type Non-heap memory) = -1 (-0.00 M)
 6 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: Compressed Class Space (type Non-heap memory) = 1073741824 (1024.00 M)
 7 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: PS Eden Space (type Heap memory) = 94371840 (90.00 M)
 8 2019-07-20 09:03:16,344 INFO  data.demo.core.example                             - Pool: PS Survivor Space (type Heap memory) = 15728640 (15.00 M)
 9 2019-07-20 09:03:16,345 INFO  data.demo.core.example                             - Pool: PS Old Gen (type Heap memory) = 251658240 (240.00 M)
10 2019-07-20 09:03:16,345 INFO  data.demo.core.example                             - Runtime max: 361758720 (345.00 M)

眼見為實,實錘了。

再看Flink TaskManager WebUI

有一些概念還要說下,比如JVM管理兩種類型的內存:堆(heap)和非堆(Nonheap),堆就是Java代碼可及的內存,所有類實例和數組的內存都是在堆上分配。非堆就是JVM留給自己用的,方法區、棧、每個類結構(如運行時常數池、字段和方法數據)以及方法和構造方法的代碼都在非堆內存中。

再比如Direct和Mapped,這倆是JVM緩沖池,主要是JNI使用。

 UI上的這些參數我們都可以通過Flink的Rest API拿到下面這些指標值,比如通過訪問本地localhost:23799/taskmanagers/container_e37_1563420494990_0370_01_000009(具體參見Flink官網):

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.Memory Memory.Heap.Used 當前使用的堆內存大小.
Heap.Committed 保證JVM可用的堆內存大小.
Heap.Max 可用於內存管理的堆內存最大值.
NonHeap.Used 當前使用的非堆內存大小.
NonHeap.Committed 保證JVM可用的非堆內存大小.
NonHeap.Max 可用於內存管理的非堆內存最大值.
Direct.Count 直接緩沖池中的緩沖區數量.
Direct.MemoryUsed JVM中用於直接緩沖池的內存大小.
Direct.TotalCapacity 直接緩沖池中所有緩沖區的總容量.
Mapped.Count 映射緩沖池中緩沖區的數量.
Mapped.MemoryUsed JVM中用於映射緩沖池的內存大小.
Mapped.TotalCapacity 映射緩沖池中緩沖區的數量.

 

18. Flink中依賴配置

    每個Flink應用程序都依賴一組Flink庫,至少應用程序依賴於Flink API。許多應用程序還依賴於某些連接器庫(如Kafka,Cassandra等)。所以閱讀了下 官網依賴配置這一節的內容,簡單記一下,Flink中有兩大類依賴項和庫:
  • Flink核心依賴:Flink本身由運行系統所需的一組類和依賴項組成,例如協調,網絡,檢查點,故障轉移,API,操作(如窗口),資源管理等。所有這些這些類和依賴項構成了Flink運行時的核心,在啟動Flink應用程序時必須存在。這些核心類和依賴項打包在flink-dist.jar中。它們是Flink lib文件夾的一部分。這部分不包含任何連接器或庫(CEP,SQL,ML等),以避免默認情況下在類路徑中具有過多的依賴項和類,保持默認的類路徑較小並避免依賴性沖突。Maven(和其他構建工具)將依賴項打包時一般將核心依賴設為provided,如果它們未設置為provided,可能使生成的JAR包過大,還可能出現添加到應用程序的jar文件的Flink核心依賴項與用戶自己的一些依賴版本沖突(可以通過反向類加載來避免);如果在IntelliJ IDEA中調試,則將scope設置為comiple,否則失敗報NoClassDefFountError錯;
  • 用戶應用程序依賴: connectors, formats, or libraries(CEP, SQL, ML),用戶應用程序通常打包到應用程序jar中。

     另外,當Flink程序讀寫HDFS時需要添加Hadoop依賴,不要把Hadoop依賴直接添加到Flink application,而是: export HADOOP_CLASSPATH=`hadoop classpath`,Flink組件啟動時會使用該環境變量,這樣做是因為:

  • 一些Hadoop交互發生在Flink的核心,可能在用戶應用程序啟動之前,例如為檢查點設置HDFS,通過Hadoop的Kerberos令牌進行身份驗證或在YARN上部署。
  • Flink的反向類加載方法隱藏了核心依賴關系中的許多傳遞依賴關系,應用程序可以使用相同依賴項的不同版本,而不會遇到依賴項沖突。

19. Flink單元測試指南

    參考: 指南文檔翻譯

20. Flink讀HDFS

    簡單點說,讀數據
1 DataSet<String> hdfslines=env.readTextFile("your hdfs path")

寫數據

1 hdfslines.writeAsText("your hdfs path")

以上會根據你的默認的線程數來生成多少個分區文件,如果你想最后生成一個文件的話,可以在后面使用setParallelism(1),這樣最后就只會生成一個文件了。具體可以這么整

 1    try {
 2       String topic = args[0];
 3       String path = args[1];
 4       //讀取配置文件
 5       Configuration conf = new Configuration();
 6       //獲取文件系統
 7       FileSystem fs = FileSystem.get(URI.create("/"), conf);
 8 
 9       System.out.println(fs.getUri());
10       if (!fs.getUri().toString().contains("hdfs")) {
11         path = "hdfs://localhost:8020" + path;
12       }
13 
14       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
15       DataSet<String> union = env.readTextFile(fs.getUri() + path + "/").setParallelism(20);
16 
17       union.rebalance().setParallelism(15).map(new MapFunction<String, String>() {
18         private static final long serialVersionUID = 1033071381217373267L;
19 
20         @Override
21         public String map(String rawLog) throws Exception {
22           return rawLog;
23         }
24       }).output(new DiscardingOutputFormat<>())
25           .setParallelism(20);
26       try {
27         env.execute();
28       } catch (Exception e) {
29         e.printStackTrace();
30       }
31 
32       //關閉文件系統
33       fs.close();
34     } catch (Exception e) {
35       logger.error("task submit process error, due to {}.", e.getMessage());
36       e.printStackTrace();
37     }
View Code

 path是傳入的目錄,比如/user/rawlog/hourly/2019-09-15/09

 

 

 

 

配置

 

21. 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM