如何在flink中傳遞參數


眾所周知,flink作為流計算引擎,處理源源不斷的數據是其本意,但是在處理數據的過程中,往往可能需要一些參數的傳遞,那么有哪些方法進行參數的傳遞?在什么時候使用?這里嘗試進行簡單的總結。

  • 使用configuration

  在main函數中定義變量

1 // Class in Flink to store parameters
2 Configuration configuration = new Configuration();
3 configuration.setString("genre", "Action");
4 
5 lines.filter(new FilterGenreWithParameters())
6         // Pass parameters to a function
7         .withParameters(configuration)
8         .print();

  使用參數的function需要繼承自一個rich的function,這樣才可以在open方法中獲取相應的參數。

 1 class FilterGenreWithParameters extends RichFilterFunction<Tuple3<Long, String, String>> {
 2 
 3     String genre;
 4 
 5     @Override
 6     public void open(Configuration parameters) throws Exception {
 7         // Read the parameter
 8         genre = parameters.getString("genre", "");
 9     }
10 
11     @Override
12     public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
13         String[] genres = movie.f2.split("\\|");
14 
15         return Stream.of(genres).anyMatch(g -> g.equals(genre));
16     }
17 }

 

  • 使用ParameterTool

使用configuration雖然傳遞了參數,但顯然不夠動態,每次參數改變,都涉及到程序的變更,既然main函數能夠接受參數,flink自然也提供了相應的承接的機制,即ParameterTool。

如果使用ParameterTool,則在參數傳遞上如下

 1 public static void main(String... args) {
 2     // Read command line arguments
 3     ParameterTool parameterTool = ParameterTool.fromArgs(args);
 4     
 5 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 6 env.getConfig().setGlobalJobParameters(parameterTool);
 7 ...
 8 
 9 // This function will be able to read these global parameters
10 lines.filter(new FilterGenreWithGlobalEnv())
11                 .print();
12 }

如上面代碼,使用parameterTool來承接main函數的參數,通過env來設置全局變量來進行分發,那么在繼承了rich函數的邏輯中就可以使用這個全局參數。

 1 class FilterGenreWithGlobalEnv extends RichFilterFunction<Tuple3<Long, String, String>> {
 2 
 3     @Override
 4     public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
 5         String[] genres = movie.f2.split("\\|");
 6         // Get global parameters
 7         ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 8         // Read parameter
 9         String genre = parameterTool.get("genre");
10 
11         return Stream.of(genres).anyMatch(g -> g.equals(genre));
12     }
13 }

 

  • 使用broadcast變量

在上面使用configuration和parametertool進行參數傳遞會很方便,但是也僅僅適用於少量參數的傳遞,如果有比較大量的數據傳遞,flink則提供了另外的方式來進行,其中之一即是broadcast,這個也是在其他計算引擎中廣泛使用的方法之一。

 1 DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
 2 // Get a dataset with words to ignore
 3 DataSet<String> wordsToIgnore = ...
 4 
 5 data.map(new RichFlatMapFunction<String, String>() {
 6 
 7     // A collection to store words. This will be stored in memory
 8     // of a task manager
 9     Collection<String> wordsToIgnore;
10 
11     @Override
12     public void open(Configuration parameters) throws Exception {
13         // Read a collection of words to ignore
14         wordsToIgnore = getRuntimeContext().getBroadcastVariable("wordsToIgnore");
15     }
16 
17 
18     @Override
19     public String map(String line, Collector<String> out) throws Exception {
20         String[] words = line.split("\\W+");
21         for (String word : words)
22             // Use the collection of words to ignore
23             if (wordsToIgnore.contains(word))
24                 out.collect(new Tuple2<>(word, 1));
25     }
26     // Pass a dataset via a broadcast variable
27 }).withBroadcastSet(wordsToIgnore, "wordsToIgnore");

在第3行定義了需要進行廣播的數據集,在第27行指定了將此數據集進行廣播的目的地。

廣播的變量會保存在tm的內存中,這個也必然會使用tm有限的內存空間,也因此不能廣播太大量的數據。

那么,對於數據量更大的廣播需要,要如何進行?flink也提供了緩存文件的機制,如下。

  • 使用distributedCache

首先還是需要在定義dag圖的時候指定緩存文件:

1 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2 
3 // Register a file from HDFS
4 env.registerCachedFile("hdfs:///path/to/file", "machineLearningModel")
5 
6 ...
7 
8 env.execute()

flink本身支持指定本地的緩存文件,但一般而言,建議指定分布式存儲比如hdfs上的文件,並為其指定一個名稱。

使用起來也很簡單,在rich函數的open方法中進行獲取。

 1 class MyClassifier extends RichMapFunction<String, Integer> {
 2 
 3     @Override
 4     public void open(Configuration config) {
 5       File machineLearningModel = getRuntimeContext().getDistributedCache().getFile("machineLearningModel");
 6       ...
 7     }
 8 
 9     @Override
10     public Integer map(String value) throws Exception {
11       ...
12     }
13 }

上面的代碼忽略了對文件內容的處理。

在上面的幾個方法中,應該說參數本身都是static的,不會變化,那么如果參數本身隨着時間也會發生變化,怎么辦?

嗯,那就用connectStream,其實也是流的聚合了。

  • 使用connectStream

使用ConnectedStream的前提當然是需要有一個動態的流,比如在主數據之外,還有一些規則數據,這些規則數據會通過Restful服務來發布,假如我們的主數據來自於kafka,

那么,就可以如下:

1 DataStreamSource<String> input = (DataStreamSource) KafkaStreamFactory
2                 .getKafka08Stream(env, srcCluster, srcTopic, srcGroup);
3 
4 DataStream<Tuple2<String, String>> appkeyMeta = env.addSource(new AppKeySourceFunction(), "appkey")
5 
6 ConnectedStreams<String, Tuple2<String, String>> connectedStreams = input.connect(appkeyMeta.broadcast());
7 
8 DataStream<String> cleanData = connectedStreams.flatMap(new DataCleanFlatMapFunction())

其實可以看到,上面的代碼中做了四件事,首先在第1行定義了獲取主數據的流,在第4行定義了獲取規則數據的流,在AppKeySourceFunction中實現了讀取Restful的邏輯,

在第6行實現了將規則數據廣播到主數據中去,最后在第8行實現了從connectedStream中得到經過處理的數據。其中的關鍵即在於DataCleanFlatMapFunction。

1 public class DataCleanFlatMapFunction extends RichCoFlatMapFunction<String, Tuple2<String, String>, String>{
2 
3 public void flatMap1(String s, Collector<String> collector){...}
4 
5 public void flatMap2(Tuple2<String, String> s, Collector<String> collector) {...}
6 
7 
8 }

這是一段縮減的代碼,關鍵在於第一行,首先這個函數需要實現RichCoFlatMapFunction這個抽象類,其次在類實現中,flatMap2會承接規則函數,flatMap1會承接主函數。

 

 

當然,參數可以從client發送到task,有時候也需要從task發回到client,一般這里就會使用accumulator。

這里先看一個簡單的例子,實現單詞的計數以及處理文本的記錄數:

 1 DataSet<String> lines = ...
 2 
 3 // Word count algorithm
 4 lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
 5     @Override
 6     public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
 7         String[] words = line.split("\\W+");
 8         for (String word : words) {
 9             out.collect(new Tuple2<>(word, 1));
10         }
11     }
12 })
13 .groupBy(0)
14 .sum(1)
15 .print();
16 
17 // Count a number of lines in the text to process
18 int linesCount = lines.count()
19 System.out.println(linesCount);

上面的代碼中,第14行實現了單詞的計算,第18行實現了處理記錄的行數,但很可惜,這里會產生兩個job,僅僅第18行一句代碼,就會產生一個job,無疑是不高效的。

flink提供了accumulator來實現數據的回傳,亦即從tm傳回到JM。

flink本身提供了一些內置的accumulator:

  • IntCounterLongCounterDoubleCounter – allows summing together int, long, double values sent from task managers
  • AverageAccumulator – calculates an average of double values
  • LongMaximumLongMinimumIntMaximumIntMinimumDoubleMaximumDoubleMinimum – accumulators to determine maximum and minimum values for different types
  • Histogram – used to computed distribution of values from task managers

首先需要定義一個accumulator,然后在某個自定義函數中來注冊它,這樣在客戶端就可以獲取相應的的值。

 1 lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
 2 
 3     // Create an accumulator
 4     private IntCounter linesNum = new IntCounter();
 5 
 6     @Override
 7     public void open(Configuration parameters) throws Exception {
 8         // Register accumulator
 9         getRuntimeContext().addAccumulator("linesNum", linesNum);
10     }
11 
12     @Override
13     public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
14         String[] words = line.split("\\W+");
15         for (String word : words) {
16             out.collect(new Tuple2<>(word, 1));
17         }
18         
19         // Increment after each line is processed
20         linesNum.add(1);
21     }
22 })
23 .groupBy(0)
24 .sum(1)
25 .print();
26 
27 // Get accumulator result
28 int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum");
29 System.out.println(linesNum);

當然,如果內置的accumulator不能滿足需求,可以自定義accumulator,只需要繼承兩個接口之一即可,Accumulator或者SimpleAccumulato。

 

上面介紹了幾種參數傳遞的方式,在日常的使用中,可能不僅僅是使用其中一種,或許是某些的組合,比如通過parametertool來傳遞hdfs的路徑,再通過filecache來讀取緩存。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM