Flink基礎(五):DS簡介(5) 編寫第一個Flink程序


1 在IDEA中編寫Flink程序

Scala版Flink程序編寫

本項目使用的Flink版本為最新版本,也就是1.11.0。現在提供maven項目的配置文件。

  1. 使用Intellij IDEA創建一個Maven新項目
  2. 勾選Create from archetype,然后點擊Add Archetype按鈕
  3. GroupId中輸入org.apache.flinkArtifactId中輸入flink-quickstart-scalaVersion中輸入1.11.0,然后點擊OK
  4. 點擊向右箭頭,出現下拉列表,選中flink-quickstart-scala:1.11.0,點擊Next
  5. Name中輸入FlinkTutorialGroupId中輸入com.atguiguArtifactId中輸入FlinkTutorial,點擊Next
  6. 最好使用IDEA默認的Maven工具:Bundled(Maven 3),點擊Finish,等待一會兒,項目就創建好了

編寫WordCount.scala程序

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingJob {

  /** Main program method */
  def main(args: Array[String]) : Unit = {

    // get the execution environment
    StreamExecutionEnvironment env: StreamExecutionEnvironment = StreamExecutionEnvironment
      .getExecutionEnvironment

    // get input data by connecting to the socket
    val text: DataStream[String] = env
      .socketTextStream("localhost", 9999, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts
      .print()
      .setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}

打開一個終端(Terminal),運行以下命令

$ nc -lk 9999

接下來使用IDEA運行就可以了。

Java版Flink程序編寫

  1. 使用Intellij IDEA創建一個Maven新項目
  2. 勾選Create from archetype,然后點擊Add Archetype按鈕
  3. GroupId中輸入org.apache.flinkArtifactId中輸入flink-quickstart-javaVersion中輸入1.11.0,然后點擊OK
  4. 點擊向右箭頭,出現下拉列表,選中flink-quickstart-java:1.11.0,點擊Next
  5. Name中輸入FlinkTutorialGroupId中輸入com.atguiguArtifactId中輸入FlinkTutorial,點擊Next
  6. 最好使用IDEA默認的Maven工具:Bundled(Maven 3),點擊Finish,等待一會兒,項目就創建好了

編寫WordCount.java程序

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountFromSocket {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> stream = env.socketTextStream("localhost", 9999);

        stream.flatMap(new Tokenizer()).keyBy(r -> r.f0).sum(1).print();

        env.execute("Flink Streaming Java API Skeleton");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] stringList = value.split("\\s");
            for (String s : stringList) {
                // 使用out.collect方法向下游發送數據
                out.collect(new Tuple2(s, 1));
            }
        }
    }
}

2 下載Flink運行時環境,提交Jar包的運行方式

下載鏈接:http://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz

然后解壓

$ tar xvfz flink-1.11.1-bin-scala_2.11.tgz

啟動Flink集群

$ cd flink-1.11.1 $ ./bin/start-cluster.sh 

可以打開Flink WebUI查看集群狀態:http://localhost:8081

IDEA中使用maven package打包。

提交打包好的JAR

$ cd flink-1.11.1 $ ./bin/flink run 打包好的JAR包的絕對路徑 

停止Flink集群

$ ./bin/stop-cluster.sh

查看標准輸出日志的位置,在log文件夾中。

$ cd flink-1.11.1/log


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM