Big-data services are increasingly written in Scala. I won't dwell on the general advantages of Scala's functional style; the most obvious one is concise code. Compare two WordCount implementations:
Java version of WordCount
```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word, 1)
        text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(0)
            .sum(1);

counts.writeAsCsv(outputPath, "\n", " ");

// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
```
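The Tokenizer's normalize-split-count logic can be exercised on its own, without any Flink dependency. A minimal plain-Java sketch (the class and method names here are mine, not Flink's):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class: replays the Tokenizer's normalize, split,
// and count steps for a single line of input.
public class TokenizerDemo {
    public static Map<String, Integer> wordCount(String line) {
        Map<String, Integer> counts = new HashMap<>();
        // same normalization as the Flink Tokenizer: lowercase, split on non-word chars
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount("To be, or not to be"));
    }
}
```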
Scala version of WordCount
```scala
val env = ExecutionEnvironment.getExecutionEnvironment

// get input data
val text = env.readTextFile("/path/to/file")

val counts = text.flatMap { _.toLowerCase.split("\\s+") filter { _.nonEmpty } }
  .map((_, 1))
  .groupBy(0)
  .sum(1)

counts.writeAsCsv(outputPath, "\n", " ")
```
So how do you set up the development environment?
Here are the main points to watch.
- Version matching
IDEA and the Scala plugin are fairly tightly coupled version-wise, so check which combinations actually work together. Choose the Scala SDK to install according to the Scala version your Flink build uses: Flink here is built against Scala 2.12, so I went with SDK version 2.12.8.
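The Scala version also shows up in the Maven artifact suffix (e.g. `flink-scala_2.12`), so one way to keep the SDK, the plugin, and the dependencies aligned is to centralize the versions in Maven properties. A sketch, with property names of my own choosing:

```xml
<properties>
    <!-- hypothetical property names; reference them as
         flink-scala_${scala.binary.version} etc. in the artifactIds -->
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.9.0</flink.version>
</properties>
```

This way, upgrading Scala or Flink later is a one-line change instead of a hunt through every dependency.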
- Install the Scala plugin

In IDEA, open Settings → Plugins, search for "Scala", install it, and restart the IDE.

- Set up the Library
After creating the Flink Maven project, right-click the project and open the module settings. Under Libraries, pick the Scala SDK of the required version, choose Download, and wait for the download to finish.

- Avoid conflicts between multiple SDK versions
Confirm the installation succeeded, and delete any other Scala SDK versions that are still around.
Under Global Libraries, likewise select the required Scala SDK; otherwise compilation may fail with a "compiler error" like the one in the screenshot.

- Maven dependencies
Use the Scala variants of the Flink dependencies. The official examples use the Java dependencies, which lead to syntax errors when calling some Flink APIs from Scala.
```xml
<version>1.0-SNAPSHOT</version>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.9.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
```
With this in place, the code runs normally inside IDEA.
If the Scala file type is missing when you try to create a new file, import the Scala SDK as described above.
One more prerequisite I almost forgot: make sure a Java 1.8 JDK is installed as well.

```scala
import org.apache.flink.api.scala.ExecutionEnvironment

object WordCountBatch {
  def main(args: Array[String]): Unit = {

    val inputPath = "D:\\data\\11.txt"
    // val env = StreamExecutionEnvironment.getExecutionEnvironment
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    val text = env.readTextFile(inputPath)
    val counts = text.flatMap(_.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    counts.writeAsCsv("D:\\data\\output6").setParallelism(1)
    env.execute("batch wordCount")
  }
}
```
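The commented-out `StreamExecutionEnvironment` line hints at the streaming counterpart. A sketch of that variant, assuming the `flink-streaming-scala_2.12` dependency above; the host, port, and window size are placeholders, and it expects a text source such as `nc -lk 9999` to be running:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCountStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // placeholder host/port; feed it with e.g. `nc -lk 9999`
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5)) // emit counts every 5 seconds
      .sum(1)

    counts.print()
    env.execute("streaming wordCount")
  }
}
```

Unlike the batch job, this one runs until you stop it, printing a windowed count every five seconds.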