眾所周知,flink作為流計算引擎,處理源源不斷的數據是其本意,但是在處理數據的過程中,往往可能需要一些參數的傳遞,那么有哪些方法進行參數的傳遞?在什么時候使用?這里嘗試進行簡單的總結。
- 使用configuration
在main函數中定義變量
1 // Class in Flink to store parameters 2 Configuration configuration = new Configuration(); 3 configuration.setString("genre", "Action"); 4 5 lines.filter(new FilterGenreWithParameters()) 6 // Pass parameters to a function 7 .withParameters(configuration) 8 .print();
使用參數的function需要繼承自一個rich的function,這樣才可以在open方法中獲取相應的參數。
1 class FilterGenreWithParameters extends RichFilterFunction<Tuple3<Long, String, String>> { 2 3 String genre; 4 5 @Override 6 public void open(Configuration parameters) throws Exception { 7 // Read the parameter 8 genre = parameters.getString("genre", ""); 9 } 10 11 @Override 12 public boolean filter(Tuple3<Long, String, String> movie) throws Exception { 13 String[] genres = movie.f2.split("\\|"); 14 15 return Stream.of(genres).anyMatch(g -> g.equals(genre)); 16 } 17 }
- 使用ParameterTool
使用configuration雖然傳遞了參數,但顯然不夠動態,每次參數改變,都涉及到程序的變更,既然main函數能夠接受參數,flink自然也提供了相應的承接的機制,即ParameterTool。
如果使用ParameterTool,則在參數傳遞上如下
1 public static void main(String... args) { 2 // Read command line arguments 3 ParameterTool parameterTool = ParameterTool.fromArgs(args); 4 5 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 6 env.getConfig().setGlobalJobParameters(parameterTool); 7 ... 8 9 // This function will be able to read these global parameters 10 lines.filter(new FilterGenreWithGlobalEnv()) 11 .print(); 12 }
如上面代碼,使用parameterTool來承接main函數的參數,通過env來設置全局變量來進行分發,那么在繼承了rich函數的邏輯中就可以使用這個全局參數。
1 class FilterGenreWithGlobalEnv extends RichFilterFunction<Tuple3<Long, String, String>> { 2 3 @Override 4 public boolean filter(Tuple3<Long, String, String> movie) throws Exception { 5 String[] genres = movie.f2.split("\\|"); 6 // Get global parameters 7 ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 8 // Read parameter 9 String genre = parameterTool.get("genre"); 10 11 return Stream.of(genres).anyMatch(g -> g.equals(genre)); 12 } 13 }
- 使用broadcast變量
在上面使用configuration和parametertool進行參數傳遞會很方便,但是也僅僅適用於少量參數的傳遞,如果有比較大量的數據傳遞,flink則提供了另外的方式來進行,其中之一即是broadcast,這個也是在其他計算引擎中廣泛使用的方法之一。
1 DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3); 2 // Get a dataset with words to ignore 3 DataSet<String> wordsToIgnore = ... 4 5 data.map(new RichFlatMapFunction<String, String>() { 6 7 // A collection to store words. This will be stored in memory 8 // of a task manager 9 Collection<String> wordsToIgnore; 10 11 @Override 12 public void open(Configuration parameters) throws Exception { 13 // Read a collection of words to ignore 14 wordsToIgnore = getRuntimeContext().getBroadcastVariable("wordsToIgnore"); 15 } 16 17 18 @Override 19 public String map(String line, Collector<String> out) throws Exception { 20 String[] words = line.split("\\W+"); 21 for (String word : words) 22 // Use the collection of words to ignore 23 if (wordsToIgnore.contains(word)) 24 out.collect(new Tuple2<>(word, 1)); 25 } 26 // Pass a dataset via a broadcast variable 27 }).withBroadcastSet(wordsToIgnore, "wordsToIgnore");
在第3行定義了需要進行廣播的數據集,在第27行指定了將此數據集進行廣播的目的地。
廣播的變量會保存在tm的內存中,這個也必然會使用tm有限的內存空間,也因此不能廣播太大量的數據。
那么,對於數據量更大的廣播需要,要如何進行?flink也提供了緩存文件的機制,如下。
- 使用distributedCache
首先還是需要在定義dag圖的時候指定緩存文件:
1 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 2 3 // Register a file from HDFS 4 env.registerCachedFile("hdfs:///path/to/file", "machineLearningModel") 5 6 ... 7 8 env.execute()
flink本身支持指定本地的緩存文件,但一般而言,建議指定分布式存儲比如hdfs上的文件,並為其指定一個名稱。
使用起來也很簡單,在rich函數的open方法中進行獲取。
1 class MyClassifier extends RichMapFunction<String, Integer> { 2 3 @Override 4 public void open(Configuration config) { 5 File machineLearningModel = getRuntimeContext().getDistributedCache().getFile("machineLearningModel"); 6 ... 7 } 8 9 @Override 10 public Integer map(String value) throws Exception { 11 ... 12 } 13 }
上面的代碼忽略了對文件內容的處理。
在上面的幾個方法中,應該說參數本身都是static的,不會變化,那么如果參數本身隨着時間也會發生變化,怎么辦?
嗯,那就用connectStream,其實也是流的聚合了。
- 使用connectStream
使用ConnectedStream的前提當然是需要有一個動態的流,比如在主數據之外,還有一些規則數據,這些規則數據會通過Restful服務來發布,假如我們的主數據來自於kafka,
那么,就可以如下:
1 DataStreamSource<String> input = (DataStreamSource) KafkaStreamFactory 2 .getKafka08Stream(env, srcCluster, srcTopic, srcGroup); 3 4 DataStream<Tuple2<String, String>> appkeyMeta = env.addSource(new AppKeySourceFunction(), "appkey") 5 6 ConnectedStreams<String, Tuple2<String, String>> connectedStreams = input.connect(appkeyMeta.broadcast()); 7 8 DataStream<String> cleanData = connectedStreams.flatMap(new DataCleanFlatMapFunction())
其實可以看到,上面的代碼中做了四件事,首先在第1行定義了獲取主數據的流,在第4行定義了獲取規則數據的流,在AppKeySourceFunction中實現了讀取Restful的邏輯,
在第6行實現了將規則數據廣播到主數據中去,最后在第8行實現了從connectedStream中得到經過處理的數據。其中的關鍵即在於DataCleanFlatMapFunction。
1 public class DataCleanFlatMapFunction extends RichCoFlatMapFunction<String, Tuple2<String, String>, String>{ 2 3 public void flatMap1(String s, Collector<String> collector){...} 4 5 public void flatMap2(Tuple2<String, String> s, Collector<String> collector) {...} 6 7 8 }
這是一段縮減的代碼,關鍵在於第一行,首先這個函數需要實現RichCoFlatMapFunction這個抽象類,其次在類實現中,flatMap2會承接規則函數,flatMap1會承接主函數。
當然,參數可以從client發送到task,有時候也需要從task發回到client,一般這里就會使用accumulator。
這里先看一個簡單的例子,實現單詞的計數以及處理文本的記錄數:
1 DataSet<String> lines = ... 2 3 // Word count algorithm 4 lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 5 @Override 6 public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { 7 String[] words = line.split("\\W+"); 8 for (String word : words) { 9 out.collect(new Tuple2<>(word, 1)); 10 } 11 } 12 }) 13 .groupBy(0) 14 .sum(1) 15 .print(); 16 17 // Count a number of lines in the text to process 18 int linesCount = lines.count() 19 System.out.println(linesCount);
上面的代碼中,第14行實現了單詞的計算,第18行實現了處理記錄的行數,但很可惜,這里會產生兩個job,僅僅第18行一句代碼,就會產生一個job,無疑是不高效的。
flink提供了accumulator來實現數據的回傳,亦即從tm傳回到JM。
flink本身提供了一些內置的accumulator:
- IntCounter, LongCounter, DoubleCounter – allows summing together int, long, double values sent from task managers
- AverageAccumulator – calculates an average of double values
- LongMaximum, LongMinimum, IntMaximum, IntMinimum, DoubleMaximum, DoubleMinimum – accumulators to determine maximum and minimum values for different types
- Histogram – used to computed distribution of values from task managers
首先需要定義一個accumulator,然后在某個自定義函數中來注冊它,這樣在客戶端就可以獲取相應的的值。
1 lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() { 2 3 // Create an accumulator 4 private IntCounter linesNum = new IntCounter(); 5 6 @Override 7 public void open(Configuration parameters) throws Exception { 8 // Register accumulator 9 getRuntimeContext().addAccumulator("linesNum", linesNum); 10 } 11 12 @Override 13 public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { 14 String[] words = line.split("\\W+"); 15 for (String word : words) { 16 out.collect(new Tuple2<>(word, 1)); 17 } 18 19 // Increment after each line is processed 20 linesNum.add(1); 21 } 22 }) 23 .groupBy(0) 24 .sum(1) 25 .print(); 26 27 // Get accumulator result 28 int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum"); 29 System.out.println(linesNum);
當然,如果內置的accumulator不能滿足需求,可以自定義accumulator,只需要繼承兩個接口之一即可,Accumulator或者SimpleAccumulato。
上面介紹了幾種參數傳遞的方式,在日常的使用中,可能不僅僅是使用其中一種,或許是某些的組合,比如通過parametertool來傳遞hdfs的路徑,再通過filecache來讀取緩存。
