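When developing big data processing programs, we usually want to write and debug the code locally, test it against local data, and only then run it on the "big" data in production.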
1. The nc tool
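Set up nc on Windows: download nc.exe from https://eternallybored.org/misc/netcat/
Start nc listening on port 9000 with the command nc -L -p 9000 -v (-L listens and re-listens after a client disconnects, -p sets the port, -v enables verbose output). The Flink job connects to this port as a client, so start nc before running the job.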
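Because the job fails if nothing is listening when it starts, a quick pre-flight check can save a confusing stack trace. The following is a minimal sketch using only the JDK; the class name PortCheck and its isListening method are hypothetical helpers, not part of Flink or nc.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Hypothetical helper (not part of Flink): checks whether a TCP listener,
 * such as one started with "nc -L -p 9000", is accepting connections.
 */
public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true; // handshake completed, so something is listening
        } catch (IOException e) {
            return false; // refused, timed out, or unreachable
        }
    }

    public static void main(String[] args) {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        if (isListening("127.0.0.1", port, 1000)) {
            System.out.println("Listener found on port " + port);
        } else {
            System.out.println("Nothing is listening on port " + port + "; start nc first");
        }
    }
}
```

The probe opens and immediately closes a connection. With -L, nc goes back to listening afterwards; with a plain -l listener the probe would use up the only connection, so this check only makes sense together with -L.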
2. Setup in IDEA: Maven dependencies, code, and program arguments
Maven configuration (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>lims</groupId>
    <artifactId>flink-project</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.7.2</flink.version>
    </properties>

    <dependencies>
        <!--log4j-->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!--flink-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
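WordCount (the port can be passed as a program argument in the IDEA run configuration, e.g. --port 9000; if it is missing, the code falls back to 9000):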
package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Description: Streaming word count over a local socket source
 * @Date: 2019/2/25 23:49
 */
public class WordCount {

    public static void main(String[] args) throws Exception {
        // Port of the socket to read from
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port argument given, using the default 9000");
            port = 9000;
        }

        // Get the execution environment (a local one when run inside the IDE)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to the socket and read its input line by line
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n");

        // Compute the counts
        DataStream<WordWithCount> windowCount = text
                // Flatten: turn every word of a line into a <word, count> record
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                // Group records that have the same word
                .keyBy("word")
                // Sliding window: 2-second window size, advancing every 1 second
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");

        // Print the results to the console using a parallelism of 1
        windowCount.print().setParallelism(1);

        // Flink builds the job lazily: nothing above runs until execute() is called
        env.execute("streaming word count");
    }

    /**
     * Holds a word and the number of times it occurred.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
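The timeWindow(Time.seconds(2), Time.seconds(1)) call creates overlapping windows, so each record belongs to size / slide = 2 windows. The sketch below models that assignment in plain Java, with no Flink dependency; it assumes a zero window offset and non-negative millisecond timestamps, and the class SlidingWindowSketch is purely illustrative rather than Flink's own code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative model (not Flink code) of sliding-window assignment.
 * Windows are half-open intervals [start, end).
 */
public class SlidingWindowSketch {

    /** Returns {start, end} pairs for every window that contains timestampMs. */
    public static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp (zero offset assumed)
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[]{start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // A record at 3.5 s with a 2 s window sliding every 1 s
        for (long[] w : assignWindows(3500L, 2000L, 1000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints "[3000, 5000)" then "[2000, 4000)"
    }
}
```

This is why a word typed once usually shows up in two consecutive printed results: the two windows that contain it both fire.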
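3. Results
In the cmd window running nc, type words separated by spaces and press Enter, then watch the output in the IDEA console.
This completes the local development and debugging example.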
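To know what to expect in the console, the sketch below reproduces in plain Java what a single window computes: split each line on whitespace (like the flatMap) and add 1 per word (like keyBy("word").sum("count")). The class WindowWordCountSketch is hypothetical and only mirrors the job's logic, with one deliberate difference noted in the code: it skips empty tokens produced by repeated spaces.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative model (no Flink) of the per-window word count.
 */
public class WindowWordCountSketch {

    /** Counts words across all lines that fall into one window. */
    public static Map<String, Long> countWords(List<String> linesInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>(); // keeps first-seen order
        for (String line : linesInWindow) {
            for (String word : line.split("\\s")) {
                // Unlike the Flink job, skip empty tokens from repeated spaces
                if (word.isEmpty()) {
                    continue;
                }
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(Arrays.asList("hello flink", "hello world"));
        // Print in the same format as WordWithCount.toString()
        counts.forEach((word, count) ->
                System.out.println("WordWithCount{word='" + word + "', count=" + count + "}"));
        // prints hello=2, then flink=1, then world=1 in that format
    }
}
```

In the real job the order of keys in the console is not guaranteed, and because of the overlapping windows the same counts are typically printed twice, about one second apart.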