Flink Basic Programming Practice Report


Topic: Basic Flink Programming Practice

 

Date: 2021.12.24

Experiment environment:

Operating system: Ubuntu 18.04

Hadoop version: 3.3.1

Flink version: 1.9.1

JDK version: 1.8

 

This report is based mainly on the tutorial at http://dblab.xmu.edu.cn/blog/2507-2/ written by Prof. Lin Ziyu of Xiamen University; it records solutions I found during my own installation. Please contact me for removal in case of any infringement.

This version of the tutorial uses the Ubuntu 18.04 desktop edition and flink-1.9.1; other versions may differ slightly.

 

 

Download the installation package flink-1.9.1-bin-scala_2.11.tgz from the Flink official website.

Put the package on the desktop, then enter the desktop directory with cd /home/<username>/Desktop.

 

Extract the archive with the following command:

  1. sudo tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /usr/local  # /usr/local is the extraction target

 

Rename the directory and set its ownership with the following commands:

  1. cd /usr/local  # the extraction target
  2. sudo mv ./flink-1.9.1 ./flink  # rename the folder
  3. sudo chown -R Ubuntu:Ubuntu ./flink  # replace Ubuntu:Ubuntu with your own user name, e.g. hadoop:hadoop if your user name is hadoop

After the change, the flink folder is visible under the /usr/local directory.
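The effect of the rename and ownership change can be sketched on a scratch directory. The paths below are illustrative stand-ins for /usr/local, and a plain chown to the current user stands in for the sudo chown user:user step:

```shell
# Scratch directory instead of /usr/local; no sudo needed here
tmp=$(mktemp -d)
mkdir "$tmp/flink-1.9.1"
mv "$tmp/flink-1.9.1" "$tmp/flink"           # same idea as: sudo mv ./flink-1.9.1 ./flink
chown -R "$(id -un)" "$tmp/flink"            # same idea as: sudo chown -R user:user ./flink
[ "$(stat -c '%U' "$tmp/flink")" = "$(id -un)" ] && echo "ownership OK"
```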


Add the environment variables with the following command:

vim ~/.bashrc  # vim can be installed with sudo apt install vim; press i for insert mode

Add the following lines to the .bashrc file:

 

export FLINK_HOME=/usr/local/flink

export PATH=$FLINK_HOME/bin:$PATH

Save and exit .bashrc, then run the following command to make the configuration take effect:

  1. source ~/.bashrc
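To confirm the two lines took effect, the same exports can be replayed and checked in any shell; a minimal sketch (FLINK_HOME path as in the steps above):

```shell
# Replay the two .bashrc lines, then verify flink's bin directory is on PATH
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
echo "FLINK_HOME=$FLINK_HOME"
case ":$PATH:" in
  *:"$FLINK_HOME/bin":*) echo "PATH OK" ;;
  *) echo "PATH missing $FLINK_HOME/bin" ;;
esac
```

On the real machine, `which flink` should then print /usr/local/flink/bin/flink.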

Start Flink with the following commands:

  1. cd /usr/local/flink
  2. ./bin/start-cluster.sh

Use the jps command to check the processes: if both the TaskManagerRunner and StandaloneSessionClusterEntrypoint processes are listed, the startup succeeded.
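This success condition can also be checked mechanically. The jps output below is simulated (real PIDs and ordering will differ); on a live machine you would pipe the real jps output into the same grep:

```shell
# Simulated jps output; substitute `jps` itself on a real machine
jps_output='12001 StandaloneSessionClusterEntrypoint
12002 TaskManagerRunner
12100 Jps'
count=$(printf '%s\n' "$jps_output" | grep -cE 'TaskManagerRunner|StandaloneSessionClusterEntrypoint')
[ "$count" -eq 2 ] && echo "Flink cluster started"
```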


Flink's JobManager also starts a web frontend on port 8081; open "http://localhost:8081" in a browser to access it.


The Flink distribution ships with test examples; here we run the WordCount example to check that Flink works, with the following commands:

  1. cd /usr/local/flink/bin
  2. ./flink run /usr/local/flink/examples/batch/WordCount.jar

If the above commands execute successfully, you should see screen output similar to the following:

[Screenshot: output of the WordCount batch example]

Run the following commands in a Linux terminal to create a folder flinkapp under the user home directory as the application root:

  1. cd ~  # enter the user home directory
  2. mkdir -p ./flinkapp/src/main/java

Then use the vim editor to create three source files in the "./flinkapp/src/main/java" directory: WordCountData.java, WordCountTokenizer.java, and WordCount.java.

WordCountData.java provides the raw input data; its content is as follows (the contents of WORDS can be changed freely):

package cn.edu.xmu;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCountData {
    public static final String[] WORDS=new String[]{"To be, or not to be,--that is the question:--", "Whether \'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,", "And by opposing end them?--To die,--to sleep,--", "No more; and by a sleep to say we end", "The heartache, and the thousand natural shocks", "That flesh is heir to,--\'tis a consummation", "Devoutly to be wish\'d. To die,--to sleep;--", "To sleep! perchance to dream:--ay, there\'s the rub;", "For in that sleep of death what dreams may come,", "When we have shuffled off this mortal coil,", "Must give us pause: there\'s the respect", "That makes calamity of so long life;", "For who would bear the whips and scorns of time,", "The oppressor\'s wrong, the proud man\'s contumely,", "The pangs of despis\'d love, the law\'s delay,", "The insolence of office, and the spurns", "That patient merit of the unworthy takes,", "When he himself might his quietus make", "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary life,", "But that the dread of something after death,--", "The undiscover\'d country, from whose bourn", "No traveller returns,--puzzles the will,", "And makes us rather bear those ills we have", "Than fly to others that we know not of?", "Thus conscience does make cowards of us all;", "And thus the native hue of resolution", "Is sicklied o\'er with the pale cast of thought;", "And enterprises of great pith and moment,", "With this regard, their currents turn awry,", "And lose the name of action.--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons", "Be all my sins remember\'d."};
    public WordCountData() {
    }
    public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env){
        return env.fromElements(WORDS);
    }
}


WordCountTokenizer.java splits the sentences into words; its content is as follows:

package cn.edu.xmu;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{

    public WordCountTokenizer(){}

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        int len = tokens.length;

        for(int i = 0; i < len; i++){
            String tmp = tokens[i];
            if(tmp.length() > 0){
                out.collect(new Tuple2<String, Integer>(tmp, Integer.valueOf(1)));
            }
        }
    }
}


WordCount.java provides the main function; its content is as follows:

package cn.edu.xmu;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

public class WordCount {

    public WordCount(){}

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        DataSet<String> text;
        // If no input path is specified, use the data provided by WordCountData by default
        if(params.has("input")){
            text = env.readTextFile(params.get("input"));
        }else{
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataset(env);
        }

        AggregateOperator<Tuple2<String, Integer>> counts = text.flatMap(new WordCountTokenizer()).groupBy(0).sum(1);
        // If no output path is specified, print to the console by default
        if(params.has("output")){
            counts.writeAsCsv(params.get("output"), "\n", " ");
            env.execute();
        }else{
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }
}


The program depends on the Flink Java API, so we compile and package it with Maven. Create a new file pom.xml and add the following content to it, declaring the application's information and its dependencies on Flink:

The pom.xml file is located in ~/flinkapp (cd ~/flinkapp).

<project>

    <groupId>cn.edu.xmu</groupId>

    <artifactId>simple-project</artifactId>

    <modelVersion>4.0.0</modelVersion>

    <name>Simple Project</name>

    <packaging>jar</packaging>

    <version>1.0</version>

    <repositories>

        <repository>

            <id>jboss</id>

            <name>JBoss Repository</name>

            <url>http://repository.jboss.com/maven2/</url>

        </repository>

    </repositories>

    <dependencies>

        <dependency>

            <groupId>org.apache.flink</groupId>

            <artifactId>flink-java</artifactId>

            <version>1.9.1</version>

        </dependency>

        <dependency>

            <groupId>org.apache.flink</groupId>

            <artifactId>flink-streaming-java_2.11</artifactId>

            <version>1.9.1</version>

        </dependency>

        <dependency>

            <groupId>org.apache.flink</groupId>

            <artifactId>flink-clients_2.11</artifactId>

            <version>1.9.1</version>

        </dependency>

    </dependencies>

</project>

To make sure Maven runs correctly, first execute the following commands to check the file structure of the whole application:

  1. cd ~/flinkapp
  2. find .

The file structure should look similar to the following:
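As a sketch of what find should report, the layout from the steps above can be recreated in a scratch directory and listed; the real listing in ~/flinkapp should match these paths (a build will add a target/ directory as well):

```shell
# Recreate the application layout from the steps above, then list it
tmp=$(mktemp -d)
mkdir -p "$tmp/flinkapp/src/main/java"
touch "$tmp/flinkapp/pom.xml" \
      "$tmp/flinkapp/src/main/java/WordCountData.java" \
      "$tmp/flinkapp/src/main/java/WordCountTokenizer.java" \
      "$tmp/flinkapp/src/main/java/WordCount.java"
(cd "$tmp/flinkapp" && find . | sort)
```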

 

Next, we can package the whole application into a JAR file with the commands below. Note: the machine must stay connected to the network, and the first time the package command runs, Maven downloads the dependency packages automatically, which takes a few minutes. If packaging fails with a "connection refused" message, the download mirror is probably the problem: edit settings.xml in the Maven conf directory (here /usr/local/maven/conf, i.e. vim /usr/local/maven/conf/settings.xml) and change the mirror to the Aliyun mirror URL given below.

  1. cd ~/flinkapp    # make sure this directory is the current directory
  2. /usr/local/maven/bin/mvn package

 

 

Aliyun mirror entry (placed inside the <mirrors> element of settings.xml):

<mirror>

    <id>alimaven</id>

    <name>aliyun maven</name>

    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>

    <mirrorOf>central</mirrorOf>

</mirror>

If the screen output contains "BUILD SUCCESS", the JAR package was generated successfully.

 

Finally, the generated JAR can be submitted to Flink with the flink run command (make sure Flink has been started), as follows:

  1. /usr/local/flink/bin/flink run --class cn.edu.xmu.WordCount ~/flinkapp/target/simple-project-1.0.jar

After successful execution, the word-frequency statistics appear on the screen.

