I won't go over Flink's concepts here; they are covered all over the web and the official documentation is very thorough. This post simply records a small example of using Flink from Java.
First, download the Flink zip package from the official website (no link provided; you are a grown-up programmer now, you should have some search skills), unzip it, and put it wherever you like.
Inside the extracted home directory it looks like this:

[screenshot: directory layout of the Flink distribution]
Take a quick look at the directory structure, then go back to your favorite IDE and create a project.
Create a Maven project in IDEA and add the required dependencies, or create a Maven project the way the Flink website describes and import it into your favorite IDE. Below are the Maven dependencies from the official quickstart.
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    -->

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
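The ${flink.version} and ${scala.binary.version} placeholders are Maven properties; the quickstart pom defines them in its <properties> section. If you write the pom by hand, something along these lines should work; the version numbers below are only examples, so match them to the distribution you downloaded:

<properties>
    <!-- example values only: use the Flink version you actually downloaded -->
    <flink.version>1.9.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>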
Once the project is created we can start writing code. The example below is almost the same as the one on the official website; here it is.
package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Skeleton for a Flink Streaming Job.
 *
 * <p>For a tutorial how to write a Flink streaming application, check the
 * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 */
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * Here, you can start creating your execution plan for Flink.
         *
         * Start with getting some data from the environment, like
         *     env.readTextFile(textPath);
         *
         * then, transform the resulting DataStream<String> using operations
         * like
         *     .filter()
         *     .flatMap()
         *     .join()
         *     .coGroup()
         *
         * and many more.
         * Have a look at the programming guide for the Java API:
         *
         * http://flink.apache.org/docs/latest/apis/streaming/index.html
         */

        // read lines of text from a local socket source
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);

        // split each line into lowercase words and count them per 5-second window
        DataStream<Tuple2<String, Integer>> dataStream = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] tokens = s.toLowerCase().split("\\W+");
                        for (String token : tokens) {
                            if (token.length() > 0) {
                                collector.collect(new Tuple2<String, Integer>(token, 1));
                            }
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        // execute program
        env.execute("Java WordCount from SocketTextStream Example");
    }
}
We are all educated people and the comments in the code are already detailed, so I won't translate them. The one thing worth noting: writing the flatMap as a Java lambda did not work for me directly (because of Java type erasure, Flink cannot infer the Tuple2 output type from a lambda without an explicit hint), so I kept the anonymous-class form here.
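For reference, a lambda version is possible if you give Flink the type hint it cannot infer on its own. This is only a minimal sketch of the transformation part of the job above, assuming the same imports plus org.apache.flink.api.common.typeinfo.Types:

        DataStream<Tuple2<String, Integer>> dataStream = text
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String token : line.toLowerCase().split("\\W+")) {
                        if (token.length() > 0) {
                            out.collect(new Tuple2<>(token, 1));
                        }
                    }
                })
                // the returns() hint restores the Tuple2<String, Integer> type erased from the lambda
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);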
Next comes the exciting part: running it. On Windows you need to install netcat (the "Swiss Army knife" of networking) to get the nc command: download the zip from its website, unzip it, and add it to your PATH. In a command prompt run nc -l -p 9000, then start the program above (if you start the program first, it will fail because it cannot connect to the socket).

[screenshot: the nc session running in a command prompt]
Type anything you like into the nc session, and you will see results like the following in the IDEA console.

[screenshot: the word-count output in the IDEA console]
Because no Flink cluster was started for the run above, there is no need to monitor the job in the web UI at localhost:8081 the way other bloggers do: StreamExecutionEnvironment.getExecutionEnvironment() creates a LocalEnvironment and executes the job inside the local JVM.
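If you would still like the web UI while running purely inside the IDE, Flink can start a local environment with the UI enabled. A minimal sketch, assuming the flink-runtime-web dependency (matching your Flink version) is on the classpath:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// starts an embedded mini cluster in this JVM and serves the web UI,
// by default on http://localhost:8081, much like a standalone cluster would
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

The rest of the job stays the same; only the way the environment is obtained changes.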
Starting Flink in standalone mode on Windows is quite simple: go into the bin directory and double-click start-cluster.bat, which starts the JobManager and TaskManager services. To submit the program above to that Flink instance, run the Maven build to package it into a jar, then in a command prompt go to the bin directory and run flink.bat run xxxx/xxx/xxx.jar; the output will appear in the TaskManager's console window.