Apache Beam實戰指南 | 大數據管道(pipeline)設計及實踐




 

 

策划 & 審校 | Natalie作者 | 張海濤編輯 | Linda AI 前線導讀: 本文是 Apache Beam 實戰指南系列文章第五篇內容,將對 Beam 框架中的 pipeline 管道進行剖析,並結合應用示例介紹如何設計和應用 Beam 管道。系列文章第一篇回顧  Apache Beam 實戰指南 | 基礎入門、第二篇回顧  Apache Beam 實戰指南 | 玩轉 KafkaIO 與 Flink、第三篇回顧  Apache Beam 實戰指南 | 玩轉大數據存儲 HdfsIO、第四篇回顧 A pache Beam 實戰指南 | 如何結合 ClickHouse 打造“AI 微服務”?

更多優質內容請關注微信公眾號“AI 前線”(ID:ai-front)

關於 Apache Beam 實戰指南系列文章

隨着大數據 2.0 時代悄然到來,大數據從簡單的批處理擴展到了實時處理、流處理、交互式查詢和機器學習應用。近年來涌現出諸多大數據應用組件,如 HBase、Hive、Kafka、Spark、Flink 等。開發者經常要用到不同的技術、框架、API、開發語言和 SDK 來應對復雜應用的開發,這大大增加了選擇合適工具和框架的難度,開發者想要將所有的大數據組件熟練運用幾乎是一項不可能完成的任務。

面對這種情況,Google 在 2016 年 2 月宣布將大數據流水線產品(Google DataFlow)貢獻給 Apache 基金會孵化,2017 年 1 月 Apache 對外宣布開源 Apache Beam,2017 年 5 月迎來了它的第一個穩定版本 2.0.0。在國內,大部分開發者對於 Beam 還缺乏了解,社區中文資料也比較少。InfoQ 期望通過 Apache Beam 實戰指南系列文章 推動 Apache Beam 在國內的普及。

一.概述

其他行業問咱們 IT 具體干什么的,很多 IT 人員會自嘲自己就是“搬磚”(此處將復制代碼稱為搬磚)的民工。過了兩天 GitHub 出現自動寫代碼的人工智能,IT 程序員深深嘆了一口氣說道“完了要失業了,代碼沒得搬了”。其實從入行 IT 那一刻起,不管我們做前端、服務端、底層架構等任何崗位,其實我們都是為數據服務的服務人員(注:不是說從民工轉崗到服務員了):把數據從后端搬到前端,把前端數據再寫入數據庫。盡管編程語言從 C、C++、C#、JAVA、Python 不停變化,為了適應時代背景框架也是千變萬化,我們拼命從“亞馬遜熱帶雨林”一直學到“地中海”。

然后 Apache Beam 這個一統“地中海”的框架出現了。Apache Beam 不光統一了數據源,還統一了流批計算。在這個數據傳輸過程中有一條核心的技術就是管道(Pipeline),不管是 Strom,Flink ,Beam 它都是核心。在這條管道中可以對數據進行過濾、凈化、清洗、合並、分流以及各種實時計算操作。

本文會詳細介紹如何設計 Apache Beam 管道、管道設計工具介紹、源碼和案例分析,普及和提升大家對 Apache Beam 管道的認知。

二.怎樣設計好自己的管道?設計管道注意事項

圖 2-1 簡單管道

1. 你輸入的數據存儲在那里?

首先要確定你要構造幾條數據源,在 Beam 可以構建多條,構建之前可以選擇自己的 SDK 的  IO。

2. 你的數據類型是什么樣的?

Beam 提供的是鍵值對的數據類型,你的數據可能是日志文本、格式化設備事件、數據庫的行,所以在 PCollection 就應該確定數據集的類型。

3. 你想怎么處理數據?

對數據進行轉換、過濾處理、窗口計算、SQL 處理等。在管道中提供了通用的 ParDo 轉換類,算子計算以及 BeamSQL 等操作。

4. 你打算把數據最后輸出到哪里去?

在管道末尾進行 Write 寫入操作,把數據最后寫入你自己想存放或最后流向的地方。

管道的幾種玩法1. 分支管道:多次轉換,處理相同的數據集

圖 2-2-1 多次轉換處理相同數據示意圖

描述:例如上圖 2-1-1  圖所示,從一個數據庫的表讀取或轉換數據集,然后從數據集中分別找找以字母“A”開頭的數據放入一個分支數據集中,如果以字母“B”開頭的數據放入另一個分支數據集中,最終兩個數據集進行隔離處理。

數據集

// 為了演示顯示內存數據集
final List<String> LINES = Arrays.asList(
"Aggressive",
"Bold",
"Apprehensive",
"Brilliant");

示例代碼:

PCollection<String> dbRowCollection = ...;// 這個地方可以讀取任何數據源。
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){// 查找以"A"開頭的數據
c.output(c.element());
System.out.append("A 開頭的單詞有:"+c.element()+"\r");
}
}
}));
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){// 查找以"A"開頭的數據
c.output(c.element());
System.out.append("B 開頭的單詞有:"+c.element()+"\r");
}
}
}));

最終結果展示:

A 開頭的單詞有:Aggressive
B 開頭的單詞有:Bold
A 開頭的單詞有:Apprehensive
B 開頭的單詞有:Brilliant

原示例代碼地址 :pipelineTest2_1

2. 分支管道:一次轉換,輸出多個數據集

圖 2-2-2  一次轉換多個輸出示意圖

描述:根據圖 2-2-1 和圖 2-2-2 圖中可以看出,他們以不同的方式執行着相同的操作,圖 2-2-1 中的管道包含兩個轉換,用於處理同一輸入中的元素 PCollection。一個轉換使用以下邏輯:

if(以'A'開頭){outputToPCollectionA}

另一個轉換為

if(以'B'開頭){outputToPCollectionB}

因為每個轉換讀取整個輸入 PCollection,所以輸入中的每個元素都會 PCollection 被處理兩次。圖 2-2-2 中的管道以不同的方式執行相同的操作 - 只有一個轉換使用以下邏輯:

if(以'A'開頭){outputToPCollectionA} else if(以'B'開頭){outputToPCollectionB}

其中輸入中的每個元素都 PCollection 被處理一次。

數據集:同 2-1-1 數據集

示例代碼:

// 定義兩個 TupleTag,每個輸出一個。
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// 返回首字母帶有"A"的數據集。
c.output(c.element());
} else if(c.element().startsWith("B")) {
// // 返回首字母帶有"B"的數據集。
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);

如果每個元素的轉換計算非常耗時,則使用其他輸出會更有意義,因為一次性過濾全部數據,比全部數據過濾兩次從性能上和轉換上都存在一定程度上提升,數據量越大越明顯。

最終結果展示:

A 開頭的單詞有:Apprehensive
A 開頭的單詞有:Aggressive
B 開頭的單詞有:Brilliant
B 開頭的單詞有:Bold

原示例代碼地址 :pipelineTest2_2

3. 合並管道:多個數據集,合並成一個管道輸出

圖 2-2-3 多數據集合並輸出圖

描述:

上圖 2-2-3 是接圖 2-2-1 的繼續,把帶“A” 的數據和帶“B” 字母開頭的數據進行合並到一個管道。這個地方注意點是 Flatten 用法必須兩個數據的數據類型相同。

數據集:

// 為了演示顯示內存數據集
final List<String> LINESa = Arrays.asList(
"Aggressive",
"Apprehensive");
final List<String> LINESb = Arrays.asList(
"Bold",
"Brilliant");

示例代碼:

// 將兩個 PCollections 與 Flatten 合並
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
.apply(Flatten.<String>pCollections());
// 繼續合並新的 PCollection
mergedCollectionWithFlatten.apply(...);

結果展示:

合並單詞單詞有:
Aggressive
Brilliant
Apprehensive
Bold

原示例代碼地址 :pipelineTest2_3

4. 合並管道:多個數據源,鏈接合並一個管道輸出

圖 2-2-4 多數據源合並輸出圖

描述:

你的管道可以從一個或多個源讀取或輸入。如果你的管道從多個源讀取並且這些源中的數據相關聯,則將輸入連接在一起會很有用。在上面的圖 2-2-4 所示的示例中,管道從數據庫表中讀取名稱和地址,並從 Kafka 主題中讀取名稱和訂單號。然后管道 CoGroupByKey 用於連接此信息,其中鍵是名稱 ; 結果 PCollection 包含名稱,地址和訂單的所有組合。

示例代碼:

PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// 將集合值合並到 CoGbkResult 集合中。
PCollection<KV<String, CoGbkResult>> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.<String>create());
joinedCollection.apply(...);
管道的設計工具

對於管道的設計不光用代碼去實現,也可以用視圖工具。現在存在的有兩種一種是拓藍公司出品叫 Talend Big Data Studio,另一種就是免費開源的視圖設計工具 kettle-beam。

 

 

三.怎樣創建你的管道

Apache Beam 程序從頭到尾就是處理數據的管道。本小節使用 Apache Beam SDK 中的類構建管道,一個完整的 Apache Beam 管道構建流程如下: 

  1. 首先創建一個 Pipeline 對象。

  2. 不管是數據做任何操作,如“ 讀取”或“ 創建”及轉換都要為管道創建 PCollection 一個或多個的數   據集(PCollection<String>)。

  3. 在 Apache Beam 的管道中你可以對數據集 PCollection 做任何操作,例如轉換數據格式,過濾,分組,分析或以其他方式處理數據中的每一個元素。每個轉換都會創建一個新輸出數據集 PCollection,當然你可以在處理完成之前進行做任何的轉換處理。

  4. 把你認為最終處理完成的數據集寫或以其他方式輸出最終的存儲地方。

  5. 最后運行管道。

創建管道對象

每一個 Apache Beam 程序都會從創建管道(Pipeline)對象開始。

在 Apache Beam SDK,每一個管道都是一個獨立的實體,管道的數據集也都封裝着它的數據和對應的數據類型(在 Apache Beam 中有對應的數據轉換 類型包)。最后把數據進行用於各種轉換操作。

在創建的管道的時候需要設置管道選項 PipelineOptions,有兩種創建方式第一種是無參數和一種有參數的。具體兩種有什么不同呢?無參數的可以在程序中指定相應的管道選項參數,如顯示設置執行大數據引擎參數。有參數的就可以在提交 Apache Beam jar 程序的時候進行用 Shell 腳本的方式后期設置管道對應的參數。

具體示例如下:

無參數

// 首先定義管道的選項
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // 顯示設置執行大數據引擎
// 創建管道實體對象
Pipeline p = Pipeline.create(options);

有參數

PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create();

提交設置參數的格式如下:

--<option>=<value>
將數據讀入你的管道

創建 PCollection  的初始值,請讀取外部的數據源及指定的本地數據。例如讀取數據庫,文本文件,流數據等等,現在 Apache Beam java SDKS 支持 33 種數據源,正在接入集成的有 7 種,Python 13 種,正在集成的 1 種。基本覆蓋了 IT 行業的一切數據源。例如讀取文本數據我們可以用 TextIO.Read 的方法進行讀取數據。轉換應用於管道對象 p 中。並且返回對應格式的數據集 PCollection:

PCollection<String> lines = p.apply(
"ReadLines", TextIO.read().from("/home/inputData.txt"));

注意: 在 Apache Beam 程序執行中,Beam 程序 2.2.0 以前版本不支持 Windows  如:D\inputData.txt  路徑格式。只支持 linux 路徑格式,及其他如 HDFS 等存儲路徑。

已經支持的數據源統計如下表:

 

 

 

 

 

 

將管道數據轉換為處理的格式

很多時候直接從數據源讀取的數據不會直接流入目標存儲。大部分需要進行數據格式的轉換,數據的清洗,數據的過濾,數據的關聯處理,數據累加操作等。這里需要對源數據進行處理,處理完成的數據處理流入目標存儲外還可以進行當作參數一樣,傳遞並繼續應用到管道中。

以下示例代碼為 把一串數據通過轉換操作賦值給 words , 然后再把 words 再次傳遞到下一個操作應用,再進一步進行操作的處理工作。

PCollection<String> words = ...;
PCollection<String> reversedWords = words.apply(new ReverseWords());
編寫或輸出管道最終輸出數據的地方

經過一些列的清洗、過濾、關聯、轉換處理工作后的數據,最終都會通過 SDKIO 進行寫入管道外的存儲或着數據庫表。然而這種寫入操作大部分都是在管道的末尾端進行操作的。

如下面代碼示例,就是把管道的數據通過 Apache Beam 中的 TextIO.Write 寫入 Linux 的文本文件 test.txt 中。

PCollection<String> filteredWords = ...;
filteredWords.apply("WriteMyFile", TextIO.write().to("/home/test.txt"));
運行你的管道

構建管道后,使用 run 方法最后執行管道。管道以異步方式執行的。寫完這一句代碼后你就可以把自己的程序用 Jenkins 進行編譯並提交給運行管道平台,最終有管道執行平台來運行。

運行代碼示例:

p.run();

處理異步執行的方式,還有同步執行方式,是在 run 方法后面加個看守方法 waitUntilFinish。具體代碼如下:

p.run().waitUntilFinish();
四.怎樣測試你的管道

Apache Beam  管道開發中最后的測試在整個開發中也是非常重要的一個環節。Apache Beam 的代碼程序不必要每次都進行遠程構建執行到 Flink 集群上,因為管道代碼的錯誤及 Bug 的修改在本地能更好的調試,然而每次構建到遠程上面去執行是非常麻煩的事情。Apache Beam 提供 DirectRunner ,一個用於本地測試的執行引擎。

使用 DirectRunner 測試管道的時候,你可以用小規模的數據進行測試。此外你如果開發機器上裝了本地的 flink ,也可以指定本地的 Flink 執行。例如測試一個簡單的轉換函數 DoFn,符合變換,數據源輸入到管道尾端數據輸出等操作。

注意點:DirectRunner 是用於管道或  Apache Beam 程序 本地開發調試測試的 數據執行引擎,不可以用於真正生產環境中運行。否則程序執行性能會大大降低,這里有坑要避開。

測試單個 Pipeline 步驟

我們開發完成管道 Beam 程序后需要本地測試,Beam SDK for Java 提供了一種方便的方法來測試 TestPipeline 的封裝類。  在 Beam SDK  testing 包中。

它的使用操作方法:

    1. 創建一個 TestPipeline。

    2. 創建一些已知的靜態測試數據,也稱為內存數據,真正應用基本是流或批數據。

    3. 使用 Create 方法創建 PCollection 輸入數據。

    4. 使用 Apply 方法進行數據的轉換處理並且返回指定的 PCollection。

    5. 最后使用 PAssert 去驗證輸出的結果是否為預期結果值。

測試實戰示例

Apache Beam 中簡單的管道單元測試實例。

public class CountTest {
// 創建靜態的內存數據
static final String[] WORDS_ARRAY = new String[] {
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""};
static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
public void testCount() {
// 創建一個測試管道.
Pipeline p = TestPipeline.create();
// 創建一個輸入數據集.
PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
// 添加轉換統計單詞個數.
PCollection<KV<String, Long>> output =
input.apply(Count.<String>perElement());
// 驗證結果.
PAssert.that(output)
.containsInAnyOrder(
KV.of("hi", 4L),
KV.of("there", 1L),
KV.of("sue", 2L),
KV.of("bob", 2L),
KV.of("", 3L),
KV.of("ZOW", 1L));
// 運行整個管道.
p.run();
}
端到端的測試管道

端到端的測試,主要針對輸入端和輸出端兩端的測試。要測試整個管道,請執行以下操作:

  • 創建一個 Beam 測試 SDK 中所提供的 TestPipeline 實例。

  • 對於多步驟數據流水線中的每個輸入數據源,創建相對應的靜態(Static)測試數據集。

  • 使用 Create Transform,將所有的這些靜態測試數據集轉換成 PCollection 作為輸入數據集。

  • 按照真實數據流水線邏輯,調用所有的 Transforms 操作。

  • 在數據流水線中所有應用到 Write Transform 的地方,都使用 PAssert 來替換這個 Write Transform,並且驗證輸出的結果是否我們期望的結果相匹配

由於端到端測試跟單個 Pipeline 步驟相似就不在舉示例代碼。其實開發過程中本地調試打斷點,寫日志測試也是更快解決問題的一個辦法。

五. Apache Beam 的管道源碼解析Apache Beam Pipeline 源碼解析

管道源代碼主類是比較簡單的,本文針對  Pipeline.java  進行解析。

1. 定義管道參數及管道創建

在管道創建首先可以定義管道的選項,例如 Beam 作業程序的名稱、唯一標識、運行引擎平台等,當然也可以提交引擎平台用命令指定也可以。然后實例化一個管道對象。

源碼示例如下:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
2. 讀取數據源

讀取要處理的數據,有文本數據,結構化數據和非結構化數據以及流數據。作為數據處理的源數據。

源碼示例如下:

PCollection<String> lines =
p.apply(TextIO.read().from("gs://bucket/dir/file*.txt"));
3. 進行數據處理操作

在管道里面可以進行窗口操作、函數操作、原子操作以及 SQL 操作。

數據統計的源碼示例:

PCollection<KV<String, Integer>> wordCounts =allLines
.apply(ParDo.of(new ExtractWords()))
.apply(new Count<String>());
4. 輸出結果及運行

源代碼示例:

PCollection<String> formattedWordCounts =
wordCounts.apply(ParDo.of(new FormatCounts()));
formattedWordCounts.apply(TextIO.write().to("gs://bucket/dir/counts.txt"));
p.run();

六.管道實戰案例案例場景描述

隨着人工智能 的不斷發展,AI Cloud 在銀行加快落地,安防 AI 碎片化的應用場景遍地開花。本文結合銀行營業網點的業務,介紹管道案例實戰。

以銀行的員工脫離崗檢測中的行為分析數據預處理為例。我們去銀行辦理業務過程中,首先要取號,然后叫號。叫號提示會對接系統形成一條消息回傳后台,但是有時候正常辦理業務期間有櫃台營業員出去,然后很久才回來。這個時候攝像頭會根據櫃台離崗時間自動 AI 行為分析生成報警處理。

案例業務架構流程

 

 

 

  1. 叫號報警和行為分析報警產生的數據通過營業網點進行上報。

  2. 上傳網關集群,網關集群進行轉換消息格式壓縮消息。

  3. 消息流入消息中心等待消費,消息中心再次起着消峰作用。

  4. 用 Beam 管道的時間窗口特性、流合並處理特性進行消息消費處理

  5. 消息進入大數據實時分析處理平台處理應用消息。

案例示例核心代碼1. 本案例為了節約閱讀時間,采用靜態數據
    // 創建管道工廠
PipelineOptions options = PipelineOptionsFactory.create();
// 顯式指定 PipelineRunner:FlinkRunner 必須指定如果不制定則為本地
options.setRunner(DirectRunner.class); // 生產環境關閉
// options.setRunner(FlinkRunner.class); // 生成環境打開
Pipeline pipeline = Pipeline.create(options);// 設置相關管道
// 為了演示顯示內存數據集
// 叫號數據
final List<KV<String, String>> txtnoticelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I", "101 號顧客請到 3 號櫃台"), KV.of("DS-2CD2T26FDWDA3-IS", "102 號顧客請到 1 號櫃台"),
KV.of("DS-2CD6984F-IHS", "103 號顧客請到 4 號櫃台"),
KV.of("DS-2CD7627HWD-LZS", "104 號顧客請到 2 號櫃台"));
//AI 行為分析消息
final List<KV<String, String>> aimessagelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I",
"CMOS 智能半球網絡攝像機, 山東省濟南市解放路支行 3 號櫃,type=2,display_image=no"),
KV.of("DS-2CD2T26FDWDA3-IS", "CMOS 智能筒型網絡攝像機, 山東省濟南市甸柳庄支行 1 號櫃台,type=2,display_image=no"),
KV.of("DS-2CD6984F-IHS", "星光級全景拼接網絡攝像機, 山東省濟南市市中區支行 4 號櫃台,type=2,display_image=no"),
KV.of("DS-2CD7627HWD-LZS", "全結構化攝像機, 山東省濟南市市中區支行 2 號櫃台,type=2,display_image=no"));
PCollection<KV<String, String>> notice = pipeline.apply("CreateEmails", Create.of(txtnoticelist));
PCollection<KV<String, String>> message = pipeline.apply("CreatePhones", Create.of(aimessagelist));
final TupleTag<String> noticeTag = new TupleTag<>();
final TupleTag<String> messageTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> results = KeyedPCollectionTuple.of(noticeTag, notice).and(messageTag, message).apply(CoGroupByKey.create());
System.out.append("合並分組后的結果:\r");
PCollection<String> contactLines = results.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
String name = e.getKey();
Iterable<String> emailsIter = e.getValue().getAll(noticeTag);
Iterable<String> phonesIter = e.getValue().getAll(messageTag);
System.out.append("" + name + ";" + emailsIter + ";" + phonesIter + ";" + "\r");
}
}));
pipeline.run().waitUntilFinish();
2. 測試運行結果

 

 

源碼地址:pipelineTest2_5.java

七.小結

近幾年隨着 AloT 發展得如火如荼,其落地場景也遍地開花。loT 作為 AI 落地先鋒,已經步入線下各行各業。本文以 Beam 管道的設計切入,重點對 Beam 管道設計工具和源碼進行解析,最后結合銀行金融行業對 AI 碎片化的場景進行數據預處理的案例,幫助大家全面了解 Beam 管道。

作者介紹

張海濤,目前就職於海康威視雲基礎平台,負責海康威視在全國金融行業 AI 大數據落地的基礎架構設計和中間件的開發,專注 AI 大數據方向。Apache Beam 中文社區發起人之一,如果想進一步了解最新 Apache Beam 和 ClickHouse 動態和技術研究成果,請加微信 cyrjkj 入群共同研究和運用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM