Flink installation, standalone-mode startup, and project development in IDEA


Installation

Environment

  • Ubuntu 18
  • jdk8
  • flink-1.8.1

Installation steps

  1. Install the JDK (omitted here)

  2. Download flink-1.8.1-bin-scala_2.12.tgz and extract it to the target directory

    wget http://mirror.bit.edu.cn/apache/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.12.tgz
    sudo mkdir /opt/flink
    sudo chown test /opt/flink
    sudo chgrp test /opt/flink
    tar -zxvf flink-1.8.1-bin-scala_2.12.tgz -C /opt/flink

  3. Since a single machine has limited resources, lower the JVM heap sizes in the configuration file conf/flink-conf.yaml (a few other commonly tuned keys are sketched after this step):

    # The heap size for the JobManager JVM
    jobmanager.heap.size: 256m

    # The heap size for the TaskManager JVM
    taskmanager.heap.size: 256m
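
Beyond the two heap sizes, a few other keys in the same conf/flink-conf.yaml are often worth checking on a small standalone setup. This is only a sketch for orientation; the values shown are the Flink 1.8.x defaults, and this post does not require changing them.

    # number of task slots offered by each TaskManager
    taskmanager.numberOfTaskSlots: 1

    # default parallelism used when a job does not set its own
    parallelism.default: 1

    # port of the web UI / REST endpoint
    rest.port: 8081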

Starting in standalone mode

Start

In the bin directory, execute ./start-cluster.sh

Check the processes with jps:

3857 TaskManagerRunner
3411 StandaloneSessionClusterEntrypoint
3914 Jps
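
When the cluster is no longer needed, it can be shut down with the matching script that ships in the same bin directory:

# stops the JobManager and TaskManager started by start-cluster.sh
./stop-cluster.sh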

View the web UI

By default the web UI / REST endpoint listens on port 8081, so it can be opened at http://<jobmanager-host>:8081.

(screenshot: Flink web UI)
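
The same overview is also exposed through Flink's REST API on that port, which is handy when no browser is available. A quick sketch, assuming the default port 8081 and a JobManager running on localhost:

# cluster overview (TaskManagers, slots, jobs) via the REST API
curl http://localhost:8081/overview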

Run an example

(screenshot: running a bundled example)

View the result file

(screenshot: contents of the result file)
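
As a command-line sketch of the two steps above, here is how one of the bundled examples can be submitted and its output inspected. This assumes the batch WordCount example that ships under examples/ and made-up input/output paths; the post does not say which example or paths were actually used.

# from the bin directory: submit the bundled batch WordCount example
./flink run ../examples/batch/WordCount.jar \
    --input /home/test/words.txt \
    --output /home/test/wordcount-result.txt

# inspect the result file written by the job
cat /home/test/wordcount-result.txt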

Writing a Flink project in IDEA

Running the program inside IDEA starts a local embedded Flink, which makes this a convenient development environment.

Add the dependencies in Maven

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.8.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.8.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>

Example code

package test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingWindowWordCountJava {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port = 9000;

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("192.168.29.129", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
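
Before the job is started (from IDEA or on the cluster), something has to be listening on the socket it reads from; otherwise socketTextStream cannot connect and the job fails. A minimal way to feed it, assuming the host 192.168.29.129 and port 9000 hard-coded above, is netcat:

# run on the host the job connects to, then type lines of words into the terminal
nc -lk 9000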

Run result in IDEA

(screenshot: word counts printed in the IDEA console)

Packaging and running the code

Package the code above into simple-flink-code.jar, then execute the following in Flink's bin directory:

./flink run -c test.StreamingWindowWordCountJava /home/test/Desktop/simple-flink-code.jar

Note that the class passed to -c must be fully qualified with its package name, and the -c option must appear before the path to the jar; otherwise the submission fails with an error.
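
One way to produce that jar, assuming the standard Maven layout implied by the pom.xml above (the name simple-flink-code.jar comes from this post; Maven names the artifact after artifactId and version, so the copy below is purely illustrative):

# build the project; the jar is written to target/
mvn clean package

# illustrative copy to the file name and location used in this post
cp target/*.jar /home/test/Desktop/simple-flink-code.jar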

Reference

FLINK實例-WORDCOUNT詳細步驟 (Flink example: WordCount in detail)

