Flink實例-Wordcount詳細步驟


link實例之Wordcount詳細步驟

1.我的IDE是IntelliJ IDEA.在官網上https://www.jetbrains.com/idea/下載最新版2018.2的IDEA,如下圖。破解可以再http://idea.lanyus.com/上獲取破解碼進行破解,如下圖。

 

2.當IDE准備就緒后,開始創建一個項目名為bbb的maven項目,如下圖。

 

3.在新窗口打開bbb項目時,IDEA會提示我們是否自動導包。選擇自動導包,如下圖。

 

 4.對pom.xml配置文件進行修改,如下代碼。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiao</groupId>
    <artifactId>bbb</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>

    </dependencies>

</project>

5.在src/main/java/目錄下新建一個類,我的類名為WordCount,如下代碼。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class WordCount {

    public static void main(String[] args) throws Exception {
        //定義socket的端口號
        int port;
        try{
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("沒有指定port參數,使用默認值9000");
            port = 9000;
        }

        //獲取運行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //連接socket獲取輸入的數據
        DataStreamSource<String> text = env.socketTextStream("10.192.12.106", port, "\n");

        //計算數據
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word:splits) {
                    out.collect(new WordWithCount(word,1L));
                }
            }
        })//打平操作,把每行的單詞轉為<word,count>類型的數據
                .keyBy("word")//針對相同的word數據進行分組
                .timeWindow(Time.seconds(2),Time.seconds(1))//指定計算數據的窗口大小和滑動窗口大小
                .sum("count");
               
        //把數據打印到控制台
        windowCount.print()
                .setParallelism(1);//使用一個並行度
        //注意:因為flink是懶加載的,所以必須調用execute方法,上面的代碼才會執行
        env.execute("streaming word count");

    }

    /**
     * 主要為了存儲單詞以及單詞出現的次數
     */
    public static class WordWithCount{
        public String word;
        public long count;
        public WordWithCount(){}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }


}

6.開啟IP為10.192.12.106的虛擬機,並開啟該虛擬機的終端,在終端輸入如下命令,該命令可以打開一個端口號為9000的監聽,輸入命令后光標會停留在如下圖的地方。

nc -l 9000

7.切換回IDEA,在菜單欄Build->Build Project,然后運行該類,當控制台console輸出如下圖所示的信息時表示Wordcount成功的與9000的監聽端口建立了連接。

 

8.在虛擬機終端開的光標停留出,輸入hello hello world world world world,然后 回車。在IDEA的控制台會顯示如下單詞和詞頻的信息,表示成功。

 9.接下來把項目bbb打jar包,上傳Flink后台運行,進行如下圖操作。

首先要保證Java Compiler版本為1.8。

 

然后選擇File->Project Structure,進行修改。

 

 

 

10.在配置好Flink的虛擬機下,進入目錄/opt/data/flink-1.3.2/bin中,輸入如下命令,開啟Flink的本地模式。(不會配置flink的小伙伴可以打開鏈接https://www.cnblogs.com/ALittleMoreLove/p/9396118.html

./start-local.sh

11.在瀏覽器里輸入開啟Flink守護進程的虛擬機的IP和8081端口,進入如下Flink前端頁面。

 12.上傳bbb.jar文件到Flink后端運行。

備注:在學習大數據的漫長道路上,我們會遇到各種各樣奇怪的問題,在嘗試了多種方法仍然無法解決后 如果再沒有高人指點,經常一個問題就卡好幾天。這種無奈與絕望的感覺我想各位自學大數據的小伙伴們應該深有體會。我個人解決問題通常有兩種方法:一種是直接找大牛幫忙,另外一種是在網上找各種相關的博客和帖子,再從中總結出一套可以解決自己問題的方法。自己探索新知識時,往往是很艱辛的,遇到好多天也解決不了的問題也是很正常的,但是千萬不要放棄,堅持下來就一定會有收獲的!Wordcount實例令我躺了兩天的坑,最后終於找到了解決的方法,希望這篇隨筆可以對自學大數據的小伙伴提供一定的幫助。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM