Title: Flink Introductory Programming Practice
Date: 2021-12-24
Experiment environment: OS: Ubuntu 18.04; Hadoop version: 3.3.1; Flink version: 1.9.1; JDK version: 1.8
This report follows the tutorial at http://dblab.xmu.edu.cn/blog/2507-2/ written by Prof. Lin Ziyu of Xiamen University, and records the solutions I used during my own installation. Please contact me for removal in case of any infringement.
This walkthrough uses the Ubuntu 18 desktop edition and flink-1.9.1; other versions may differ slightly.
Download the installation package flink-1.9.1-bin-scala_2.11.tgz from the Flink official website.
Put the package on the desktop, then enter the desktop with cd /home/<username>/Desktop
Decompress the installation file with the following command:
- sudo tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /usr/local #(extraction target path)
Rename the directory and set its permissions with the following commands:
- cd /usr/local #(extraction path)
- sudo mv ./flink-1.9.1 ./flink #(rename the folder)
- sudo chown -R Ubuntu:Ubuntu ./flink #(replace Ubuntu:Ubuntu with your own username; e.g. if your username is hadoop, use hadoop:hadoop)
Once this is done, the flink folder can be seen in the /usr/local directory.
Add the environment variables with the following command:
- vim ~/.bashrc #(vim can be installed with sudo apt install vim; press i to enter insert mode)
Add the following lines to the .bashrc file:
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
Save and exit the .bashrc file, then run the following command to make the configuration take effect:
- source ~/.bashrc
Start Flink with the following commands:
- cd /usr/local/flink
- ./bin/start-cluster.sh
Check the running processes with the jps command: if the TaskManagerRunner and StandaloneSessionClusterEntrypoint processes are both visible, the cluster started successfully.
Flink's JobManager also starts a web front end on port 8081, which can be accessed by entering "http://localhost:8081" in a browser.
The Flink distribution ships with test examples; the WordCount example can be run here to verify that Flink works, with the following commands:
- cd /usr/local/flink/bin
- ./flink run /usr/local/flink/examples/batch/WordCount.jar
If the commands above execute successfully, the word-frequency results of the job are printed on the screen.
Run the following commands in a Linux terminal to create a folder named flinkapp under the user's home directory as the application root:
- cd ~ # enter the user's home directory
- mkdir -p ./flinkapp/src/main/java
Then use the vim editor to create three source files under "./flinkapp/src/main/java": WordCountData.java, WordCountTokenizer.java, and WordCount.java.
WordCountData.java provides the raw data; its content is as follows (the content of WORDS can be replaced freely):
package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCountData {
    public static final String[] WORDS = new String[]{
        "To be, or not to be,--that is the question:--",
        "Whether \'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,",
        "And by opposing end them?--To die,--to sleep,--",
        "No more; and by a sleep to say we end",
        "The heartache, and the thousand natural shocks",
        "That flesh is heir to,--\'tis a consummation",
        "Devoutly to be wish\'d. To die,--to sleep;--",
        "To sleep! perchance to dream:--ay, there\'s the rub;",
        "For in that sleep of death what dreams may come,",
        "When we have shuffled off this mortal coil,",
        "Must give us pause: there\'s the respect",
        "That makes calamity of so long life;",
        "For who would bear the whips and scorns of time,",
        "The oppressor\'s wrong, the proud man\'s contumely,",
        "The pangs of despis\'d love, the law\'s delay,",
        "The insolence of office, and the spurns",
        "That patient merit of the unworthy takes,",
        "When he himself might his quietus make",
        "With a bare bodkin? who would these fardels bear,",
        "To grunt and sweat under a weary life,",
        "But that the dread of something after death,--",
        "The undiscover\'d country, from whose bourn",
        "No traveller returns,--puzzles the will,",
        "And makes us rather bear those ills we have",
        "Than fly to others that we know not of?",
        "Thus conscience does make cowards of us all;",
        "And thus the native hue of resolution",
        "Is sicklied o\'er with the pale cast of thought;",
        "And enterprises of great pith and moment,",
        "With this regard, their currents turn awry,",
        "And lose the name of action.--Soft you now!",
        "The fair Ophelia!--Nymph, in thy orisons",
        "Be all my sins remember\'d."};

    public WordCountData() {
    }

    public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}
WordCountTokenizer.java splits the sentences into words; its content is as follows:
package cn.edu.xmu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    public WordCountTokenizer() {
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        int len = tokens.length;

        for (int i = 0; i < len; i++) {
            String tmp = tokens[i];
            if (tmp.length() > 0) {
                out.collect(new Tuple2<String, Integer>(tmp, Integer.valueOf(1)));
            }
        }
    }
}
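The heart of the tokenizer above is the value.toLowerCase().split("\\W+") call. As a quick sanity check, its behavior can be sketched in plain Java with no Flink dependency (the class name TokenizerSketch and the sample line are chosen for illustration only):

```java
import java.util.Arrays;

public class TokenizerSketch {
    // Mirrors WordCountTokenizer: lowercase the line, then split on runs of
    // non-word characters (spaces, punctuation, dashes).
    static String[] tokenize(String value) {
        return value.toLowerCase().split("\\W+");
    }

    public static void main(String[] args) {
        String line = "To be, or not to be,--that is the question:--";
        System.out.println(Arrays.toString(tokenize(line)));
        // prints [to, be, or, not, to, be, that, is, the, question]
    }
}
```

Note that when a line starts with punctuation, split produces a leading empty string; that is exactly what the tmp.length()>0 check in flatMap filters out.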
WordCount.java provides the main function; its content is as follows:
package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

public class WordCount {
    public WordCount() {
    }

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        DataSet<String> text;
        // If no input path is specified, fall back to the data provided by WordCountData
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataset(env);
        }

        AggregateOperator<Tuple2<String, Integer>> counts =
                text.flatMap(new WordCountTokenizer()).groupBy(0).sum(1);
        // If no output path is specified, print to the console by default
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            env.execute();
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }
}
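The flatMap, groupBy(0), sum(1) pipeline in the main function is a word count: group the (word, 1) tuples by their first field and sum the 1s within each group. The same aggregation logic can be sketched in plain Java without Flink (WordCountSketch is a hypothetical helper for illustration, not part of the project):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Mirrors flatMap(new WordCountTokenizer()).groupBy(0).sum(1):
    // tokenize each line, group tuples by word, and sum the attached 1s.
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {                 // same filter as the tokenizer
                    counts.merge(token, 1, Integer::sum); // the "sum(1)" per key
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = {"To be, or not to be,--that is the question:--"};
        System.out.println(count(lines));
        // prints {to=2, be=2, or=1, not=1, that=1, is=1, the=1, question=1}
    }
}
```

In Flink the grouping and summing are distributed across the cluster; the map here only illustrates the per-key semantics.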
This program depends on the Flink Java API, so it needs to be compiled and packaged with Maven. Create a new file pom.xml under ~/flinkapp and add the following content to it, declaring the standalone application's information and its dependency on Flink:
<project>
<groupId>cn.edu.xmu</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>jboss</id>
<name>JBoss Repository</name>
<url>http://repository.jboss.com/maven2/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.9.1</version>
</dependency>
</dependencies>
</project>
To make sure Maven can run properly, first run the following commands to check the file structure of the whole application:
- cd ~/flinkapp
- find .
The file structure should look similar to the following:
.
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/WordCount.java
./src/main/java/WordCountData.java
./src/main/java/WordCountTokenizer.java
Next, the whole application can be packaged into a JAR with the following commands. Note: the machine must stay connected to the network, and the first time the packaging command is run, Maven automatically downloads the dependency packages, which takes a few minutes. If packaging fails with a "connection refused" error, the download mirror is likely the problem: open settings.xml in Maven's conf directory (in this report, cd /usr/local/maven/conf, or directly vim /usr/local/maven/conf/settings.xml), find the mirror section in settings.xml, and change the mirror to the Aliyun address given below.
- cd ~/flinkapp # make sure this directory is the current directory
- /usr/local/maven/bin/mvn package
Aliyun mirror address:
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
If the information returned on the screen contains "BUILD SUCCESS", the JAR package was generated successfully.
Finally, the generated JAR package can be submitted to Flink with the flink run command (make sure Flink has already been started):
- /usr/local/flink/bin/flink run --class cn.edu.xmu.WordCount ~/flinkapp/target/simple-project-1.0.jar
After successful execution, the word-frequency statistics can be seen on the screen.