初探Apache Beam


 

   文章作者:luxianghao

   文章來源:http://www.cnblogs.com/luxianghao/p/9010748.html  轉載請注明,謝謝合作。

   免責聲明:文章內容僅代表個人觀點,如有不當,歡迎指正。

   --- 

 

一 引言

2016年2月Google宣布將Beam(原名Google DataFlow)貢獻給Apache基金會孵化,成為Apache的一個頂級開源項目。

Beam是一個統一的編程框架,支持批處理和流處理,並可以將用Beam編程模型構造出來的程序,在多個計算引擎(Apache ApexApache FlinkApache SparkGoogle Cloud Dataflow等)上運行。

大數據起源於Google 2003年發布的三篇論文GoogleFS、MapReduce、BigTable,史稱三駕馬車,可惜Google在發布論文后並沒有公布其源碼,但是Apache開源社區蓬勃發展,先后出現了Hadoop,Spark,Apache Flink等產品,而Google內部則使用着閉源的BigTable、Spanner、Millwheel,但他們殊途同歸,這次Google沒有發一篇論文后便銷聲匿跡,而是高調的開源了Beam,所謂“一流公司定標准”,開源的好處也是相當的多,如提高公司影響力,集思廣益,共同維護等。

 

二 Beam優勢

1 統一

Beam提供統一的編程模型,編程指導可以參考官方的programming-guide ,通過quickstart-javawordcount-example入門。

2 可移植

編程人員coding的時候基本不需要關注將來代碼會運行到Spark、Flink或者其他計算平台上,coding完成后再通過命令行選擇計算平台。

3 可擴展

如在2 可移植中所寫,已經展示了DirectRunner和SparkRunner,在Beam中,Runner,IO鏈接器,轉換操作庫,甚至SDK都是可以自己定制的,具有高度的擴展性。

4 支持批處理和流處理

不管將來編程人員用Beam寫的程序是用於批處理還是流處理,用於有限的數據集還是無限的數據集,用Beam寫的程序都可以不修改的執行。(Beam通過引入triggering,windows的概念來解決這個問題)

5 高度抽象

Beam用DAG(directed acyclic graph)進行了高度的抽象,編程人員不需要將其代碼強制構造成Map-Shuffle-Reduce的形式,可以直接進行更高級別的操作,如counting, joining, projecting。

6 多語言支持

目前官方支持了Java和Python,后面會有更多語言的SDK開發出來

 

三 Beam構成

先來一個整體框架圖

 

1 Beam編程模型

Beam的編程模型是Google的工程師從MapReduceFlumeJava, 和Millwheel等多個大數據處理項目中抽象出來的,如果想詳細了解可以參考相關的報考和論文,Streaming 101Streaming 102 和VLDB 2015 paper.。這個編程模型主要包括如下幾個核心概念:

  1. PCollection:數據集,代表了將要被處理的數據集合,可以是有限的數據集,也可以是無限的數據流。
  2. PTransform:計算過程,代表了將輸入數據集處理成輸出數據集中間的計算過程,
  3. Pipeline:管道,代表了處理數據的執行任務,可視作一個有向無環圖(DAG),PCollections是節點,Transforms是邊。
  4. PipelineRunner:執行器,指定了Pipeline將要在哪里,怎樣的運行。

其中PTransform還包括很多操作,如:

  • ParDo:通用的並行處理的PTranform, 相當於Map/Shuffle/Reduce-style 中的Map,可用於過濾 、類型轉換 、抽取部分數據 、 對數據中的每一個元素做計算等
  • GroupByKey:用來聚合key/value對,相當於Map/Shuffle/Reduce-style中的Shuffle, 聚合那些擁有同一個key的value
  • CoGroupByKey:用來聚合多個集合,功能和GroupByKey類似
  • Combine:處理集合里的數據,如sum, min, and max(sdk預定義),也可以自建新類
  • Flatten:用來把多個數據集合並成一個數據集
  • Partition:用來把一個數據集分割成多個小數據集

此外還有一些核心概念,如:

  • Windowing:把PCollections數據集中元素通過時間戳分成多個子集
  • Watermark:標記了多久時間后的延遲數據直接拋棄
  • Triggers:用來決定什么時候發送每個window的聚合結果

Beam的編程模型可簡單概括為

[Output PCollection] = [Input PCollection].apply([Transform])

Google工程師還把做Beam編程時的場景抽象成四個問題,就是WWWH

  • 即做什么計算,對應的抽象概念為PTransform

  • 即在哪個時間范圍內計算,對應的抽象概念為Window

  • 即在何時輸出計算結果,對應的抽象概念為Watermarks和Triggers

  • 即怎么提取相關的數據,對應的抽象概念為Accumulation

 備注:此處的翻譯是參考Streaming 102得來的,可能單純按照字面翻譯並不能達到預期的效果,如有不合適的地方歡迎指正。 

 

2 SDK

Beam支持用多種語言的SDK來構造Pipeline,當前已經支持Java和Python,相對而言,對Java的SDK支持會更好一些。

 

3 Runner

Beam支持將Pipeline運行在多個分布式后端,目前支持如下的PipelineRunners:

  • DirectRunner: 在本地執行Pipeline
  • ApexRunner:在Yarn集群(或者用embeded模式)上運行Pipeline
  • DataflowRunner:在Google Cloud Dataflow上運行Pipleine
  • FlinkRunner:在Flink集群上運行Pipeline
  • SparkRunner:在Spark集群上運行Pipeline
  • MRRunner:目前在Beam的github主分支上還沒提供,不過有mr-runner分支,具體還可參考BEAM-165

 

四 例子

通過官方的wordcount的例子來實際體驗下Beam,詳細可參考quickstart-javawordcount-example

1 獲取相關代碼

mvn archetype:generate \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
      -DarchetypeVersion=2.1.0 \
      -DgroupId=org.example \
      -DartifactId=word-count-beam \
      -Dversion="0.1" \
      -Dpackage=org.apache.beam.examples \
      -DinteractiveMode=false

2 相關文件

$ cd word-count-beam/

$ ls
pom.xml    src

$ ls src/main/java/org/apache/beam/examples/
DebuggingWordCount.java    WindowedWordCount.java    common
MinimalWordCount.java    WordCount.java

 3 使用DrectRunner執行

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner -Xdebug 

4 提交到Spark

方式1

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner  

方式2

spark-submit --class org.apache.beam.examples.WordCount --master local target/word-count-beam-bundled-0.1.jar --runner=SparkRunner --inputFile=pom.xml --output=counts

方式3

spark-submit --class org.apache.beam.examples.WordCount --master yarn --deploy-mode cluster word-count-beam-bundled-0.1.jar --runner=SparkRunner --inputFile=/home/yarn/software/java/LICENSE   --output=/tmp/counts 

SparkRunner詳情參考這里,其中方式3讀取HDFS文件的時候會有些問題,這個問題我們在這里會講到,上面的例子里可以寫物理機上實際存在的文件,這樣可以保證相關程序正常運行。

 

五 參考資料

編程指南 https://beam.apache.org/documentation/programming-guide
例子 https://beam.apache.org/get-started/wordcount-example/
Javadoc https://beam.apache.org/documentation/sdks/javadoc/2.1.0/
streaming-101 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
streaming-102 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM