文章作者:luxianghao
文章來源:http://www.cnblogs.com/luxianghao/p/9010748.html 轉載請注明,謝謝合作。
免責聲明:文章內容僅代表個人觀點,如有不當,歡迎指正。
---
一 引言
2016年2月Google宣布將Beam(原名Google DataFlow)貢獻給Apache基金會孵化,成為Apache的一個頂級開源項目。
Beam是一個統一的編程框架,支持批處理和流處理,並可以將用Beam編程模型構造出來的程序,在多個計算引擎(Apache Apex, Apache Flink, Apache Spark, Google Cloud Dataflow等)上運行。
大數據起源於Google 2003年發布的三篇論文GoogleFS、MapReduce、BigTable,史稱三駕馬車,可惜Google在發布論文后並沒有公布其源碼,但是Apache開源社區蓬勃發展,先后出現了Hadoop,Spark,Apache Flink等產品,而Google內部則使用着閉源的BigTable、Spanner、Millwheel,但他們殊途同歸,這次Google沒有發一篇論文后便銷聲匿跡,而是高調的開源了Beam,所謂“一流公司定標准”,開源的好處也是相當的多,如提高公司影響力,集思廣益,共同維護等。
二 Beam優勢
1 統一
Beam提供統一的編程模型,編程指導可以參考官方的programming-guide ,通過quickstart-java和wordcount-example入門。
2 可移植
編程人員coding的時候基本不需要關注將來代碼會運行到Spark、Flink或者其他計算平台上,coding完成后再通過命令行選擇計算平台。
3 可擴展
如在2 可移植中所寫,已經展示了DirectRunner和SparkRunner,在Beam中,Runner,IO鏈接器,轉換操作庫,甚至SDK都是可以自己定制的,具有高度的擴展性。
4 支持批處理和流處理
不管將來編程人員用Beam寫的程序是用於批處理還是流處理,用於有限的數據集還是無限的數據集,用Beam寫的程序都可以不修改的執行。(Beam通過引入triggering,windows的概念來解決這個問題)
5 高度抽象
Beam用DAG(directed acyclic graph)進行了高度的抽象,編程人員不需要將其代碼強制構造成Map-Shuffle-Reduce的形式,可以直接進行更高級別的操作,如counting, joining, projecting。
6 多語言支持
目前官方支持了Java和Python,后面會有更多語言的SDK開發出來
三 Beam構成
先來一個整體框架圖
1 Beam編程模型
Beam的編程模型是Google的工程師從MapReduce, FlumeJava, 和Millwheel等多個大數據處理項目中抽象出來的,如果想詳細了解可以參考相關的報考和論文,Streaming 101,Streaming 102 和VLDB 2015 paper.。這個編程模型主要包括如下幾個核心概念:
- PCollection:數據集,代表了將要被處理的數據集合,可以是有限的數據集,也可以是無限的數據流。
- PTransform:計算過程,代表了將輸入數據集處理成輸出數據集中間的計算過程,
- Pipeline:管道,代表了處理數據的執行任務,可視作一個有向無環圖(DAG),PCollections是節點,Transforms是邊。
- PipelineRunner:執行器,指定了Pipeline將要在哪里,怎樣的運行。
其中PTransform還包括很多操作,如:
- ParDo:通用的並行處理的PTranform, 相當於Map/Shuffle/Reduce-style 中的Map,可用於過濾 、類型轉換 、抽取部分數據 、 對數據中的每一個元素做計算等
- GroupByKey:用來聚合key/value對,相當於Map/Shuffle/Reduce-style中的Shuffle, 聚合那些擁有同一個key的value
- CoGroupByKey:用來聚合多個集合,功能和GroupByKey類似
- Combine:處理集合里的數據,如sum, min, and max(sdk預定義),也可以自建新類
- Flatten:用來把多個數據集合並成一個數據集
- Partition:用來把一個數據集分割成多個小數據集
此外還有一些核心概念,如:
- Windowing:把PCollections數據集中元素通過時間戳分成多個子集
- Watermark:標記了多久時間后的延遲數據直接拋棄
- Triggers:用來決定什么時候發送每個window的聚合結果
Beam的編程模型可簡單概括為
[Output PCollection] = [Input PCollection].apply([Transform])
Google工程師還把做Beam編程時的場景抽象成四個問題,就是WWWH
即做什么計算,對應的抽象概念為PTransform
即在哪個時間范圍內計算,對應的抽象概念為Window
即在何時輸出計算結果,對應的抽象概念為Watermarks和Triggers
即怎么提取相關的數據,對應的抽象概念為Accumulation
備注:此處的翻譯是參考Streaming 102得來的,可能單純按照字面翻譯並不能達到預期的效果,如有不合適的地方歡迎指正。
2 SDK
Beam支持用多種語言的SDK來構造Pipeline,當前已經支持Java和Python,相對而言,對Java的SDK支持會更好一些。
3 Runner
Beam支持將Pipeline運行在多個分布式后端,目前支持如下的PipelineRunners:
- DirectRunner: 在本地執行Pipeline
- ApexRunner:在Yarn集群(或者用embeded模式)上運行Pipeline
- DataflowRunner:在Google Cloud Dataflow上運行Pipleine
- FlinkRunner:在Flink集群上運行Pipeline
- SparkRunner:在Spark集群上運行Pipeline
- MRRunner:目前在Beam的github主分支上還沒提供,不過有mr-runner分支,具體還可參考BEAM-165
四 例子
通過官方的wordcount的例子來實際體驗下Beam,詳細可參考quickstart-java和wordcount-example。
1 獲取相關代碼
mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=2.1.0 \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
2 相關文件
$ cd word-count-beam/ $ ls pom.xml src $ ls src/main/java/org/apache/beam/examples/ DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
3 使用DrectRunner執行
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner -Xdebug
4 提交到Spark
方式1
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
方式2
spark-submit --class org.apache.beam.examples.WordCount --master local target/word-count-beam-bundled-0.1.jar --runner=SparkRunner --inputFile=pom.xml --output=counts
方式3
spark-submit --class org.apache.beam.examples.WordCount --master yarn --deploy-mode cluster word-count-beam-bundled-0.1.jar --runner=SparkRunner --inputFile=/home/yarn/software/java/LICENSE --output=/tmp/counts
SparkRunner詳情參考這里,其中方式3讀取HDFS文件的時候會有些問題,這個問題我們在這里會講到,上面的例子里可以寫物理機上實際存在的文件,這樣可以保證相關程序正常運行。
五 參考資料
編程指南 https://beam.apache.org/documentation/programming-guide
例子 https://beam.apache.org/get-started/wordcount-example/
Javadoc https://beam.apache.org/documentation/sdks/javadoc/2.1.0/
streaming-101 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
streaming-102 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102