Flink開發-IDEA scala開發環境搭建


現在大數據相關服務,越來越傾向於使用scala語言,scala函數式編程的優勢我不多贅述。最明顯的一個優點,代碼簡潔。看個WordCount實現對比:

Java版WordCount
 1 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 2 
 3 DataSet<String> text = env.readTextFile("/path/to/file");
 4 
 5 DataSet<Tuple2<String, Integer>> counts =
 6         // split up the lines in pairs (2-tuples) containing: (word,1)
 7         text.flatMap(new Tokenizer())
 8         // group by the tuple field "0" and sum up tuple field "1"
 9         .groupBy(0)
10         .sum(1);
11 
12 counts.writeAsCsv(outputPath, "\n", " ");
13 
14 // User-defined functions
15 public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
16 
17     @Override
18     public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
19         // normalize and split the line
20         String[] tokens = value.toLowerCase().split("\\W+");
21 
22         // emit the pairs
23         for (String token : tokens) {
24             if (token.length() > 0) {
25                 out.collect(new Tuple2<String, Integer>(token, 1));
26             }   
27         }
28     }
29 }

 

scala版WordCount
 1 val env = ExecutionEnvironment.getExecutionEnvironment
 2 
 3 // get input data
 4 val text = env.readTextFile("/path/to/file")
 5 
 6 val counts = text.flatMap { _.toLowerCase.split("\\s+") filter { _.nonEmpty } }
 7   .map ( (_, 1) )
 8   .groupBy(0)
 9   .sum(1)
10 
11 counts.writeAsCsv(outputPath, "\n", " ")

 

如何搭建開發環境呢?
這里介紹一下主要關注的點。
  • 版本匹配
idea和scala的版本匹配有比較大的耦合,具體如何對應需要確認一下。根據flink的scala版本選定要安裝的scala sdk,flink使用的scala sdk是2.12,所以我這里選擇的sdk版本是2.12.8。
  • 安裝scala plugin
 
  • 設置Library
創建flink maven工程后,右鍵工程打開module setting,在Libraries中下載所需版本的scala sdk,選擇下載並等待下載成功。
  • 避免多版本sdk沖突
確認安裝成功,如果存在其他版本的scala sdk,刪除掉
 
Global Libraries
Global Libraries中,同樣選中所需的scala sdk,否則compile可能出錯,類似這樣的錯誤 compiler error
  • maven依賴
使用flink scala版本的包依賴,官方的例子用的java版本依賴,會導致寫scala調用一些flink api出現語法錯誤。
 
 1 <version>1.0-SNAPSHOT</version>
 2 <dependencies>
 3     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
 4     <dependency>
 5         <groupId>org.apache.flink</groupId>
 6         <artifactId>flink-streaming-scala_2.12</artifactId>
 7         <version>1.9.0</version>
 8     </dependency>
 9     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
10     <dependency>
11         <groupId>org.apache.flink</groupId>
12         <artifactId>flink-scala_2.12</artifactId>
13         <version>1.9.0</version>
14     </dependency>
15 </dependencies>

 

這樣代碼可以正常在IDEA里面跑了。
 
 
如果創建時找不到scala的文件
 
需要導入scala的sdk,忘了說了,別忘了裝好 1.8的java的sdk
 
 
 
 1 import org.apache.flink.api.scala.ExecutionEnvironment
 2 object WordCountBatch {
 3   def main(args: Array[String]): Unit = {
 4  
 5     val inputPath = "D:\\data\\11.txt"
 6 //    val env = StreamExecutionEnvironment.getExecutionEnvironment
 7     val env = ExecutionEnvironment.getExecutionEnvironment
 8  
 9     import  org.apache.flink.api.scala._
10     val text = env.readTextFile(inputPath)
11     val counts = text.flatMap(_.split("\\W+"))
12       .filter(_.nonEmpty)
13       .map((_,1))
14       .groupBy(0)
15       .sum(1)
16  
17     counts.writeAsCsv("D:\\data\\output6").setParallelism(1)
18     env.execute("batch wordCount")
19   }
20  
21 }
22  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM