輕裝上陣Flink--在IDEA上開發基於Flink的實時數據流程序


前言

      本文介紹如何在IDEA上快速開發基於Flink框架的DataStream程序。先直接上手!

環境清單

      案例是在win7運行。安裝VirtualBox,在VirtualBox上安裝Centos操作系統。所有資源都在百度雲上,有需要請直接下載。安裝教程基本都是傻瓜式,文章不做講述,有需要直接網上搜索。

資源 版本
VirtualBox 5.2.16
Centos 6.5
Maven 3.6.3
JDK 8u241
IDEA 2019.3.2
Flink 1.10.0

鏈接:https://pan.baidu.com/s/12rXlY_z_Fck8-NRXdZ5row

提取碼:qt2p

輕裝上陣

1、IP設置

      Centos的設置靜態IP為192.168.2.20,關閉防火牆

1 vi /etc/sysconfig/network-scripts/ifcfg-eth0
2 DEVICE=eth0
3 TYPE=Ethernet
4 ONBOOT=yes #開機啟動eth0網卡
5 NM_CONTROLLED=yes
6 BOOTPROTO=static
7 IPADDR=192.168.2.20
8 GATEWAY=192.168.2.1
9 NETMASK=255.255.255.0
     如果此時ping www.baidu.com等不通,需要我們添加dns服務器。
1 [root@localhost network-scripts]# vi /etc/resolv.conf
2 nameserver 192.168.2.1
  重新啟動網絡服務
1 [root@localhost network-scripts]# service network restart                   
2 正在關閉接口 eth0:[確定]
3 關閉環回接口:[確定]
4 彈出環回接口:[確定]
5 彈出界面 eth0:Determining if ip address 192.168.2.20 is already in use for device eth0...
6                                                            [確定]
      關閉防火牆
1 [root@localhost network-scripts]# service iptables stop

2、創建項目

   在win7的命令行下,用mvn命令創建開發模板

1 mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.0
這種方式允許你為新項目命名。它將以交互式的方式詢問你項目的 groupId、artifactId 和 package 名稱。
用tree命令看下,如下結構。項目是一個 Maven project,它包含了兩個類:StreamingJob 和 BatchJob
分別是 DataStream and DataSet 程序的基礎骨架程序。main 方法是程序的入口,既可用於IDE測試/執行,也可用於部署。
 1 │  pom.xml
 2 └─src
 3     └─main
 4         ├─java
 5         │  └─com
 6         │      └─ryan
 7         │              BatchJob.java
 8         │              StreamingJob.java
 9         └─resources
10                 log4j.properties
3、寫一個自己的DataStream的程序

 功能介紹:WindowWordCount.java,5s為一個時間窗口,攝取數據源的數據,計算單詞出現的次數。

 實時數據流計算簡易架構圖:

為了演示方便,這里我們只演示消息隊列和Flink Job兩個模塊,利用nc工具來替代消息隊列作為Flink Job攝取的數據源。

代碼:

 1 package com.ryan;
 2 import org.apache.flink.api.common.functions.FlatMapFunction;
 3 import org.apache.flink.api.java.tuple.Tuple2;
 4 import org.apache.flink.streaming.api.datastream.DataStream;
 5 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 6 import org.apache.flink.streaming.api.windowing.time.Time;
 7 import org.apache.flink.util.Collector;
 8 public class WindowWordCount {
 9     public static void main(String[] args) throws Exception {
10         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
11         DataStream<Tuple2<String, Integer>> dataStream = env
12                 .socketTextStream("192.168.2.20", 9999)
13                 .flatMap(new Splitter())
14                 .keyBy(0)
15                 .timeWindow(Time.seconds(5))
16                 .sum(1);
17         dataStream.print();
18         env.execute("Window WordCount");
19     }
20     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
21         @Override
22         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
23             for (String word: sentence.split(" ")) {
24                 out.collect(new Tuple2<String, Integer>(word, 1));
25             }
26         }
27     }
28 }

在centos機器上,命令行啟動nc

1 nc -lk 9999

IDEA上直接run main方法,然后在centos機器上,不斷輸入單詞。

1 [ryan@localhost ~]$ nc -lk 9999
2 java
3 java
4 shen
5 深圳 深圳
IDEA控制台上輸出如下:

注意:第一次在IDEA上運行這個程序,可能會報如下異常

1 java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/datastream/DataStream

原因是IDEA沒有導入flink 的lib下的jar包。導入即可。

4、打包發布到centos平台上的Flink集群

      修改pom.xml文件的mainclass的值為com.ryan.WindowWordCount

1 <mainClass>com.ryan.WindowWordCount</mainClass>

      執行mvn clean install,得到flink-demo-1.0-SNAPSHOT.jar,並上傳到centos機器上。

1 mvn clean install

      打開兩個centos的控制台,一個用於打開nc,一個用於運行我們打包好的Flink jar包。

1 [ryan@localhost ~]$ nc -lk 9999
2 java
3 shen
4 深圳 深圳 深圳
1 [root@localhost flink-1.10.0]# bin/flink run flink-demo/flink-demo-1.0-SNAPSHOT.jar 
2 Job has been submitted with JobID 9931a9dfc2eddeb2d0b5ed15578bd488
  回到win7上,用瀏覽器打開http://192.168.2.20:8081/,在Running Jobs上,可以看到一條記錄。

 

       在Task Managers上,Stdout模塊看到程序輸出的結果。

       所有代碼都上傳到github上,有需要的朋友可以下載

1 https://github.com/qinxiongzhou/flink-demo

       至此,我們完成了開發編譯調試到最終上線生產運行。喜歡請關注公眾號--程序猿牧場,謝謝!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM