原文鏈接:https://zhuanlan.zhihu.com/p/106676174
預計閱讀時間10~30分鍾。難度中等
本文基於我過去半年對 Google Dataflow的潛(盲)心(人)學(摸)習(象),和實(瞎)戰(幾)體(把)驗(用)。在 Storm和 Heron的知識體系以及使用習慣下,完全根據個人經驗所寫的感悟。本文沒有故意抬高或者批判任何 project的意圖,只是想單純分享使用經驗+吐幾個槽。如果有理解和使用上的錯誤或誤區,歡迎一起討論
0. 背景
幾年前我司個別領導拍了一下腦袋:Heron在宣傳上沒有花心思做好,導致現在很多人甚至都沒聽說過Heron這玩意兒,在Apache孵化了那么久也還沒畢業,沒有像Flink那樣搞成流處理霸權,當年開發Heron的黃金一代也都走得差不多了,如今我司和GCP又簽了那么大一個合同,不如。。。我們就把流處理的東西都挪到GCP上去,再琢磨琢磨搞一套混合雲解決方案,這impact誰敢說小,這不就好起來了嘛。。。
就這樣,在大約半年前,我開始了對Google Dataflow的潛(盲)心(人)學(摸)習(象),和實(瞎)戰(幾)體(把)驗(用),不計成本地在GCP上做各種實驗。
Dataflow是Google Cloud的一款拳頭產品。Dataflow上提供了對數據批處理 (Batch Processing) 和流處理 (Streaming Processing) 的能力。簡單的來說,可以把Dataflow想象成跑在雲上的一個Hadoop+Storm共同體。
Apache Beam是一套用來編寫數據處理任務的API,Beam旨在提供一種統一的API來編寫批處理和流處理Job,也就是說可以實現同一套代碼,既可以用來做批處理,也可以用來做流處理。需要注意的是,Beam本身並不處理數據,而是用來描述一個數據處理任務應該如何進行的,真正運行處理的是Runner,現在比較常見的,並且支持Beam的Runner有Google Dataflow,Flink,Spark,Heron(手動狗頭)等。
1. Apache Beam
我們先來說說Beam。
Beam的programming model和傳統的Storm/Heron的那套Spout/Bolt API很不一樣。傳統的Storm/Heron里,所有的業務邏輯都抽象成一個Method,比如Spout里,用戶只要最后提供nextTuple這一個method,來包含所有關於從數據源拿數據,反序列化,解析,以及往下游發送的一系列邏輯
// Storm/Heron Spout nextTuple Example public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }
又比如Bolt里,execute這一個method,包含了所有關於如何處理每一條上游來的數據的業務邏輯。
// Storm/Heron Bolt execute Example @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); }
當然,有一定規模的Storm/Heron job里肯定不會把幾百幾千行業務邏輯都寫在一個method里,用戶會自己去模塊化他們的代碼。
與之相比,Beam的API把業務邏輯拆分成了一個個更小的邏輯步驟。抽象的接口也更多。
// Example Beam API pipeline.apply(TextIO.read().from(options.getInputFile())) .apply(FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) .apply(Filter.by((String word) -> !word.isEmpty())) .apply(Count.perElement()) .apply(MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) .apply(TextIO.write().to(options.getOutput()));
Beam API里每一個 ".apply()" 都是在對數據集做一步transform。看着這些FlatMapElements.into, Filter.by, via,作為一個被Storm和Heron那套API領進門的孩子,突然面對那么多抽象我有點無所適從,我在寫一個job的時候到底要用哪些接口呢?
1.1 ParDo & DoFn
於是我非(囫)常(圇)耐(吞)心(棗)的把這些抽象的文檔讀了一遍,發現在這些抽象中隱藏着一個大殺器,那就是ParDo!官方文檔上是這么寫的:
ParDo
is a Beam transform for generic parallel processing.
ParDo可以用來表達幾乎所有上述的接口想要做到的事,仔細一品,這個ParDo是不是就有那么一點像Storm/Heron里的Bolt呢?
ParDo和DoFn是結合在一起用的,具體的處理邏輯寫在DoFn這個interface里,一個簡單的example長這樣:
PCollection<Integer> wordLengths = words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.getElement().length()); } }));
這么一看這個DoFn的processElement就更像Storm/Heron里Bolt的execute了。說實話,自從了解了ParDo之后,我幾乎再也沒用過什么MapElements.into,Filter.by這些花里胡哨的東西,有一種屠龍在手,天下我有的趕腳 (其實如果用那些,代碼可能會更短更好看,所以不要學習我)
我們組有個頭鐵的大兄弟,他是這樣吐槽的:“為什么Beam要用ParDo這么奇葩的名字呢?為什么不直接叫他FlatMap呢?” 我覺得這是個好嘈點。
ParDo=Parallel Do。
關於ParDo還有一個地方可以吐槽一下的是他的branching outputs(對應下圖中藍色框框)。如果你有一個transform,你想讓他的輸出,根據不同的條件對應到下游不同的transform step的話,就需要用到Tag這個東西,給每組output打上一個標簽,這樣再在下游的transform里寫明我要讀的是哪一個標簽的input。
Tag的用法我給你們個例子,你們體會一下:
final TupleTag<String> zhenhuan = new TupleTag<String>(){}; final TupleTag<String> wulanala = new TupleTag<String>(){}; final TupleTag<String> huafei = new TupleTag<String>(){}; PCollectionTuple results = words.apply(ParDo.of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(...); c.output(wulanala, ...); c.output(huafei, ...); } }).withOutputTags(zhenhuan, TupleTagList.of(wulanala).and(huafei)));
在用tag的時候,Beam逼着你從這3個tag里挑選一個當main output tag,在調用c.output時,這個main output tag是不可以explicitly寫明的,但是剩下的tag都需要明確標明,后面的withOutputTags也一樣。
不知道這樣的設計有什么深遠的意義,我只知道我在寫這些tag的時候,腦補了一出宮斗戲。。。我本想雨露均沾的給這3個tag同樣的恩寵。。。可惜Beam不允許
1.2 Coder
我剛開始寫Beam code的時候,在Coder這里跌了很多跟頭。。。Coder是Beam里對不同數據類型進行編碼的模塊。對於每一個apply step(也就是每一步數據transform),Beam都會對這一步transform輸出的data進行編碼(encoding),這里就需要用到Coder。如果輸出的type是primitive,或者是Beam原生的data type,那系統會自己找到對應的coder,不需要額外在code里寫明。如果輸出的type是自定義的,比如輸出一個thrift object,那系統就會默認用一個general的SerializableCoder。這個coder和thrift,avro這類protocol object很不搭,不僅性能不好,而且還會間歇性地報serialization異常。這種情況下,你就需要自己寫一個專門的coder,並且在job code里explicitly寫明,這一步transform的輸出請用這個coder來編碼。
Coder在這里的意義在於,一個Beam job的不同step,可能會跑在不同的worker machine上,數據從一個worker跳到另一個worker的過程中,就需要進行encode/decode。
這里我想強調一個特殊的type,那就是Beam的KV。KV顧名思義就是Key-Value Pair,用來表示一對數據。如果一個transform的輸出是一個KV,並且Key或者Value里有自定義的type, 比如KV<String, MyThriftObject>,那這個時候也需要explicitly寫明coder
PCollection<KV<String, MyThriftObject>> kvs = pipeline.apply( ParDo.of(new DoFn<MyThriftObject, KV<String, MyThriftObject>>() { ... })).setCoder(KVCoder.of(StringUtf8Coder.of(), MyThriftObjectCoder.of()));
1.3 Windowing, Watermark, Trigger...
終於要說到Beam最閃耀的燈球了。Window,watermark,trigger,這三個概念是Beam最引以為豪的概念,也是這本書里花費幾乎全部篇章去講的幾個概念。說實話,我剛接觸到這三個概念的時候心里是抵觸的,因為Heron里沒有這樣的東西,我踏入了知識盲區。
於是我非(囫)常(圇)耐(吞)心(棗)的把這本書里關於這三個概念的章節讀了一遍,又讀了官方文檔上Mobile Gaming這個相對復雜的tutorial(這個tutorial很值得一讀),對這三個概念大致有了了解。我這里就不贅述了,因為贅述不好,所以我還是舉一個例子好了。
1.3.1 看個栗子
實踐中,我找到了一個現有的use case可以用到window API。這個use case簡單來說是醬嬸的:
- 我有一個job,這個job不停地從Message Queue里讀數據,這些數據的key是一個個string
- 對每一個key,如果是2小時內第一次出現,那就給這個key創建一個2小時的window
- Count這個key在這2小時里出現的次數,一旦次數超過了一個閾值,就輸出一個(key,true)的tuple,如果還沒超過閾值,則輸出(key,false)tuple
- 2小時后,這個key的window自動關閉
舉個例子來說,如果這個閾值是2,那對應的input和output是這樣的
這怎么看都是一個標准的Session Windowing use case嘛!我翻開書,開始抄代碼,但是抄着抄着,笑容逐漸消失。
首先Beam的window一般是給你用來做aggregation的,一般來說,1個window結束,輸出1條結果。上述的這個use case里呢,window即用來做aggregation(count key出現過的次數),也需要不斷對每個key到底是true還是false做判斷,並且輸出。有幾條input,就得有幾條output,而且從收到一條input到輸出一條output之間要保證實時性,不能等2小時window結束的時候,shua的一下把所有output一下子都倒出來。
要解決這個問題就要用到Trigger,trigger是用來決定什么時候一個window要具現化(materialize)一個output,用人話說,trigger就是用來控制window啥時候可以output。Beam提供了很多Trigger選擇,這里我就用了withEarlyFirings(AfterPane.elementCountAtLeast(1)))。這個trigger的意思是說,這個window每遇到一個新的element,就output一次,這樣這個window在沒關閉前就可以不斷輸出,同時還能繼續做aggregation。
另一個問題是,Beam自帶的SessionWindow時間長度是不固定的,而我需要的是一個定長的session window。要解決這個問題,就只能自己重寫一個window了。。。這是我寫的粗糙的版本,為什么用scala呢?這個不要問我,我也不喜歡scala,用scala去搞Beam這種Java的library聽起來簡單,其實很造作。。。
import java.util import org.apache.beam.sdk.coders.Coder import org.apache.beam.sdk.transforms.windowing.{IntervalWindow, WindowFn, WindowMappingFn} import org.joda.time.Duration import scala.collection.JavaConverters._ class MergeCandidate[T](var union: IntervalWindow = null) { var parts: Seq[IntervalWindow] = if (union == null) { Seq.empty } else { Seq(union) } def intersects(window: IntervalWindow): Boolean = union == null || union.intersects(window) def add(window: IntervalWindow, maxSize: Duration): Unit = { union = if (union == null) { window } else { // Make sure the union window is always // the earliest start time of all its parts + the max duration val earlierStart = if (union.start().isBefore(window.start())) { union.start() } else { window.start() } new IntervalWindow(earlierStart, maxSize) } parts = parts :+ window } def apply(c: WindowFn[T, IntervalWindow]#MergeContext): Unit = { if (parts.size > 1) { c.merge(parts.asJava, union) } } } /** * A Fixed Size session in terms of duration * @param maxSize the max duration a session can last * @tparam T */ class FixedSizeSession[T](maxSize: Duration) extends WindowFn[T, IntervalWindow] { def windowSize(window: IntervalWindow): Duration = if (window == null) { new Duration(0) } else { new Duration(window.start(), window.end