原文链接:https://zhuanlan.zhihu.com/p/106676174
预计阅读时间10~30分钟。难度中等
本文基于我过去半年对 Google Dataflow的潜(盲)心(人)学(摸)习(象),和实(瞎)战(几)体(把)验(用)。在 Storm和 Heron的知识体系以及使用习惯下,完全根据个人经验所写的感悟。本文没有故意抬高或者批判任何 project的意图,只是想单纯分享使用经验+吐几个槽。如果有理解和使用上的错误或误区,欢迎一起讨论
0. 背景
几年前我司个别领导拍了一下脑袋:Heron在宣传上没有花心思做好,导致现在很多人甚至都没听说过Heron这玩意儿,在Apache孵化了那么久也还没毕业,没有像Flink那样搞成流处理霸权,当年开发Heron的黄金一代也都走得差不多了,如今我司和GCP又签了那么大一个合同,不如。。。我们就把流处理的东西都挪到GCP上去,再琢磨琢磨搞一套混合云解决方案,这impact谁敢说小,这不就好起来了嘛。。。
就这样,在大约半年前,我开始了对Google Dataflow的潜(盲)心(人)学(摸)习(象),和实(瞎)战(几)体(把)验(用),不计成本地在GCP上做各种实验。
Dataflow是Google Cloud的一款拳头产品。Dataflow上提供了对数据批处理 (Batch Processing) 和流处理 (Streaming Processing) 的能力。简单的来说,可以把Dataflow想象成跑在云上的一个Hadoop+Storm共同体。
Apache Beam是一套用来编写数据处理任务的API,Beam旨在提供一种统一的API来编写批处理和流处理Job,也就是说可以实现同一套代码,既可以用来做批处理,也可以用来做流处理。需要注意的是,Beam本身并不处理数据,而是用来描述一个数据处理任务应该如何进行的,真正运行处理的是Runner,现在比较常见的,并且支持Beam的Runner有Google Dataflow,Flink,Spark,Heron(手动狗头)等。
1. Apache Beam
我们先来说说Beam。
Beam的programming model和传统的Storm/Heron的那套Spout/Bolt API很不一样。传统的Storm/Heron里,所有的业务逻辑都抽象成一个Method,比如Spout里,用户只要最后提供nextTuple这一个method,来包含所有关于从数据源拿数据,反序列化,解析,以及往下游发送的一系列逻辑
// Storm/Heron Spout nextTuple Example public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }
又比如Bolt里,execute这一个method,包含了所有关于如何处理每一条上游来的数据的业务逻辑。
// Storm/Heron Bolt execute Example @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); }
当然,有一定规模的Storm/Heron job里肯定不会把几百几千行业务逻辑都写在一个method里,用户会自己去模块化他们的代码。
与之相比,Beam的API把业务逻辑拆分成了一个个更小的逻辑步骤。抽象的接口也更多。
// Example Beam API pipeline.apply(TextIO.read().from(options.getInputFile())) .apply(FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) .apply(Filter.by((String word) -> !word.isEmpty())) .apply(Count.perElement()) .apply(MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) .apply(TextIO.write().to(options.getOutput()));
Beam API里每一个 ".apply()" 都是在对数据集做一步transform。看着这些FlatMapElements.into, Filter.by, via,作为一个被Storm和Heron那套API领进门的孩子,突然面对那么多抽象我有点无所适从,我在写一个job的时候到底要用哪些接口呢?
1.1 ParDo & DoFn
于是我非(囫)常(囵)耐(吞)心(枣)的把这些抽象的文档读了一遍,发现在这些抽象中隐藏着一个大杀器,那就是ParDo!官方文档上是这么写的:
ParDo
is a Beam transform for generic parallel processing.
ParDo可以用来表达几乎所有上述的接口想要做到的事,仔细一品,这个ParDo是不是就有那么一点像Storm/Heron里的Bolt呢?
ParDo和DoFn是结合在一起用的,具体的处理逻辑写在DoFn这个interface里,一个简单的example长这样:
PCollection<Integer> wordLengths = words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.getElement().length()); } }));
这么一看这个DoFn的processElement就更像Storm/Heron里Bolt的execute了。说实话,自从了解了ParDo之后,我几乎再也没用过什么MapElements.into,Filter.by这些花里胡哨的东西,有一种屠龙在手,天下我有的赶脚 (其实如果用那些,代码可能会更短更好看,所以不要学习我)
我们组有个头铁的大兄弟,他是这样吐槽的:“为什么Beam要用ParDo这么奇葩的名字呢?为什么不直接叫他FlatMap呢?” 我觉得这是个好嘈点。
ParDo=Parallel Do。
关于ParDo还有一个地方可以吐槽一下的是他的branching outputs(对应下图中蓝色框框)。如果你有一个transform,你想让他的输出,根据不同的条件对应到下游不同的transform step的话,就需要用到Tag这个东西,给每组output打上一个标签,这样再在下游的transform里写明我要读的是哪一个标签的input。
Tag的用法我给你们个例子,你们体会一下:
final TupleTag<String> zhenhuan = new TupleTag<String>(){}; final TupleTag<String> wulanala = new TupleTag<String>(){}; final TupleTag<String> huafei = new TupleTag<String>(){}; PCollectionTuple results = words.apply(ParDo.of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(...); c.output(wulanala, ...); c.output(huafei, ...); } }).withOutputTags(zhenhuan, TupleTagList.of(wulanala).and(huafei)));
在用tag的时候,Beam逼着你从这3个tag里挑选一个当main output tag,在调用c.output时,这个main output tag是不可以explicitly写明的,但是剩下的tag都需要明确标明,后面的withOutputTags也一样。
不知道这样的设计有什么深远的意义,我只知道我在写这些tag的时候,脑补了一出宫斗戏。。。我本想雨露均沾的给这3个tag同样的恩宠。。。可惜Beam不允许
1.2 Coder
我刚开始写Beam code的时候,在Coder这里跌了很多跟头。。。Coder是Beam里对不同数据类型进行编码的模块。对于每一个apply step(也就是每一步数据transform),Beam都会对这一步transform输出的data进行编码(encoding),这里就需要用到Coder。如果输出的type是primitive,或者是Beam原生的data type,那系统会自己找到对应的coder,不需要额外在code里写明。如果输出的type是自定义的,比如输出一个thrift object,那系统就会默认用一个general的SerializableCoder。这个coder和thrift,avro这类protocol object很不搭,不仅性能不好,而且还会间歇性地报serialization异常。这种情况下,你就需要自己写一个专门的coder,并且在job code里explicitly写明,这一步transform的输出请用这个coder来编码。
Coder在这里的意义在于,一个Beam job的不同step,可能会跑在不同的worker machine上,数据从一个worker跳到另一个worker的过程中,就需要进行encode/decode。
这里我想强调一个特殊的type,那就是Beam的KV。KV顾名思义就是Key-Value Pair,用来表示一对数据。如果一个transform的输出是一个KV,并且Key或者Value里有自定义的type, 比如KV<String, MyThriftObject>,那这个时候也需要explicitly写明coder
PCollection<KV<String, MyThriftObject>> kvs = pipeline.apply( ParDo.of(new DoFn<MyThriftObject, KV<String, MyThriftObject>>() { ... })).setCoder(KVCoder.of(StringUtf8Coder.of(), MyThriftObjectCoder.of()));
1.3 Windowing, Watermark, Trigger...
终于要说到Beam最闪耀的灯球了。Window,watermark,trigger,这三个概念是Beam最引以为豪的概念,也是这本书里花费几乎全部篇章去讲的几个概念。说实话,我刚接触到这三个概念的时候心里是抵触的,因为Heron里没有这样的东西,我踏入了知识盲区。
于是我非(囫)常(囵)耐(吞)心(枣)的把这本书里关于这三个概念的章节读了一遍,又读了官方文档上Mobile Gaming这个相对复杂的tutorial(这个tutorial很值得一读),对这三个概念大致有了了解。我这里就不赘述了,因为赘述不好,所以我还是举一个例子好了。
1.3.1 看个栗子
实践中,我找到了一个现有的use case可以用到window API。这个use case简单来说是酱婶的:
- 我有一个job,这个job不停地从Message Queue里读数据,这些数据的key是一个个string
- 对每一个key,如果是2小时内第一次出现,那就给这个key创建一个2小时的window
- Count这个key在这2小时里出现的次数,一旦次数超过了一个阈值,就输出一个(key,true)的tuple,如果还没超过阈值,则输出(key,false)tuple
- 2小时后,这个key的window自动关闭
举个例子来说,如果这个阈值是2,那对应的input和output是这样的
这怎么看都是一个标准的Session Windowing use case嘛!我翻开书,开始抄代码,但是抄着抄着,笑容逐渐消失。
首先Beam的window一般是给你用来做aggregation的,一般来说,1个window结束,输出1条结果。上述的这个use case里呢,window即用来做aggregation(count key出现过的次数),也需要不断对每个key到底是true还是false做判断,并且输出。有几条input,就得有几条output,而且从收到一条input到输出一条output之间要保证实时性,不能等2小时window结束的时候,shua的一下把所有output一下子都倒出来。
要解决这个问题就要用到Trigger,trigger是用来决定什么时候一个window要具现化(materialize)一个output,用人话说,trigger就是用来控制window啥时候可以output。Beam提供了很多Trigger选择,这里我就用了withEarlyFirings(AfterPane.elementCountAtLeast(1)))。这个trigger的意思是说,这个window每遇到一个新的element,就output一次,这样这个window在没关闭前就可以不断输出,同时还能继续做aggregation。
另一个问题是,Beam自带的SessionWindow时间长度是不固定的,而我需要的是一个定长的session window。要解决这个问题,就只能自己重写一个window了。。。这是我写的粗糙的版本,为什么用scala呢?这个不要问我,我也不喜欢scala,用scala去搞Beam这种Java的library听起来简单,其实很造作。。。
import java.util import org.apache.beam.sdk.coders.Coder import org.apache.beam.sdk.transforms.windowing.{IntervalWindow, WindowFn, WindowMappingFn} import org.joda.time.Duration import scala.collection.JavaConverters._ class MergeCandidate[T](var union: IntervalWindow = null) { var parts: Seq[IntervalWindow] = if (union == null) { Seq.empty } else { Seq(union) } def intersects(window: IntervalWindow): Boolean = union == null || union.intersects(window) def add(window: IntervalWindow, maxSize: Duration): Unit = { union = if (union == null) { window } else { // Make sure the union window is always // the earliest start time of all its parts + the max duration val earlierStart = if (union.start().isBefore(window.start())) { union.start() } else { window.start() } new IntervalWindow(earlierStart, maxSize) } parts = parts :+ window } def apply(c: WindowFn[T, IntervalWindow]#MergeContext): Unit = { if (parts.size > 1) { c.merge(parts.asJava, union) } } } /** * A Fixed Size session in terms of duration * @param maxSize the max duration a session can last * @tparam T */ class FixedSizeSession[T](maxSize: Duration) extends WindowFn[T, IntervalWindow] { def windowSize(window: IntervalWindow): Duration = if (window == null) { new Duration(0) } else { new Duration(window.start(), window.end