Getting Started with Flink: Installation and Usage


Download and Install

Download the Flink release that matches your operating system and the version you need from the Flink download page.

 # First, confirm the Java environment
 $ java -version
 java version "1.8.0_111"
 Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
 Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
 
 # Install on Linux
 $ wget https://archive.apache.org/dist/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz # download the binary package
 $ tar xzf flink-*.tgz   # unpack the archive
 $ cd flink-1.7.1 # enter the unpacked directory
 
 # Install on Mac
 $ brew install apache-flink
 ...
 # Check the version
 $ flink --version
 Version: 1.2.0, Commit ID: 1c659cf
 
 # Show the install location
 $ brew info apache-flink
 https://flink.apache.org/
 /usr/local/Cellar/apache-flink/1.7.1 (170 files, 321.1MB) *
  Built from source on 2019-02-14 at 09:32:35
 From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/apache-flink.rb
 ==> Requirements
 Required: java = 1.8 ✔
 ==> Options
 --HEAD
  Install HEAD version
 ==> Analytics
 install: 915 (30 days), 3,279 (90 days), 9,094 (365 days)
 install_on_request: 899 (30 days), 3,226 (90 days), 8,878 (365 days)
 build_error: 0 (30 days)
 
 # Note for Mac
 # On Mac, the counterpart of the Linux bin directory lives under /usr/local/Cellar/apache-flink/1.7.1/libexec
 

 

Configure and Run

 $ cd flink-1.7.1
 $ ./bin/start-cluster.sh
 # The web dashboard is served at localhost:8081

 

Create a Maven project in IDEA -> Add Archetype

GroupId: org.apache.flink

ArtifactId: flink-quickstart-java

Version: 1.7.1

 

Add the code

 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
 
 public class SocketWindowWordCount {
 
     public static void main(String[] args) throws Exception {
 
         // the port to connect to
         final int port;
         try {
             final ParameterTool params = ParameterTool.fromArgs(args);
             port = params.getInt("port");
         } catch (Exception e) {
             System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
             return;
         }
 
         // get the execution environment
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // get input data by connecting to the socket
         DataStream<String> text = env.socketTextStream("localhost", port, "\n");
 
         // parse the data, group it, window it, and aggregate the counts
         DataStream<WordWithCount> windowCounts = text
             .flatMap(new FlatMapFunction<String, WordWithCount>() {
                 @Override
                 public void flatMap(String value, Collector<WordWithCount> out) {
                     for (String word : value.split("\\s")) {
                         out.collect(new WordWithCount(word, 1L));
                     }
                 }
             })
             .keyBy("word")
             // 5-second windows that slide forward every 1 second
             .timeWindow(Time.seconds(5), Time.seconds(1))
             .reduce(new ReduceFunction<WordWithCount>() {
                 @Override
                 public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                     return new WordWithCount(a.word, a.count + b.count);
                 }
             });
 
         // print the results with a single thread, rather than in parallel
         windowCounts.print().setParallelism(1);
 
         env.execute("Socket Window WordCount");
     }
 
     // Data type for words with count
     public static class WordWithCount {
 
         public String word;
         public long count;
 
         public WordWithCount() {}
 
         public WordWithCount(String word, long count) {
             this.word = word;
             this.count = count;
         }
 
         @Override
         public String toString() {
             return word + " : " + count;
         }
     }
 }
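
To make the flatMap / keyBy / reduce chain above easier to follow, here is a minimal plain-Java sketch of what it computes for the lines that fall into a single window. It does not use the Flink API; the class name `WordCountSketch` and the method `countWords` are hypothetical names chosen for illustration only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {

    // Mirrors flatMap + keyBy + reduce for the lines that belong to one window.
    static Map<String, Long> countWords(Iterable<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // Same delimiter as the Flink job: split on a single whitespace character.
            for (String word : line.split("\\s")) {
                // Same effect as the reduce step: add the counts of equal words.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
                countWords(Arrays.asList("hello hello hello", "hehe", "your world"));
        // Same "word : count" format as WordWithCount.toString()
        counts.forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```

The real job differs in that Flink keeps this state per key and per window and emits a result whenever a window closes, rather than once at the end.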

 

Package and Run

 $ mvn clean package -Dmaven.test.skip=true

 

Open a listening port

 $ nc -l 9000

 

Run the code above

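 # The job connects to the port opened above; first change to the directory containing the jar built earlier
 $ flink run -c <package>.SocketWindowWordCount <path-to-jar> --port 9000 # <package> is the Java package of the class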
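
Everything after the jar path is handed to the job's `main` method, which is how `--port 9000` reaches the program. The following is a simplified plain-Java sketch of that idea, assuming only `--key value` pairs; it is not how Flink's `ParameterTool` is implemented, and `ArgsSketch` and `parse` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {

    // Collects "--key value" pairs into a map; anything else is ignored.
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].startsWith("--")) {
                params.put(args[i].substring(2), args[i + 1]);
                i++; // skip the value that was just consumed
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(new String[] {"--port", "9000"});
        int port = Integer.parseInt(params.get("port"));
        System.out.println("port = " + port);
    }
}
```

If `--port` is missing, the job shown earlier prints a usage message and returns instead of starting.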

 

Now we can type data into nc

 $ nc -l 9000
 hello hello hello
 hehe
 your world

 

View the results

 $ tail -f log/flink-*-taskexecutor-*.out
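
Because the job uses `timeWindow(Time.seconds(5), Time.seconds(1))`, each input line belongs to several overlapping windows, so the same word will typically show up in more than one line of output. Here is a minimal plain-Java sketch of that window assignment, assuming non-negative millisecond timestamps and no window offset. `SlidingWindowSketch` and `windowStarts` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    // Returns the start times (ms) of every window [start, start + sizeMs)
    // that contains timestampMs, when windows begin every slideMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window that has already started at this timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards until the window would end at or before the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A 5-second window sliding every 1 second, as in the job above.
        System.out.println(windowStarts(12_300L, 5_000L, 1_000L));
        // prints [12000, 11000, 10000, 9000, 8000]
    }
}
```

With a 5-second size and a 1-second slide, each element lands in five windows, and each of those windows emits its own count when it closes.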

 

Stop Flink

 $ ./bin/stop-cluster.sh

 

