Flink集群模式部署及案例執行


一.軟件要求

  Flink在所有類UNIX的環境【例如linux,mac os x和cygwin】上運行,並期望集群由一個 主節點和一個或多個工作節點組成。在開始設置系統之前,確保在每個節點上都安裝了一下軟件:

  1.Java1.8.x或更高版本

  2.ssh,必須運行sshd才能使用管理遠程組件的Flink腳本

  在所有集群節點上都具有免密碼的ssh和相同的目錄結構,將使你可以使用flink腳本來控制所有內容。

二.Flink Standalone模式設置

  1.下載

  前往Flink官網下載最新版Flink【我下載的是flink-1.8.2】。若要在Hadoop上使用Flink,則需要下載與Hadoop匹配的版本。下載完成后,上傳到幾個各個節點並解壓

  

  2.配置Flink

  通過編輯conf/flink-conf.yaml來為集群配置flink。設置jobmanager.rpc.address以指定flink主節點。還可以通過設置jobmanager.heap.size和taskmanager.heap.size來指定允許JVM在每個節點上分配的最大內存。這些值都是以MB為單位,如果某些工作程序節點有更多的內存分配給Flink集群,則可以通過FLINK_TM_HEAP在那些特定節點上設置環境變量來覆蓋默認值。最后,必須提供集群中所有節點的列表,這些列表將用作工作節點。因此,類似於HDFS配置,編輯文件conf/slaves並輸入每個子節點的IP/主機名。每個子節點都將運行TaskManager。

  以下示例說明了具有三個節點(IP地址從10.0.0.1到10.0.0.3且主機名分別為master,worker1,worker2)的設置,並顯示了配置文件的內容:

  

  具體配置如下:

jobmanager.rpc.address:192.168.136.7 # 在每個節點上分別指定各自節點的IP/主機名
taskmanager.tmp.dirs: /usr/local/soft/flink-1.8.2/tmp # 指定每個taskmanager的臨時目錄 jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m taskmanager.heap.size: 1024m taskmanager.numberOfTaskSlots: 1 parallelism.default: 1

  解釋如下:

    1.jobmanager.heap.size:每個JobManager的可用內存大小,默認為1024M

    2.taskmanager.heap.size:每個TaskManager的可用內存大小,默認為1024M

    3.taskmanager.numberOfTaskSlots:每台計算機可用的CPU數,默認為1

    4.parallelism.default:集群中的CPU總數之和

    5.io.tmp.dirs:臨時目錄

  3.配置slaves

    

  4.配置環境變量

    

  5.啟動flink

    執行bin/start-cluster.sh啟動JobManager,並通過SSH連接到slaves文件中列出的所有工作節點,以在每個節點上啟動TaskManager。

    

  6.Web UI

    打開瀏覽器,輸入:http://master:8081 

  配置成功!

三.本地執行WordCount

  1.代碼

package cn.demo

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._ //必須導入

/**
  * Created by Administrator on 2020/1/22.
  */
object WordCount {
  def main(args: Array[String]) {
    val params : ParameterTool = ParameterTool.fromArgs(args)

    // 設置execution執行環境
    val execution = ExecutionEnvironment.getExecutionEnvironment

    // 設置web界面有效參數
    execution.getConfig.setGlobalJobParameters(params)

    val text = execution.fromElements("Apache Flink is an open source platform for distributed stream and batch data processing.",
      "Flink core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. ",
      "Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.")

    val counts = text.flatMap(_.toLowerCase.split(" ").filter(_.nonEmpty))
      .map((_, 1))
      .groupBy(0)//根據第一個元素分組
      .sum(1)
      .sortPartition(0, Order.ASCENDING) //按照分區進行排序
      .first(6)
    
    counts.print()
  }
}

  2.本地執行結果

    

四.案例執行

  要運行Flink案例,必須有一個正在運行的Flink實例。最簡單的方法是運行./bin/start-cluster.sh,默認情況下會啟動一個帶有JobManager和一個TaskManager的本地集群。每個Flink二進制發行版都包含一個examples目錄,其中包含WordCount這個最常用案例。

  要運行WordCount案例,執行以下命令:

  ./bin/flink run ./examples/batch/WordCount.jar --input /data/flink/wordcount --output /data/flink/wcresult

  備注:input路徑要提前創建好,其中保存要計算的數據!

  

  執行結果:

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM