Flink 環境的搭建、獨立集群、Flink on Yarn、訪問Flink web界面、Flink提交任務的三種方式、Flink讀取HDFS上的數據


Flink 運行方式

三種運行方式(與spark相似):

1、local 本地測試

2、Standallone Cluster 獨立集群(做實時計算,不需要hadoop,該獨立集群可能用的上)

3、Flink on Yarn 推薦

Standallone Cluster 獨立集群

獨立集群是不依賴hadoop的,所以可以先停掉 Hadoop

注意:獨立集群的搭建需要配置 JAVA_HOME 和 免密登錄

1、上傳、解壓、配置環境變量

#進入壓縮包所在目錄
cd /usr/local/soft
#解壓
tar -zxvf /usr/local/soft/flink-1.11.2-bin-scala_2.11.tgz -C /usr/local/soft/

#配置環境變量
vim /etc/profile
#添加
export FLINK_HOME=/usr/local/soft/flink-1.11.2

export PATH=$PATH:$FLINK_HOME/bin

#刷新
source /etc/profile

2、修改配置文件

在此之前我們先來簡單了解一下Flink集群的架構

img

vim /usr/local/soft/flink-1.11.2/conf/flink-conf.yaml
# (修改)指定主節點ip地址
jobmanager.rpc.address: master

vim workers
# (修改)指定從節點
node1  
node2

vim masters 
# 改成主節點master
master:8081

3、同步到所有節點

scp -r /usr/local/soft/flink-1.11.2/ node1:`pwd`
scp -r /usr/local/soft/flink-1.11.2/ node2:`pwd`

4、啟動(停止)集群

Flink集群的命令都在 bin 目錄下,不記得可以去找

所有主從架構的集群都是在主節點操作命令

# 啟動
start-cluster.sh
# 停止
stop-cluster.sh
http://master:8081

img

img

img

img

Flink提交任務的三種方式

1、在web頁面中提交

以昨天WordCount代碼為例(代碼不需要改),將之打成jar包

img

img

img

img

img

img

2、同flink命令提交任務

將jar包上傳至集群,提交任務

flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar 
com.shujia.flink.soure.Demo4ReadKafka -- 主類名
flink-1.0.jar -- jar包名

3、rpc方式提交任務 --- 遠程提交

用的較少

代碼寫完之后需要先打包,再運行

Flink框架報錯有一個特點:前幾條報錯原因都是廢話,要從后面看

package com.shujia.flink.core

import org.apache.flink.streaming.api.scala._

object Demo2RpcSubmit {
  def main(args: Array[String]): Unit = {

    /**
      * 創建遠程環境,遠程提交flink任務
      *
      */
      
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
      //主機名
      "master",
      //端口號
      8081,
      //指定jar包的路徑
      "C:\\Users\\qx\\IdeaProjects\\bigdata14\\flink\\target\\flink-1.0.jar" 
    )

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    //1、將數據展開
    val wordsDS: DataStream[String] = linesDS.flatMap(line => line.split(","))
    //2、轉換成kv格式
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
    //3、按照單詞進行分組
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
    //4、統計數量,對value進行求和, 指定下標進行聚合
    val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
    //打印結果
    countDS.print()

    env.execute("rpc")

  }
}

只需要部署一個節點 -- 在master中部署即可

需要先將獨立集群停掉

JAVA_HOME、免密登錄、上傳、解壓、配置環境變量 -- 這些配置都要做

img

1、配置HADOOP_CONF_DIR

export -- 全局生效

vim /etc/profile
#添加
export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/

source /etc/profile
cd /usr/local/soft/flink-1.11.2/lib

#jar包名
flink-shaded-hadoop-2-uber-2.6.5-10.0.jar

flink和spark一樣都是粗粒度資源申請

啟動方式

1、yarn-session 在yarn里面啟動一個flink集群---- 所有任務共享同一個jobManager(ApplicationMaster)

需要先啟動Hadoop:start-all.sh

yarn-session是所有任務共享同一個jobmanager

# 在master中
# 開啟
yarn-session.sh -jm 1024m -tm 1096m
# -jm 1024m -- 指定JobManager的內存
# -tm 1096m -- 指定TaskManager的內存

# 啟動完畢之后會返回一個地址給我們--->Flink集群的web界面
# http://note01:46386

提交任務的三種方式

在master中輸入nc -lk 8888 的時候,可能會出現 端口占用

查看端口是否占用:ps -aux | grep 8888

如果占用,將其殺死:kill -9 編號

# 提交任務  
# 任務提交的時候是根據並行度動態申請taskmanager

1、在web頁面提交任務
# 將代碼代碼打包好的jar包添加到web頁面

2、同flink命令提交任務
flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar 

3、rpc方式提交任務(遠程提交)
# 遠程提交,需要結合啟動集群返回的地址---http://node1:46386
# 修改代碼,創建環境的時候,將master改成node1,將8888改成46386
# 然后將代碼打包,然后再運行代碼就行了(不需要我們手動向集群中添加jar包了)

# 任務結束之后,資源被動態回收

# 關閉/停止 yarn-session
yarn application -kill application_1647657435495_0001
# application_1647657435495_0001 -- yarn-session的yarn上進程號

stop 也能退出

2、直接提交任務到yarn----每一個任務單獨使用一個jobManager

直接提交任務就不需要啟動yarn-session

如果之前已啟動,需要將其殺死yarn application -kill

直接提交任務到yarn,一個任務需要執行一次命令,一個任務會有一個 單獨的web頁面

flink run -m yarn-cluster  -yjm 1024m -ytm 1096m -c com.shujia.flink.core.Demo1WordCount flink-1.0.jar
# -m yarn-cluster -- 指定提交模式
# -yjm 1024m -ytm 1096m -- 指定JobManager和TaskManager的內存
# -c com.shujia.flink.core.Demo1WordCount -- 指定主類名
# flink-1.0.jar -- 指定jar包名

# 殺掉yarn上的任務
yarn application -kill application_1599820991153_0005

# 查看日志
yarn logs -applicationId application_1647657435495_0002

yarn-session先在yarn中啟動一個jobMansager ,所有的任務共享一個jobmanager (提交任務更快,任務之間共享jobmanager , 相互有影響)
直接提交任務模型,為每一個任務啟動一個joibmanager (每一個任務獨立jobmanager , 任務運行穩定)

Flink讀取HDFS上的數據

package com.shujia.flink.core

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._

object Demo3FlinkOnHdfs {
  def main(args: Array[String]): Unit = {
      //創建flink環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     /**
      * 讀取hdfs中的數據  -- 有界流
      */
    val studentDS: DataStream[String] = env.readTextFile("hdfs://master:9000/data/student")

    val clazzNumDS: DataStream[(String, Int)] = studentDS
      .map(stu => (stu.split(",")(4), 1))
      .keyBy(_._1)
      .sum(1)

     /**
      * 將數據保存到hdfs(改代碼可以通過官網查看,不需要死記)
      */
    val sink: StreamingFileSink[(String, Int)] = StreamingFileSink
      //指定保存路徑和數據的編碼格式
      .forRowFormat(new Path("hdfs://master:9000/data/flink_clazz"), new SimpleStringEncoder[(String, Int)]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withRolloverInterval(TimeUnit.MINUTES.toSeconds(15))
          .withInactivityInterval(TimeUnit.MINUTES.toSeconds(5))
          .withMaxPartSize(1024)
          .build())
      .build()

    clazzNumDS.addSink(sink)

    env.execute()
  }
}

# 將代碼打包,然后提交到yarn上運行
# 執行結果可以通過日志查看,在web頁面只能查看無界流的任務運行結果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM