My first Flink Java program


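A while ago I watched a video tutorial on the first Flink word count example, but I had partly forgotten how to send data over a socket as the data source. On top of that, a project release kept me busy, so I did not get around to finishing this until now.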

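1. First we need a data source. Both stream processing and batch processing start from a data source, apply transformations to turn the data into what we want, and then write the result somewhere. Here we simply write it to the console. The data source is a small socket server that sends every line typed on its console to the connected client: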

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketTalkServer {

    public static void main(String[] args) {
        // Create a ServerSocket that listens for client requests on port 9000;
        // try-with-resources closes it when main exits
        try (ServerSocket server = new ServerSocket(9000)) {
            System.out.println("Server started on port 9000");
            // accept() blocks until a client (here, the Flink job) connects, then returns a Socket;
            // the socket's output stream is wrapped in a PrintWriter
            try (Socket serverSocket = server.accept();
                 PrintWriter serverPrintWriter = new PrintWriter(serverSocket.getOutputStream())) {
                System.out.println("Client connected: " + serverSocket.getInetAddress() + ":" + serverSocket.getPort());
                // Read the data source strings typed on the server console
                BufferedReader serverInput = new BufferedReader(new InputStreamReader(System.in));
                String serverLine = serverInput.readLine();
                // Stop when "bye" is entered or the console input ends
                while (serverLine != null && !serverLine.equals("bye")) {
                    // Send the line to the client
                    serverPrintWriter.println(serverLine);
                    // Flush so the line is sent immediately
                    serverPrintWriter.flush();
                    // Echo the input on the server console
                    System.out.println("Server:" + serverLine);
                    // Read the next line
                    serverLine = serverInput.readLine();
                }
            }
        } catch (IOException e) {
            // Binding the port, accepting the client, or writing to it failed
            System.out.println("Socket server error: " + e);
        }
    }
}
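
Before involving Flink, it can help to check that the server really sends what is typed. The following is a minimal sketch of a client that connects to the server and prints every line it receives. The class name SocketTalkClient and the helper readLines are hypothetical names made up for this illustration, and it assumes the server runs on the same machine on port 9000.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

class SocketTalkClient {

    // Connects to host:port and reads newline-terminated lines
    // until the server closes the connection.
    static List<String> readLines(String host, int port) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println("Client received: " + line);
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the socket server is already running locally on port 9000
        readLines("127.0.0.1", 9000);
    }
}
```

Start the server first, then this client, and type a few lines on the server console. Typing bye on the server closes the connection, which ends the client's read loop. Since the server accepts only one client, stop and restart it before running the Flink job.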

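2. Next, write the Flink job, which is also my first Flink program. It follows a few fixed steps: get the execution environment, connect to the source, apply the transformations, print the result, and call execute. I thought the instructor in the video wrote it very clearly, so I copied it here; it is easy to follow: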

package com.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
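        // Port of the socket server to connect to. It is fixed here; the commented-out
        // block below shows how it could be read from a --port argument instead.
        int port = 9000;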
//        try{
//        ParameterTool parameterTool = ParameterTool.fromArgs(args);
//        port = parameterTool.getInt("port");}
//        catch (Exception e){
//            System.err.println("no port specified. use default 9000");
//            port = 9000;
//        }
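        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "127.0.0.1";
        String delimiter = "\n";
        // Connect to the socket and read the input as a stream of lines
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
        // Split each line into words and emit (word, 1) for every word
        DataStream<WordIsCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordIsCount>() {
            @Override
            public void flatMap(String value, Collector<WordIsCount> out) throws Exception {
                // Split on runs of whitespace and skip empty tokens
                String[] words = value.split("\\s+");
                for (String word : words) {
                    if (!word.isEmpty()) {
                        out.collect(new WordIsCount(word, 1L));
                    }
                }
            }
        // Note: keyBy(String) and timeWindow(...) are deprecated in newer Flink releases
        }).keyBy("word") // group by the "word" field
                .timeWindow(Time.seconds(2), Time.seconds(1)) // sliding window: size 2 seconds, slide 1 second
                .sum("count"); // sum the "count" field; reduce would work here as well
        // Print the result to the console with parallelism 1
        windowCounts.print().setParallelism(1);

        // execute() must be called; without it the job is only defined and never runs
        env.execute("socket window count");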

    }

    public static class WordIsCount{
        public String word;
        public long count;

        public WordIsCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        public WordIsCount() {
        }

        @Override
        public String toString() {
            return "WordIsCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
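
The window in the job has a size of 2 seconds and slides every 1 second, so every word falls into two overlapping windows and shows up in two consecutive outputs. The plain-Java sketch below illustrates that assignment without Flink. SlidingWindowSketch and its methods are hypothetical names for this illustration only. It assumes non-negative timestamps in milliseconds that are supplied by hand, and window starts aligned to multiples of the slide. It mirrors the idea, not Flink's runtime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SlidingWindowSketch {

    // Start times (ms) of every window [start, start + size) that contains the timestamp.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Counts words per window; the result maps window start -> (word -> count).
    static Map<Long, Map<String, Long>> count(long[] timestamps, String[] lines,
                                              long size, long slide) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            for (String word : lines[i].split("\\s+")) {
                if (word.isEmpty()) {
                    continue;
                }
                // Add 1 to this word's count in every window covering the timestamp
                for (long start : windowStarts(timestamps[i], size, slide)) {
                    result.computeIfAbsent(start, k -> new TreeMap<>())
                          .merge(word, 1L, Long::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {1500L, 2200L};
        String[] lines = {"hello flink", "hello world"};
        // prints {0={flink=1, hello=1}, 1000={flink=1, hello=2, world=1}, 2000={hello=1, world=1}}
        System.out.println(count(timestamps, lines, 2000L, 1000L));
    }
}
```

The line received at 1500 ms lands in the windows starting at 0 and 1000 ms, and the line received at 2200 ms lands in the windows starting at 1000 and 2000 ms. Only the window starting at 1000 ms sees both lines, which is why hello is counted twice there.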

 

