4. Flink Stream Processing Example in Java


In the Flink project, create a package and add a new WordCount class to it.

 

 

package com.gong.stream;

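import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // Parse the command-line arguments passed in via args
        ParameterTool params = ParameterTool.fromArgs(args);

        // Get a Flink streaming execution environment
        // (DataStream jobs need StreamExecutionEnvironment, not the batch ExecutionEnvironment)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without --input there is nothing to read, so exit instead of failing later on a null stream
        if (!params.has("input")) {
            System.out.println("No input specified; run with --input <path>");
            return;
        }
        DataStream<String> dataStream = env.readTextFile(params.get("input"));

        // Count word frequencies: tokenize, key by the word (f0), then sum the counts (field 1)
        DataStream<Tuple2<String, Integer>> counts = dataStream.flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);

        // Write to --output if it is given, otherwise print to stdout
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            counts.print();
        }
        env.execute("Streaming wordcount");
    }

    // Splits each line into lowercase words and emits a (word, 1) pair per word
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                // split can yield an empty first token when the line starts with a non-word character
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}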
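
To make it easier to see what the job above emits, here is a minimal plain-Java sketch (no Flink dependency) that mimics the tokenize, key-by-word, and rolling-sum steps. The class name RunningWordCount and its two helper methods are hypothetical and exist only for illustration. The sketch assumes the usual streaming behaviour of a keyed sum: every incoming word produces one updated (word,count) record, not a single final total per word.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Flink or of the original project
public class RunningWordCount {
    // Same tokenization rule as the Flink Tokenizer: lowercase, split on non-word characters
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    // Emits one updated (word,count) record per incoming word, like a rolling keyed sum
    static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // per-key running total
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : tokenize(line)) {
                int count = state.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + count + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        lines.add("to be or");
        lines.add("not to be");
        for (String record : runningCounts(lines)) {
            System.out.println(record);
        }
    }
}
```

Running main prints (to,1), (be,1), (or,1), (not,1), (to,2), (be,2), one record per line. In a real Flink job with parallelism greater than 1, records for different words may interleave in a different order, and print() may prefix each line with a subtask index.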
