Apache Beam:一個開源的統一的分布式數據處理編程庫
Apache Beam是一個開源的數據處理編程庫,由Google貢獻給Apache的項目,前不久剛剛成為Apache TLP項目。它提供了一個高級的、統一的編程模型,允許我們通過構建Pipeline的方式實現批量、流數據處理,並且構建好的Pipeline能夠運行在底層不同的執行引擎上。剛剛接觸該開源項目時,我的第一感覺就是:在編程API的設計上,數據集及其操作的抽象有點類似Apache Crunch(MapReduce Pipeline編程庫)項目;而在支持統一數據處理模型上,能夠讓人想到Apache Flink項目。如果深入了解Apache Beam,你會發現未來Apache Beam很可能成為數據處理領域唯一一個能夠將不同的數據應用統一起來的編程庫。
Apache Beam架構概覽
Apache Beam目前最新版本為0.5.0-SNAPSHOT,最新的Release版本為0.4.0,很多特性還在開發中。在網上找到一個由Andrew Psaltis在2016年6月份演講的《Apache Beam: The Case for Unifying Streaming API’s》,引用了其中一個Apache Beam的架構圖,如下圖所示:
上圖中,我們可以看到,Apache Beam核心的主要有兩層:
- Pipeline構建層
在Pipeline構建層,針對不同的編程語言,構建一組用於定義Pipeline相關抽象,提供編程API,這一層被稱為Beam SDKs。最終的用戶(具有不同編程語言技能的人員)可以基於這些抽象的Beam SDK來構建數據處理Pipeline。
- Runner適配層
Runner適配層,主要是用來對接底層的計算引擎,用來執行上層用戶開發好的Pipeline程序。
我們先根據官網文檔,了解一下Apache Beam的Roadmap。首先,下面的三個特性,或者說是Apache Beam的目標:
- 統一(UNIFIED)
基於單一的編程模型,能夠實現批處理(Batch processing)、流處理(Streaming Processing),通常的做法是把待處理的數據集(Dataset)統一,一般會把有界(Bound)數據集作為無界(Unbound)數據集的一種特殊情況來看待,比如Apache Flink便是按照這種方式處理,在差異化的API層之上構建一個統一的API層。
- 可移植(PORTABLE)
在多個不同的計算環境下,都能夠執行已經定義好的數據處理Pipeline。也就是說,對數據集處理的定義(即構建的Data Pipeline),與最終所要Deploy的執行環境完全無關。這對實現數據處理的企業是非常友好的,當下數據處理新技術不斷涌現,企業數據處理平台也為了能夠與時俱進並提高處理效率,當然希望在底層計算平台升級的過程中無需重寫上層已定義的Data Pipeline。
目前,Apache Beam項目開發整體來看還處在初期,初步決定底層執行環境支持主流的計算平台:Apache Apex、Apache Flink、Apache Spark、Google Cloud Dataflow。實際上,Apache Beam的這種統一編程模型,可以支持任意的計算引擎,通過Data Pipeline層與執行引擎層之間開發一個類似Driver的連接器即可實現。
- 可擴展(EXTENSIBLE)
實現任意可以共享的Beam SDK、IO connector、Transform庫。
基本概念
在使用Apache Beam構建數據處理程序,首先需要使用Beam SDK中的類創建一個Driver程序,在Driver程序中創建一個滿足我們數據處理需求的Pipeline,Pipeline中包括輸入(Inputs)、轉換(Transformations)、輸出(Outputs)三個核心的組件。然后,根據我們選擇的Beam SDK來確定底層使用Pipeline Runner(執行引擎,或計算引擎),將我們定義好的Pipeline運行在Pipeline Runner上。
Apache Beam SDKs提供一組抽象,用來簡化大規模分布式數據處理。同一個Beam抽象,能夠同時適應批量處理、流處理兩種數據源。下面,我們了解一下Apache Beam的一些關鍵抽象:
- Pipeline
一個Pipeline是對一個數據處理任務抽象,它包含了我們在對給定數據集處理的全部邏輯,主要包括從數據源讀取數據(可能從多個數據源讀取)、在給定的數據集上執行Transform操作(中間可能是一個DAG圖,通過多個Transform連接,而Transform的輸出和輸出都可能是一個數據集)、將Transform的數據結果寫入到指定對的存儲系統中。
- PCollection
一個PCollection是對分布式數據集的抽象,他可以是輸入數據集、中間結果數據集、輸出數據集。每一個由PCollection表征的數據集作為輸入時,都會存在一個或多個Transform作用在其上(對數據集進行處理的邏輯)。
- Transform
一個Transform表示數據處理過程中一個步驟(Step),對應於Pipeline中一個操作,每一個Transform會以一個或多個PCollection作為輸入,經過處理后輸出一個或多個PCollection。
- Source and Sink
Apache Beam提供了Source和Sink的API,用來表示讀取和寫入數據。Source表示從一個外部的數據源讀入數據到Pipeline,而Sink表示經過Pipeline處理后將數據寫入到外部存儲系統
- PipelineRunner
PipelineRunner是實際用來處理Pipeline邏輯的底層組件,它能夠將用戶構建的Pipeline翻譯成底層計算引擎能夠處理的Job,並執行Pipeline的處理邏輯。
API設計
Apache Beam還在開發之中,后續對應的API設計可能會有所變化,不過從當前版本來看,基於對數據處理領域對象的抽象,API的設計風格大量使用泛型來定義,具有很高的抽象級別。下面我們分別對感興趣的的設計來詳細說明。
- Source
Source表示數據輸入的抽象,在API定義上分成兩大類:一類是面向數據批處理的,稱為BoundedSource,它能夠從輸入的數據集讀取有限的數據記錄,知道數據具有有限性的特點,從而能夠對輸入數據進行切分,分成一定大小的分片,進而實現數據的並行處理;另一類是面向數據流處理的,稱為UnboundedSource,它所表示的數據是連續不斷地進行輸入,從而能夠實現支持流式數據所特有的一些操作,如Checkpointing、Watermarks等。
Source對應的類設計,如下類圖所示:
目前,Apache Beam支持BoundedSource的數據源主要有:HDFS、MongoDB、Elasticsearch、File等,支持UnboundedSource的數據源主要有:Kinesis、Pubsub、Socker等。未來,任何具有Bounded或Unbounded兩類特性的數據源都可以在Apache Beam的抽象基礎上實現對應的Source。
- Sink
Sink表示任何經過Pipeline中一個或多個PTransform處理過的PCollection,最終會輸出到特定的存儲中。與Source對應,其實Sink主要也是具有兩種類型:一種是直接寫入特定存儲的Bounded類型,如文件系統;另一種是寫入具有Unbounded特性的存儲或系統中,如Flink。在API設計上,Sink的類圖如下所示:
可見,基於Sink的抽象,可以實現任意可以寫入的存儲系統。
- PipelineRunner
下面,我們來看一下PipelineRunner的類設計以及目前開發中的PipelineRunner,如下圖所示:
目前,PipelineRunner有DirectRunner、DataflowRunner、SparkRunner、ApexRunner、FlinkRunner,待這些主流的PipelineRunner穩定以后,如果有其他新的計算引擎框架出現,可以在PipelineRunner這一層進行擴展實現。
這些PipelineRunner中,DirectRunner是最簡單的PipelineRunner,它非常有用,比如我們實現了一個從HDFS讀取數據,但是需要在Spark集群上運行的ETL程序,使用DirectRunner可以在本地非常容易地調試ETL程序,調試到程序的數據處理邏輯沒有問題了,再最終在實際的生產環境Spark集群上運行。如果特定的PipelineRunner所對應的計算引擎沒有很好的支撐調試功能,使用DirectRunner是非常方便的。
- PCollection
PCollection是對分布式數據集的抽象,主要用作輸入、輸出、中間結果集。其中,在Apache Beam中對數據及其數據集的抽象有幾類,我們畫到一張類圖上,如下圖所示:
PCollection是對數據集的抽象,包括輸入輸出,而基於Window的數據處理有對應的Window相關的抽象,還有一類就是TupleTag,針對具有CoGroup操作的情況下用來標記對應數據中的Tuple數據,具體如何使用可以后面我們實現的Join的例子。
- PTransform
一個Pipeline是由一個或多個PTransform構建而成的DAG圖,其中每一個PTransform都具有輸入和輸出,所以PTransform是Apache Beam中非常核心的組件,我按照PTransform的做了一下分類,如下類圖所示:
通過上圖可以看出,PTransform針對不同輸入或輸出的數據的特征,實現了一個算子(Operator)的集合,而Apache Beam除了期望實現一些通用的PTransform實現來供數據處理的開發人員開箱即用,同時也在API的抽象級別上做的非常Open,如果你想實現自己的PTransform來處理指定數據集,只需要自定義即可。而且,隨着社區的活躍及其在實際應用場景中推廣和使用,會很快構建一個龐大的PTransform實現庫,任何有數據處理需求的開發人員都可以共享這些組件。
- Combine
這里,單獨把Combine這類合並數據集的實現拿出來,它的抽象很有趣,主要面向globally 和per-key這兩類抽象,實現了一個非常豐富的PTransform算子庫,對應的類圖如下所示:
通過上圖可以看出,作用在一個數據集上具有Combine特征的基本操作:Max、Min、Top、Mean、Sum、Count等等。
- Window
Window是用來處理某一個Micro batch的數據記錄可以進行Merge這種場景的需求,通常用在Streaming處理的情況下。Apache Beam也提供了對Window的抽象,其中對於某一個Window下的數據的處理,是通過WindowFn接口來定義的,與該接口相關的處理類,如下類圖所示:
編程實戰
首先說明一下,為了簡單起見,我直接在代碼中顯式配置指定PipelineRunner,示例代碼片段如下所示:
1
2
|
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.
class
);
|
如果要部署到服務器上,可以通過命令行的方式指定PipelineRunner,比如要在Spark集群上運行,類似如下所示命令行:
1
|
spark-submit --class org.shirdrn.beam.examples.MinimalWordCountBasedSparkRunner 2017-01-18 --master spark:
//myserver
:7077 target
/my-beam-apps-0
.0.1-SNAPSHOT-shaded.jar --runner=SparkRunner
|
下面,我們從幾個典型的例子來看(基於Apache Beam軟件包的examples有所改動),Apache Beam如何構建Pipeline並運行在指定的PipelineRunner上:
- WordCount(Count/Source/Sink)
我們根據Apache Beam的MinimalWordCount示例代碼開始,看如何構建一個Pipeline,並最終執行它。 MinimalWordCount的實現,代碼如下所示:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
package
org.shirdrn.beam.examples;
import
org.apache.beam.runners.direct.DirectRunner;
import
org.apache.beam.sdk.Pipeline;
import
org.apache.beam.sdk.io.TextIO;
import
org.apache.beam.sdk.options.PipelineOptions;
import
org.apache.beam.sdk.options.PipelineOptionsFactory;
import
org.apache.beam.sdk.transforms.Count;
import
org.apache.beam.sdk.transforms.DoFn;
import
org.apache.beam.sdk.transforms.MapElements;
import
org.apache.beam.sdk.transforms.ParDo;
import
org.apache.beam.sdk.transforms.SimpleFunction;
import
org.apache.beam.sdk.values.KV;
public
class
MinimalWordCount {
@SuppressWarnings
(
"serial"
)
public
static
void
main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.
class
);
// 顯式指定PipelineRunner:DirectRunner(Local模式)
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.Read.from(
"/tmp/dataset/apache_beam.txt"
))
// 讀取本地文件,構建第一個PTransform
.apply(
"ExtractWords"
, ParDo.of(
new
DoFn<String, String>() {
// 對文件中每一行進行處理(實際上Split)
@ProcessElement
public
void
processElement(ProcessContext c) {
for
(String word : c.element().split(
"[\\s:\\,\\.\\-]+"
)) {
if
(!word.isEmpty()) {
c.output(word);
}
}
}
}))
.apply(Count.<String> perElement())
// 統計每一個Word的Count
.apply(
"ConcatResultKVs"
, MapElements.via(
// 拼接最后的格式化輸出(Key為Word,Value為Count)
new
SimpleFunction<KV<String, Long>, String>() {
@Override
public
String apply(KV<String, Long> input) {
return
input.getKey() +
": "
+ input.getValue();
}
}))
.apply(TextIO.Write.to(
"wordcount"
));
// 輸出結果
pipeline.run().waitUntilFinish();
}
}
|
Pipeline的具體含義,可以看上面代碼的注釋信息。下面,我們考慮以HDFS數據源作為Source,如何構建第一個PTransform,代碼片段如下所示:
1
2
3
|
PCollection<KV<LongWritable, Text>> resultCollection = pipeline.apply(HDFSFileSource.readFrom(
TextInputFormat.
class
, LongWritable.
class
, Text.
class
))
|
可以看到,返回的是具有鍵值分別為LongWritable、Text類型的KV對象集合,后續處理和上面處理邏輯類似。如果使用Maven構建Project,需要加上如下依賴(這里beam.version的值可以為最新Release版本0.4.0):
1
2
3
4
5
|
<
dependency
>
<
groupId
>org.apache.beam</
groupId
>
<
artifactId
>beam-sdks-java-io-hdfs</
artifactId
>
<
version
>${beam.version}</
version
>
</
dependency
>
|
- 去重(Distinct)
去重也是對數據集比較常見的操作,使用Apache Beam來實現,示例代碼如下所示:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package
org.shirdrn.beam.examples;
import
org.apache.beam.runners.direct.DirectRunner;
import
org.apache.beam.sdk.Pipeline;
import
org.apache.beam.sdk.io.TextIO;
import
org.apache.beam.sdk.options.PipelineOptions;
import
org.apache.beam.sdk.options.PipelineOptionsFactory;
import
org.apache.beam.sdk.transforms.Distinct;
public
class
DistinctExample {
public
static
void
main(String[] args)
throws
Exception {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.
class
);
// 顯式指定PipelineRunner:DirectRunner(Local模式)
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.Read.from(
"/tmp/dataset/MY_ID_FILE.txt"
))
.apply(Distinct.<String> create())
// 創建一個處理String類型的PTransform:Distinct
.apply(TextIO.Write.to(
"deduped.txt"
));
// 輸出結果
pipeline.run().waitUntilFinish();
}
}
|
- 分組(GroupByKey)
對數據進行分組操作也非常普遍,我們拿一個最基礎的PTransform實現GroupByKey來實現一個例子,代碼如下所示:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package
org.shirdrn.beam.examples;
import
org.apache.beam.runners.direct.DirectRunner;
import
org.apache.beam.runners.direct.repackaged.com.google.common.base.Joiner;
import
org.apache.beam.sdk.Pipeline;
import
org.apache.beam.sdk.io.TextIO;
import
org.apache.beam.sdk.options.PipelineOptions;
import
org.apache.beam.sdk.options.PipelineOptionsFactory;
import
org.apache.beam.sdk.transforms.DoFn;
import
org.apache.beam.sdk.transforms.GroupByKey;
import
org.apache.beam.sdk.transforms.MapElements;
import
org.apache.beam.sdk.transforms.ParDo;
import
org.apache.beam.sdk.transforms.SimpleFunction;
import
org.apache.beam.sdk.values.KV;
public
class
GroupByKeyExample {
@SuppressWarnings
(
"serial"
)
public
static
void
main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.
class
);
// 顯式指定PipelineRunner:DirectRunner(Local模式)
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.Read.from(
"/tmp/dataset/MY_INFO_FILE.txt"
))
.apply(
"ExtractFields"
, ParDo.of(
new
DoFn<String, KV<String, String>>() {
@ProcessElement
public
void
processElement(ProcessContext c) {
// file format example: 35451605324179 3G CMCC
String[] values = c.element().split(
"\t"
);
if
(values.length ==
3
) {
c.output(KV.of(values[
1
], values[
0
]));
}
}
}))
.apply(
"GroupByKey"
, GroupByKey.<String, String>create())
// 創建一個GroupByKey實例的PTransform
.apply(
"ConcatResults"
, MapElements.via(
new
SimpleFunction<KV<String, Iterable<String>>, String>() {
@Override
public
String apply(KV<String, Iterable<String>> input) {
return
new
StringBuffer()
.append(input.getKey()).append(
"\t"
)
.append(Joiner.on(
","
).join(input.getValue()))
.toString();
}
}))
.apply(TextIO.Write.to(
"grouppedResults"
));
pipeline.run().waitUntilFinish();
}
}
|
使用DirectRunner運行,輸出文件名稱類似於grouppedResults-00000-of-00002、grouppedResults-00001-of-00002等等。
- 連接(Join)
最后,我們通過實現一個Join的例子,其中,用戶的基本信息包含ID和名稱,對應文件格式如下所示:
1
2
3
4
|
35451605324179 Jack
35236905298306 Jim
35236905519469 John
35237005022314 Linda
|
另一個文件是用戶使用手機的部分信息,文件格式如下所示:
1
2
3
|
35451605324179 3G 中國移動
35236905298306 2G 中國電信
35236905519469 4G 中國移動
|
我們希望通過Join操作后,能夠知道用戶使用的什么網絡(用戶名+網絡),使用Apache Beam實現,具體實現代碼如下所示:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
package
org.shirdrn.beam.examples;
import
org.apache.beam.runners.direct.DirectRunner;
import
org.apache.beam.sdk.Pipeline;
import
org.apache.beam.sdk.io.TextIO;
import
org.apache.beam.sdk.options.PipelineOptions;
import
org.apache.beam.sdk.options.PipelineOptionsFactory;
import
org.apache.beam.sdk.transforms.DoFn;
import
org.apache.beam.sdk.transforms.MapElements;
import
org.apache.beam.sdk.transforms.ParDo;
import
org.apache.beam.sdk.transforms.SimpleFunction;
import
org.apache.beam.sdk.transforms.join.CoGbkResult;
import
org.apache.beam.sdk.transforms.join.CoGroupByKey;
import
org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import
org.apache.beam.sdk.values.KV;
import
org.apache.beam.sdk.values.PCollection;
import
org.apache.beam.sdk.values.TupleTag;
public
class
JoinExample {
@SuppressWarnings
(
"serial"
)
public
static
void
main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.
class
);
// 顯式指定PipelineRunner:DirectRunner(Local模式)
Pipeline pipeline = Pipeline.create(options);
// create ID info collection
final
PCollection<KV<String, String>> idInfoCollection = pipeline
.apply(TextIO.Read.from(
"/tmp/dataset/MY_ID_INFO_FILE.txt"
))
.apply(
"CreateUserIdInfoPairs"
, MapElements.via(
new
SimpleFunction<String, KV<String, String>>() {
@Override
public
KV<String, String> apply(String input) {
// line format example: 35451605324179 Jack
String[] values = input.split(
"\t"
);
return
KV.of(values[
0
], values[
1
]);
}
}));
// create operation collection
final
PCollection<KV<String, String>> opCollection = pipeline
.apply(TextIO.Read.from(
"/tmp/dataset/MY_ID_OP_INFO_FILE.txt"
))
.apply(
"CreateIdOperationPairs"
, MapElements.via(
new
SimpleFunction<String, KV<String, String>>() {
@Override
public
KV<String, String> apply(String input) {
// line format example: 35237005342309 3G CMCC
String[] values = input.split(
"\t"
);
return
KV.of(values[
0
], values[
1
]);
}
}));
final
TupleTag<String> idInfoTag =
new
TupleTag<String>();
final
TupleTag<String> opInfoTag =
new
TupleTag<String>();
final
PCollection<KV<String, CoGbkResult>> cogrouppedCollection = KeyedPCollectionTuple
.of(idInfoTag, idInfoCollection)
.and(opInfoTag, opCollection)
.apply(CoGroupByKey.<String>create());
final
PCollection<KV<String, String>> finalResultCollection = cogrouppedCollection
.apply(
"CreateJoinedIdInfoPairs"
, ParDo.of(
new
DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
@ProcessElement
public
void
processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
String id = e.getKey();
String name = e.getValue().getOnly(idInfoTag);
for
(String opInfo : c.element().getValue().getAll(opInfoTag)) {
// Generate a string that combines information from both collection values
c.output(KV.of(id,
"\t"
+ name +
"\t"
+ opInfo));
}
}
}));
PCollection<String> formattedResults = finalResultCollection
.apply(
"FormatFinalResults"
, ParDo.of(
new
DoFn<KV<String, String>, String>() {
@ProcessElement
public
void
processElement(ProcessContext c) {
c.output(c.element().getKey() +
"\t"
+ c.element().getValue());
}
}));
formattedResults.apply(TextIO.Write.to(
"joinedResults"
));
pipeline.run().waitUntilFinish();
}
}
|
參考內容