Apache Beam和Google Dataflow實用體驗和踩過的坑


原文鏈接:https://zhuanlan.zhihu.com/p/106676174

預計閱讀時間10~30分鍾。難度中等

本文基於我過去半年對 Google Dataflow的潛(盲)心(人)學(摸)習(象),和實(瞎)戰(幾)體(把)驗(用)。在 Storm和 Heron的知識體系以及使用習慣下,完全根據個人經驗所寫的感悟。本文沒有故意抬高或者批判任何 project的意圖,只是想單純分享使用經驗+吐幾個槽。如果有理解和使用上的錯誤或誤區,歡迎一起討論

0. 背景

幾年前我司個別領導拍了一下腦袋:Heron在宣傳上沒有花心思做好,導致現在很多人甚至都沒聽說過Heron這玩意兒,在Apache孵化了那么久也還沒畢業,沒有像Flink那樣搞成流處理霸權,當年開發Heron的黃金一代也都走得差不多了,如今我司和GCP又簽了那么大一個合同,不如。。。我們就把流處理的東西都挪到GCP上去,再琢磨琢磨搞一套混合雲解決方案,這impact誰敢說小,這不就好起來了嘛。。。

就這樣,在大約半年前,我開始了對Google Dataflow的潛(盲)心(人)學(摸)習(象),和實(瞎)戰(幾)體(把)驗(用),不計成本地在GCP上做各種實驗。

Dataflow是Google Cloud的一款拳頭產品。Dataflow上提供了對數據批處理 (Batch Processing) 和流處理 (Streaming Processing) 的能力。簡單的來說,可以把Dataflow想象成跑在雲上的一個Hadoop+Storm共同體。

Apache Beam是一套用來編寫數據處理任務的API,Beam旨在提供一種統一的API來編寫批處理和流處理Job,也就是說可以實現同一套代碼,既可以用來做批處理,也可以用來做流處理。需要注意的是,Beam本身並不處理數據,而是用來描述一個數據處理任務應該如何進行的,真正運行處理的是Runner,現在比較常見的,並且支持Beam的Runner有Google Dataflow,Flink,Spark,Heron(手動狗頭)等。

1. Apache Beam

我們先來說說Beam。

Beam的programming model和傳統的Storm/Heron的那套Spout/Bolt API很不一樣。傳統的Storm/Heron里,所有的業務邏輯都抽象成一個Method,比如Spout里,用戶只要最后提供nextTuple這一個method,來包含所有關於從數據源拿數據,反序列化,解析,以及往下游發送的一系列邏輯

// Storm/Heron Spout nextTuple Example public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }

又比如Bolt里,execute這一個method,包含了所有關於如何處理每一條上游來的數據的業務邏輯。

// Storm/Heron Bolt execute Example @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); }

當然,有一定規模的Storm/Heron job里肯定不會把幾百幾千行業務邏輯都寫在一個method里,用戶會自己去模塊化他們的代碼。

與之相比,Beam的API把業務邏輯拆分成了一個個更小的邏輯步驟。抽象的接口也更多。

// Example Beam API pipeline.apply(TextIO.read().from(options.getInputFile())) .apply(FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) .apply(Filter.by((String word) -> !word.isEmpty())) .apply(Count.perElement()) .apply(MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) .apply(TextIO.write().to(options.getOutput()));

Beam API里每一個 ".apply()" 都是在對數據集做一步transform。看着這些FlatMapElements.intoFilter.byvia,作為一個被Storm和Heron那套API領進門的孩子,突然面對那么多抽象我有點無所適從,我在寫一個job的時候到底要用哪些接口呢?

1.1 ParDo & DoFn

於是我非(囫)常(圇)耐(吞)心(棗)的把這些抽象的文檔讀了一遍,發現在這些抽象中隱藏着一個大殺器,那就是ParDo!官方文檔上是這么寫的:

ParDois a Beam transform for generic parallel processing.

ParDo可以用來表達幾乎所有上述的接口想要做到的事,仔細一品,這個ParDo是不是就有那么一點像Storm/Heron里的Bolt呢?

ParDo和DoFn是結合在一起用的,具體的處理邏輯寫在DoFn這個interface里,一個簡單的example長這樣:

PCollection<Integer> wordLengths = words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.getElement().length()); } }));

這么一看這個DoFn的processElement就更像Storm/Heron里Bolt的execute了。說實話,自從了解了ParDo之后,我幾乎再也沒用過什么MapElements.intoFilter.by這些花里胡哨的東西,有一種屠龍在手,天下我有的趕腳 (其實如果用那些,代碼可能會更短更好看,所以不要學習我)

我們組有個頭鐵的大兄弟,他是這樣吐槽的:“為什么Beam要用ParDo這么奇葩的名字呢?為什么不直接叫他FlatMap呢?” 我覺得這是個好嘈點。

ParDo=Parallel Do。

關於ParDo還有一個地方可以吐槽一下的是他的branching outputs(對應下圖中藍色框框)。如果你有一個transform,你想讓他的輸出,根據不同的條件對應到下游不同的transform step的話,就需要用到Tag這個東西,給每組output打上一個標簽,這樣再在下游的transform里寫明我要讀的是哪一個標簽的input。

Tag的用法我給你們個例子,你們體會一下:

final TupleTag<String> zhenhuan = new TupleTag<String>(){}; final TupleTag<String> wulanala = new TupleTag<String>(){}; final TupleTag<String> huafei = new TupleTag<String>(){}; PCollectionTuple results = words.apply(ParDo.of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(...); c.output(wulanala, ...); c.output(huafei, ...); } }).withOutputTags(zhenhuan, TupleTagList.of(wulanala).and(huafei)));

在用tag的時候,Beam逼着你從這3個tag里挑選一個當main output tag,在調用c.output時,這個main output tag是不可以explicitly寫明的,但是剩下的tag都需要明確標明,后面的withOutputTags也一樣。

不知道這樣的設計有什么深遠的意義,我只知道我在寫這些tag的時候,腦補了一出宮斗戲。。。我本想雨露均沾的給這3個tag同樣的恩寵。。。可惜Beam不允許

1.2 Coder

我剛開始寫Beam code的時候,在Coder這里跌了很多跟頭。。。Coder是Beam里對不同數據類型進行編碼的模塊。對於每一個apply step(也就是每一步數據transform),Beam都會對這一步transform輸出的data進行編碼(encoding),這里就需要用到Coder。如果輸出的type是primitive,或者是Beam原生的data type,那系統會自己找到對應的coder,不需要額外在code里寫明。如果輸出的type是自定義的,比如輸出一個thrift object,那系統就會默認用一個general的SerializableCoder。這個coder和thrift,avro這類protocol object很不搭,不僅性能不好,而且還會間歇性地報serialization異常。這種情況下,你就需要自己寫一個專門的coder,並且在job code里explicitly寫明,這一步transform的輸出請用這個coder來編碼。

Coder在這里的意義在於,一個Beam job的不同step,可能會跑在不同的worker machine上,數據從一個worker跳到另一個worker的過程中,就需要進行encode/decode。

這里我想強調一個特殊的type,那就是Beam的KV。KV顧名思義就是Key-Value Pair,用來表示一對數據。如果一個transform的輸出是一個KV,並且Key或者Value里有自定義的type, 比如KV<String, MyThriftObject>,那這個時候也需要explicitly寫明coder

PCollection<KV<String, MyThriftObject>> kvs = pipeline.apply( ParDo.of(new DoFn<MyThriftObject, KV<String, MyThriftObject>>() { ... })).setCoder(KVCoder.of(StringUtf8Coder.of(), MyThriftObjectCoder.of()));

1.3 Windowing, Watermark, Trigger...

終於要說到Beam最閃耀的燈球了。Window,watermark,trigger,這三個概念是Beam最引以為豪的概念,也是這本書里花費幾乎全部篇章去講的幾個概念。說實話,我剛接觸到這三個概念的時候心里是抵觸的,因為Heron里沒有這樣的東西,我踏入了知識盲區。

於是我非(囫)常(圇)耐(吞)心(棗)的把這本書里關於這三個概念的章節讀了一遍,又讀了官方文檔上Mobile Gaming這個相對復雜的tutorial(這個tutorial很值得一讀),對這三個概念大致有了了解。我這里就不贅述了,因為贅述不好,所以我還是舉一個例子好了。

1.3.1 看個栗子

實踐中,我找到了一個現有的use case可以用到window API。這個use case簡單來說是醬嬸的:

  1. 我有一個job,這個job不停地從Message Queue里讀數據,這些數據的key是一個個string
  2. 對每一個key,如果是2小時內第一次出現,那就給這個key創建一個2小時的window
  3. Count這個key在這2小時里出現的次數,一旦次數超過了一個閾值,就輸出一個(key,true)的tuple,如果還沒超過閾值,則輸出(key,false)tuple
  4. 2小時后,這個key的window自動關閉

舉個例子來說,如果這個閾值是2,那對應的input和output是這樣的

這怎么看都是一個標准的Session Windowing use case嘛!我翻開書,開始抄代碼,但是抄着抄着,笑容逐漸消失。

首先Beam的window一般是給你用來做aggregation的,一般來說,1個window結束,輸出1條結果。上述的這個use case里呢,window即用來做aggregation(count key出現過的次數),也需要不斷對每個key到底是true還是false做判斷,並且輸出。有幾條input,就得有幾條output,而且從收到一條input到輸出一條output之間要保證實時性,不能等2小時window結束的時候,shua的一下把所有output一下子都倒出來。

要解決這個問題就要用到Trigger,trigger是用來決定什么時候一個window要具現化(materialize)一個output,用人話說,trigger就是用來控制window啥時候可以output。Beam提供了很多Trigger選擇,這里我就用了withEarlyFirings(AfterPane.elementCountAtLeast(1)))。這個trigger的意思是說,這個window每遇到一個新的element,就output一次,這樣這個window在沒關閉前就可以不斷輸出,同時還能繼續做aggregation。

另一個問題是,Beam自帶的SessionWindow時間長度是不固定的,而我需要的是一個定長的session window。要解決這個問題,就只能自己重寫一個window了。。。這是我寫的粗糙的版本,為什么用scala呢?這個不要問我,我也不喜歡scala,用scala去搞Beam這種Java的library聽起來簡單,其實很造作。。。

import java.util import org.apache.beam.sdk.coders.Coder import org.apache.beam.sdk.transforms.windowing.{IntervalWindow, WindowFn, WindowMappingFn} import org.joda.time.Duration import scala.collection.JavaConverters._ class MergeCandidate[T](var union: IntervalWindow = null) { var parts: Seq[IntervalWindow] = if (union == null) { Seq.empty } else { Seq(union) } def intersects(window: IntervalWindow): Boolean = union == null || union.intersects(window) def add(window: IntervalWindow, maxSize: Duration): Unit = { union = if (union == null) { window } else { // Make sure the union window is always  // the earliest start time of all its parts + the max duration  val earlierStart = if (union.start().isBefore(window.start())) { union.start() } else { window.start() } new IntervalWindow(earlierStart, maxSize) } parts = parts :+ window } def apply(c: WindowFn[T, IntervalWindow]#MergeContext): Unit = { if (parts.size > 1) { c.merge(parts.asJava, union) } } } /**  * A Fixed Size session in terms of duration  * @param maxSize the max duration a session can last  * @tparam T  */ class FixedSizeSession[T](maxSize: Duration) extends WindowFn[T, IntervalWindow] { def windowSize(window: IntervalWindow): Duration = if (window == null) { new Duration(0) } else { new Duration(window.start(), window.end()) } override def assignWindows(c: WindowFn[T, IntervalWindow]#AssignContext) : util.Collection[IntervalWindow] = Seq(new IntervalWindow(c.timestamp(), maxSize)).asJava override def mergeWindows(c: WindowFn[T, IntervalWindow]#MergeContext): Unit = { val sortedWindows = c.windows().asScala.toSeq.sorted var merges = Seq.empty[MergeCandidate[T]] var current = new MergeCandidate[T]() sortedWindows.foreach { window => if (current.intersects(window)) { current.add(window, maxSize) } else { merges = merges :+ current current = new MergeCandidate[T](window) } } merges = merges :+ current merges.foreach(_.apply(c)) } override def isCompatible(other: WindowFn[_, _]): Boolean = true override def windowCoder(): Coder[IntervalWindow] = IntervalWindow.getCoder override def getDefaultWindowMappingFn: WindowMappingFn[IntervalWindow] = throw new UnsupportedOperationException("Sessions is not allowed in side inputs") }

寫完這個新的window implementation之后,又發現一個問題,那就是我還得寫一個自定義的Combiner,用來count一個window里,該key出現的次數,這么簡單的一個aggregation function,愣是沒找到Beam原生的實現,於是乎,我又擼了一個Combiner

import java.lang import org.apache.beam.sdk.transforms.Combine.CombineFn import scala.collection.JavaConverters._ case class CountAndValue[T](count: Long, value: T) case class CountAndValueAccumulator[T](var count: Long = 0L, var latest: Option[T] = None, var ts: Long = 0L) { def update(input: T): Unit = { ts = System.currentTimeMillis() count += 1 latest = Some(input) } } class CountAndValueCombiner[T] extends CombineFn[T, CountAndValueAccumulator[T], CountAndValue[T]] { override def createAccumulator(): CountAndValueAccumulator[T] = { new CountAndValueAccumulator[T] } override def addInput(mutableAccumulator: CountAndValueAccumulator[T], input: T): CountAndValueAccumulator[T] = { mutableAccumulator.update(input) mutableAccumulator } override def mergeAccumulators(accumulators: lang.Iterable[CountAndValueAccumulator[T]]) : CountAndValueAccumulator[T] = { var totalCount = 0L var latestTs = 0L var latestValue: Option[T] = None accumulators.asScala.foreach { accumulator => totalCount += accumulator.count if (accumulator.ts > latestTs) { latestTs = accumulator.ts latestValue = accumulator.latest } } new CountAndValueAccumulator[T](totalCount, latestValue, latestTs) } override def extractOutput(accumulator: CountAndValueAccumulator[T]): CountAndValue[T] = { CountAndValue(accumulator.count, accumulator.latest.get) } }

到這里,我終於能用上Beam這些高級的概念來寫我的job啦!

inputs.apply("sessionWindow", Window.into[KV[String, String]]( new FixedSizeSession[KV[String, String]](Duration.standardHours(2)) ).accumulatingFiredPanes() .triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterPane.elementCountAtLeast(1))) .withAllowedLateness(Duration.ZERO)) .apply(Combine.perKey(new CountAndValueCombiner[String]())) .apply("Output", ParDo.of({...})

然而花了好大力氣寫的這個實現,到了dataflow上表現並不好,之后的章節會具體細說。

1.4 Stateful API & Timer

上一個實現在dataflow上表現不好,性能並不高。經過一番對Beam官方文檔的挖掘,我又找到了一個新的大殺器。。。那就是Stateful ParDo。ParDo是真的牛逼,絕對的C位,所有的大殺器都是圍繞着ParDo展開的。

Stateful ParDo就是通過幾個annotation和額外的method,把一個普通的ParDo變成一個stateful component,這樣一個ParDo處理單元就能擁有可持續化的狀態,以上面的例子來說,這個state就可以是每個key對應的出現次數。這些state會儲存在對應runner里約定的存儲里,比如在dataflow上,state是存在每個worker的硬盤上。

一旦一個ParDo變成了Stateful ParDo,數據從它的上游到這個Stateful ParDo之間也會自動多一步data shuffling的步驟,以確保相同的key總是落到同一個worker上。

Stateful ParDo還經常和Timer一起使用,timer可以用來控制什么時候對這個state做一些lifecycle management,比如清空state。

你細細品一下,有了這個stateful ParDo和Timer,似乎也可以完全不用window那套東西了。。。

ParDo是真的香!

2. Google Dataflow

下面來說說Dataflow,dataflow相對來說坑就多了。。。

2.1 Window Performance

上文說到,我辛苦寫了一個定長的Session Windowing的job,拿到dataflow上去跑。不久就發現了一個現象:Job剛開始跑的前2小時,性能指標和業務邏輯指標一切正常。到了最初2小時結束的時間點,各項指標都開始明顯變差,隨着時間越來越久,各項指標變差的程度也越來越大。各種慘不忍睹。2小時又恰好是一個session window的固定時長。這肯定不是巧合。

Dataflow對用戶來說幾乎就是一個黑盒,暴露給你的關於出錯的信息少的可憐。這時候就只能去求助Google了。好在我司有一支龐大的Google支持團隊常年駐扎,在幾個Google老司機的協助調查下發現,在最開始的2小時里,每一個unique的key都會對應創建一個新的window,所有的window都處於未關閉的狀態(in-flight),這時候一切安好。

到了2小時這個時間點的時候,這些window都需要關閉,同時發生了太多window close的trigger,一下子給worker加了很多壓力。這些瞬時的壓力造成了巨大的資源占用,導致之后新一輪2小時的window們處理數據的吞吐量也變低,等新一輪的window也准備關閉的時候,又會產生新的瞬時壓力。尷尬的是,這個時候,很可能第一輪2小時window關閉所帶來的壓力還沒有被完全消化完,這就形成了一個惡性循環,性能也就呈雪崩式崩塌。

有趣的是,用Stateful ParDo寫的實現並沒有遇到這樣的雪崩問題。你細細品,我實際上是用了Stateful ParDo實現了一個Session Window,然而卻比真正用Beam Window API表現得要更好。

Google團隊給我的答復是,你這個use case吧,就不適合用window那套騷操作,就stick with stateful ParDo吧。

2.2 Streaming Engine Mode Vs. Streaming Appliance Mode

當然,stateful ParDo的實現也不是完全沒有問題的。我一度發現這個job無論怎么加CPU和內存資源,都沒有使它的吞吐量達到我期待的水准,然鵝如果增加硬盤資源,反而能顯著提高吞吐量。最后在這個文檔里找到了解釋。Dataflow上硬盤的吞吐量(IOPS)有一個上限,這個上限和你分配的硬盤大小正相關,硬盤分配的越多,容許的吞吐量上限也就越高。而硬盤吞吐量在dataflow里又和data shuffling息息相關。數據源輸出數據的吞吐量大,對於這么一個stateful processing job,data shuffling的吞吐量也大,對於硬盤的吞吐量需求就高,對於硬盤大小的要求也就越高。所以即使這個job本身存的state並沒有很大,不占用多少硬盤資源,也還是需要一個無比巨大的硬盤池來驅動它的正常運轉。

說到data shuffling,就不得不說Google團隊一直在鼓吹我用的一個experiment feature,名喚streaming engine。據Google團隊的描述,在dataflow上,一個streaming job默認是以一個叫Streaming appliance的模式下運行,這個模式通俗點講就是你負責花錢買了一大堆Google Compute Engine以及計算資源,Google負責把dataflow部署到你的這個GCE集群上,然后你就自己跑你的job了,你的job里的一切開銷都由你分配的資源承擔,這里最重要的一筆開銷就是data shuffling的開銷,就如我上面所說,在streaming appliance mode下,data shuffling消耗的是你自己的硬盤資源。

那么streaming engine mode是怎么一回事呢?就是你把data shuffling這部分任務交給Google管理的一個服務上去完成,不消耗你自己的GCE資源。如果選擇streaming engine mode的話,你所需要分配的資源都可以大大的降低,尤其是硬盤資源。

這波安利聽起來不錯,我便將我的job換成了streaming engine mode。換完了之后,各項指標通通像喝醉了一般,上躥下跳的,極其不穩定。整個今年一季度我都在和Google團隊尋找streaming engine mode性能不穩的原因,至今未果。。。

2.3 Autoscaling

Dataflow有一個根據數據吞吐量,自動彈性伸縮的功能。在Dataflow的世界里,計算資源被單元化成一個個container,也就是傳說中的Kubernetes container。在啟動一個Dataflow job的時候,可以告訴他最少需要幾個container,最多可以有幾個container。就我所做的實驗觀察下來發現有這么2個問題:

  1. Job從啟動成功,到開始做彈性伸縮,調整container數量,這之間平均需要15-30分鍾左右冷啟動的時間。這個時間我覺得有點略久
  2. 我從來沒見過這個伸縮的“彈性”表現在哪里,基本上我所觀察到的,永遠是直接從配置好的最少container數量,直接跳轉到配置好的最多container數量。中間沒有一個平滑上升過程。

自動彈性伸縮這個點我就不深度吐槽了,因為我不是很care,本來就大概清楚一個job需要多少資源和container,也就不存在需要彈性伸縮來幫助我找到最佳的container數量。

2.4 Metrics, Stackdriver

Metrics這個點是個大坑。Beam里有一套定義的還不錯的Metrics API。其中就包含最常見的Counter,Distribution,和Gauge。

  • Counter,顧名思義,是用來計數的一個metric
  • Distribution用來累加和統計一個分布,可以得到類似mean, median, 90/95/99 percentile 這類metrics
  • Gauge也是用來計數的一個metric。

Counter和Gauge的區別在於,counter的metric是一個時間序列,對於每個時間單位(每分鍾,每小時)這個計數分別是多少。Gauge是一個all time aggregate,是一個絕對標量,不存在時間這個維度。

然鵝,Counter和Gauge這兩個metrics到Dataflow里,就變成了一個意思。。。無論你Beam job里用的是哪個Metric,到了Stackdriver(GCP上的Metrics reporter)里,這兩個都會變成Gauge。也就是說,你想計數每小時處理了多少數據,生成一個時間序列圖表啥的,是做不了的。。。這就很不能忍了。Google團隊說這是一個bug,

Stackdriver這個metrics產品,做的也是差強人意。

我們最后只能另辟蹊徑,整合了一部分我司自己的metrics API到Beam里,並且把metrics數據千里迢迢導回到我司自己的metrics reporter里。所以這個Bug有沒有被修復,我也沒再去follow。

2.5 Job Dashboard

Dataflow的Job Dashboard做的乍一看很Google,仔細品就能發現很粗糙,而且充滿歧義。

比如上圖這個dashboard,紅色箭頭所指的那些數字,明顯能看出是某種吞吐量,我第一反應是這個吞吐量代表這個transform step的輸出吞吐量,可經過多次實驗,發現這些數字代表的是當前transform的輸入吞吐量。。。因此中間兩個step的這兩個數字是一毛一樣的。

那么藍色箭頭所指的又是什么鬼呢?從官方文檔上的只言片語描述,這些代表當前transform的lag,lag是怎么計算的呢?這就不知道了。。。還有右邊綠色框框框起來的這兩張表,又是什么鬼呢?也不明所以。更誇張的是,這幾個代表lag的數字和圖表,其實是不准的。我司在Dataflow跌爬滾打多年的前輩和我說這幾個metrics他們從來不看。。。要測lag,我們用自己在業務邏輯里定義好的lag metrics就行了。

好在Google團隊允許我使用他們新一代的Job Dashboard。

 

新的dashboard至少新增了輸出吞吐量,CPU使用率,和錯誤量這幾個還算有用的圖表。比舊版的dashboard稍微有用了那么一丟丟。

2.7 成本

最后來聊聊Dataflow的成本,具體數字和對比不方便公布。有一說一,Dataflow畢竟是雲托管的一項服務,可以幫一個公司節省許多人力和機房上難以計量的金錢成本和時間成本。所以你硬要比較Dataflow的成本,和自己在機房部署並管理一個Storm或者Heron集群的成本,很難做到很詳細,精確,和公平。

Streaming appliance mode的計費方法算是良心的,就如我前文所說,你只需要支付你所分配的GCE資源即可,dataflow這個服務是不收取額外費用的,相當於白給。

Streaming engine mode相比appliance mode可以大幅減少你所花在GCE上的成本,但是相對應的,它會收取一份data shuffling service的費用,這筆費用是按照數據吞吐量來計算的。

3 總的來說

總的來說,Beam是一個挺有意思的project。文檔還算完善。能看得出他想一統數據處理API的野心,也能看出設計者對流數據處理理解的境界,所以才能設計出我贅述不了的Window,trigger,watermark這些概念。同時Beam里又提供了ParDo這種可以用來解決一切疑難雜症的通用模塊。Beam的坑不多,即使有些特性在dataflow里不支持,在flink里不支持,也都不是Beam的鍋。但是有一說一,Beam的學習曲線還是算陡峭的。如果你像我一樣被Storm/Heron一路帶進門的話,不妨花點時間學習一下Beam,從另一個角度理解流數據處理的本質。

Dataflow是一個不斷在聽取用戶反饋,bug report,進行改進的服務。從長遠的角度我挺看好Dataflow的成長,畢竟背后有google這么一個爹的技術支持。在其他幾家大的雲服務商上也很難找到一個像Dataflow這么一個個性鮮明,自成體系的數據處理系統(有你也別告訴我,我用的公司經費只夠用來玩Dataflow,不夠玩其他的。。。)但是就目前的情況來看,Dataflow本身的核心(包括streaming engine mode,對Beam API部分特性的支持),還有Dataflow的周邊服務(包括metrics,dashboard)需要提高和改進的地方還有很多。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM