之前看了視頻學習第一個 Flink word count 的使用,但是對於用 socket 發送數據作為數據源這部分我有點忘記了,加上最近有個項目要發布,一直在忙,所以遲遲沒有完成。
1、首先我們要有數據源,因為不論是流計算處理還是批次處理,都需要數據源,然後經過transformation轉換成我們想要的數據輸出到某個地方,這裡我們就輸出到控制台即可。下面先用一個簡單的socket服務端作為數據源,把控制台輸入的每一行發送給連接上來的客戶端:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Minimal text source for a socket-based streaming demo.
 *
 * <p>Listens on port 9000, accepts a single client, and forwards every line typed on the
 * server console to that client until the operator types {@code bye}, the console reaches
 * end of input, or the client connection fails.
 */
public class SocketTalkServer {

    /** TCP port the server listens on. */
    private static final int PORT = 9000;

    /** Console input that ends the session. */
    private static final String QUIT_COMMAND = "bye";

    /**
     * Starts the server, waits for one client and relays console input to it.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final ServerSocket server;
        try {
            server = new ServerSocket(PORT);
        } catch (IOException e) {
            // Without a listening socket there is nothing to do; stop here instead of
            // continuing with a null reference.
            System.out.println("can not listen to:" + e);
            return;
        }
        System.out.println("服務端啟動成功:服務端端口號為9000");

        // Resources are closed in reverse order (writer, client socket, listener) on every
        // exit path, including failures in accept() or while opening the output stream.
        try (ServerSocket listener = server;
             Socket client = listener.accept(); // blocks until a client connects
             PrintWriter toClient = new PrintWriter(client.getOutputStream())) {
            System.out.println("有個客戶端連接:" + client.getInetAddress() + ":" + client.getPort());
            // System.in is deliberately not closed here: it belongs to the JVM, not to us.
            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            relay(console, toClient);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Copies lines from {@code console} to {@code toClient}, echoing each forwarded line to
     * standard output prefixed with {@code Server:}.
     *
     * <p>Stops when the quit command is read, when {@code console} reaches end of stream, or
     * when the writer reports an error (for example, the client disconnected).
     *
     * @param console  source of lines to forward
     * @param toClient destination writer; flushed after every line
     * @throws IOException if reading from {@code console} fails
     */
    static void relay(BufferedReader console, PrintWriter toClient) throws IOException {
        String line;
        // readLine() returns null at end of stream, so test for that before comparing.
        while ((line = console.readLine()) != null && !QUIT_COMMAND.equals(line)) {
            toClient.println(line);
            // PrintWriter never throws on write failures; checkError() flushes the stream
            // and reports whether any write has failed.
            if (toClient.checkError()) {
                System.err.println("client connection lost, stopping");
                break;
            }
            System.out.println("Server:" + line);
        }
    }
}
2、編寫flink計算程序,也是我的第一個程序,這裡有幾個步驟,我覺得視頻中的老師寫得非常好,就抄過來了,十分易於理解:

package com.flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class SocketWindowWordCountJava { public static void main(String[] args) throws Exception { // 獲取所需要的端口號 int port = 9000; // try{ // ParameterTool parameterTool = ParameterTool.fromArgs(args); // port = parameterTool.getInt("port");} // catch (Exception e){ // System.err.println("no port specified. use default 9000"); // port = 9000; // } // 獲取flink的運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String hostname = "127.0.0.1"; String delimiter = "\n"; // 鏈接socket獲取輸入的數據 DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter); DataStream<WordIsCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordIsCount>() { @Override public void flatMap(String value, Collector<WordIsCount> out) throws Exception { String[] words = value.split("\\s"); for (String word : words) { out.collect(new WordIsCount(word, 1L)); } } }).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))// 指定時間窗口大小為2秒,指定時間間隔為1秒 .sum("count");// 在這里使用sum或者reduce都可以 // 將數據打印到控制台,並設置並行度 windowCounts.print().setParallelism(1); // 這一行代碼一定要實現,否則不執行 env.execute("socket window count"); } public static class WordIsCount{ public String word; public long count; public WordIsCount(String word, long count) { this.word = word; this.count = count; } public WordIsCount() { } @Override public String toString() { return "WordIsCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } } }
