1 在IDEA中編寫Flink程序
Scala版Flink程序編寫
本項目使用的Flink版本為最新版本,也就是1.11.0。現在提供maven項目的配置文件。
- 使用Intellij IDEA創建一個Maven新項目
- 勾選
Create from archetype
,然后點擊Add Archetype
按鈕 GroupId
中輸入org.apache.flink
,ArtifactId
中輸入flink-quickstart-scala
,Version
中輸入1.11.0
,然后點擊OK
- 點擊向右箭頭,出現下拉列表,選中
flink-quickstart-scala:1.11.0
,點擊Next
Name
中輸入FlinkTutorial
,GroupId
中輸入com.atguigu
,ArtifactId
中輸入FlinkTutorial
,點擊Next
- 最好使用IDEA默認的Maven工具:Bundled(Maven 3),點擊
Finish
,等待一會兒,項目就創建好了
編寫WordCount.scala
程序
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object StreamingJob { /** Main program method */ def main(args: Array[String]) : Unit = { // get the execution environment StreamExecutionEnvironment env: StreamExecutionEnvironment = StreamExecutionEnvironment .getExecutionEnvironment // get input data by connecting to the socket val text: DataStream[String] = env .socketTextStream("localhost", 9999, '\n') // parse the data, group it, window it, and aggregate the counts val windowCounts = text .flatMap { w => w.split("\\s") } .map { w => WordWithCount(w, 1) } .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") // print the results with a single thread, rather than in parallel windowCounts .print() .setParallelism(1) env.execute("Socket Window WordCount") } /** Data type for words with count */ case class WordWithCount(word: String, count: Long) }
打開一個終端(Terminal),運行以下命令
$ nc -lk 9999
接下來使用IDEA
運行就可以了。
Java版Flink程序編寫
- 使用Intellij IDEA創建一個Maven新項目
- 勾選
Create from archetype
,然后點擊Add Archetype
按鈕 GroupId
中輸入org.apache.flink
,ArtifactId
中輸入flink-quickstart-java
,Version
中輸入1.11.0
,然后點擊OK
- 點擊向右箭頭,出現下拉列表,選中
flink-quickstart-java:1.11.0
,點擊Next
Name
中輸入FlinkTutorial
,GroupId
中輸入com.atguigu
,ArtifactId
中輸入FlinkTutorial
,點擊Next
- 最好使用IDEA默認的Maven工具:Bundled(Maven 3),點擊
Finish
,等待一會兒,項目就創建好了
編寫WordCount.java
程序
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCountFromSocket { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<String> stream = env.socketTextStream("localhost", 9999); stream.flatMap(new Tokenizer()).keyBy(r -> r.f0).sum(1).print(); env.execute("Flink Streaming Java API Skeleton"); } public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] stringList = value.split("\\s"); for (String s : stringList) { // 使用out.collect方法向下游發送數據 out.collect(new Tuple2(s, 1)); } } } }
2 下載Flink運行時環境,提交Jar包的運行方式
下載鏈接:http://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz
然后解壓
$ tar xvfz flink-1.11.1-bin-scala_2.11.tgz
啟動Flink集群
$ cd flink-1.11.1 $ ./bin/start-cluster.sh
可以打開Flink WebUI查看集群狀態:http://localhost:8081
在IDEA
中使用maven package
打包。
提交打包好的JAR
包
$ cd flink-1.11.1 $ ./bin/flink run 打包好的JAR包的絕對路徑
停止Flink集群
$ ./bin/stop-cluster.sh
查看標准輸出日志的位置,在log
文件夾中。
$ cd flink-1.11.1/log