Apache Beam和Google Dataflow实用体验和踩过的坑


原文链接:https://zhuanlan.zhihu.com/p/106676174

预计阅读时间10~30分钟。难度中等

本文基于我过去半年对 Google Dataflow的潜(盲)心(人)学(摸)习(象),和实(瞎)战(几)体(把)验(用)。在 Storm和 Heron的知识体系以及使用习惯下,完全根据个人经验所写的感悟。本文没有故意抬高或者批判任何 project的意图,只是想单纯分享使用经验+吐几个槽。如果有理解和使用上的错误或误区,欢迎一起讨论

0. 背景

几年前我司个别领导拍了一下脑袋:Heron在宣传上没有花心思做好,导致现在很多人甚至都没听说过Heron这玩意儿,在Apache孵化了那么久也还没毕业,没有像Flink那样搞成流处理霸权,当年开发Heron的黄金一代也都走得差不多了,如今我司和GCP又签了那么大一个合同,不如。。。我们就把流处理的东西都挪到GCP上去,再琢磨琢磨搞一套混合云解决方案,这impact谁敢说小,这不就好起来了嘛。。。

就这样,在大约半年前,我开始了对Google Dataflow的潜(盲)心(人)学(摸)习(象),和实(瞎)战(几)体(把)验(用),不计成本地在GCP上做各种实验。

Dataflow是Google Cloud的一款拳头产品。Dataflow上提供了对数据批处理 (Batch Processing) 和流处理 (Streaming Processing) 的能力。简单的来说,可以把Dataflow想象成跑在云上的一个Hadoop+Storm共同体。

Apache Beam是一套用来编写数据处理任务的API,Beam旨在提供一种统一的API来编写批处理和流处理Job,也就是说可以实现同一套代码,既可以用来做批处理,也可以用来做流处理。需要注意的是,Beam本身并不处理数据,而是用来描述一个数据处理任务应该如何进行的,真正运行处理的是Runner,现在比较常见的,并且支持Beam的Runner有Google Dataflow,Flink,Spark,Heron(手动狗头)等。

1. Apache Beam

我们先来说说Beam。

Beam的programming model和传统的Storm/Heron的那套Spout/Bolt API很不一样。传统的Storm/Heron里,所有的业务逻辑都抽象成一个Method,比如Spout里,用户只要最后提供nextTuple这一个method,来包含所有关于从数据源拿数据,反序列化,解析,以及往下游发送的一系列逻辑

// Storm/Heron Spout nextTuple Example public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }

又比如Bolt里,execute这一个method,包含了所有关于如何处理每一条上游来的数据的业务逻辑。

// Storm/Heron Bolt execute Example @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); }

当然,有一定规模的Storm/Heron job里肯定不会把几百几千行业务逻辑都写在一个method里,用户会自己去模块化他们的代码。

与之相比,Beam的API把业务逻辑拆分成了一个个更小的逻辑步骤。抽象的接口也更多。

// Example Beam API pipeline.apply(TextIO.read().from(options.getInputFile())) .apply(FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) .apply(Filter.by((String word) -> !word.isEmpty())) .apply(Count.perElement()) .apply(MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) .apply(TextIO.write().to(options.getOutput()));

Beam API里每一个 ".apply()" 都是在对数据集做一步transform。看着这些FlatMapElements.intoFilter.byvia,作为一个被Storm和Heron那套API领进门的孩子,突然面对那么多抽象我有点无所适从,我在写一个job的时候到底要用哪些接口呢?

1.1 ParDo & DoFn

于是我非(囫)常(囵)耐(吞)心(枣)的把这些抽象的文档读了一遍,发现在这些抽象中隐藏着一个大杀器,那就是ParDo!官方文档上是这么写的:

ParDois a Beam transform for generic parallel processing.

ParDo可以用来表达几乎所有上述的接口想要做到的事,仔细一品,这个ParDo是不是就有那么一点像Storm/Heron里的Bolt呢?

ParDo和DoFn是结合在一起用的,具体的处理逻辑写在DoFn这个interface里,一个简单的example长这样:

PCollection<Integer> wordLengths = words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.getElement().length()); } }));

这么一看这个DoFn的processElement就更像Storm/Heron里Bolt的execute了。说实话,自从了解了ParDo之后,我几乎再也没用过什么MapElements.intoFilter.by这些花里胡哨的东西,有一种屠龙在手,天下我有的赶脚 (其实如果用那些,代码可能会更短更好看,所以不要学习我)

我们组有个头铁的大兄弟,他是这样吐槽的:“为什么Beam要用ParDo这么奇葩的名字呢?为什么不直接叫他FlatMap呢?” 我觉得这是个好嘈点。

ParDo=Parallel Do。

关于ParDo还有一个地方可以吐槽一下的是他的branching outputs(对应下图中蓝色框框)。如果你有一个transform,你想让他的输出,根据不同的条件对应到下游不同的transform step的话,就需要用到Tag这个东西,给每组output打上一个标签,这样再在下游的transform里写明我要读的是哪一个标签的input。

Tag的用法我给你们个例子,你们体会一下:

final TupleTag<String> zhenhuan = new TupleTag<String>(){}; final TupleTag<String> wulanala = new TupleTag<String>(){}; final TupleTag<String> huafei = new TupleTag<String>(){}; PCollectionTuple results = words.apply(ParDo.of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(...); c.output(wulanala, ...); c.output(huafei, ...); } }).withOutputTags(zhenhuan, TupleTagList.of(wulanala).and(huafei)));

在用tag的时候,Beam逼着你从这3个tag里挑选一个当main output tag,在调用c.output时,这个main output tag是不可以explicitly写明的,但是剩下的tag都需要明确标明,后面的withOutputTags也一样。

不知道这样的设计有什么深远的意义,我只知道我在写这些tag的时候,脑补了一出宫斗戏。。。我本想雨露均沾的给这3个tag同样的恩宠。。。可惜Beam不允许

1.2 Coder

我刚开始写Beam code的时候,在Coder这里跌了很多跟头。。。Coder是Beam里对不同数据类型进行编码的模块。对于每一个apply step(也就是每一步数据transform),Beam都会对这一步transform输出的data进行编码(encoding),这里就需要用到Coder。如果输出的type是primitive,或者是Beam原生的data type,那系统会自己找到对应的coder,不需要额外在code里写明。如果输出的type是自定义的,比如输出一个thrift object,那系统就会默认用一个general的SerializableCoder。这个coder和thrift,avro这类protocol object很不搭,不仅性能不好,而且还会间歇性地报serialization异常。这种情况下,你就需要自己写一个专门的coder,并且在job code里explicitly写明,这一步transform的输出请用这个coder来编码。

Coder在这里的意义在于,一个Beam job的不同step,可能会跑在不同的worker machine上,数据从一个worker跳到另一个worker的过程中,就需要进行encode/decode。

这里我想强调一个特殊的type,那就是Beam的KV。KV顾名思义就是Key-Value Pair,用来表示一对数据。如果一个transform的输出是一个KV,并且Key或者Value里有自定义的type, 比如KV<String, MyThriftObject>,那这个时候也需要explicitly写明coder

PCollection<KV<String, MyThriftObject>> kvs = pipeline.apply( ParDo.of(new DoFn<MyThriftObject, KV<String, MyThriftObject>>() { ... })).setCoder(KVCoder.of(StringUtf8Coder.of(), MyThriftObjectCoder.of()));

1.3 Windowing, Watermark, Trigger...

终于要说到Beam最闪耀的灯球了。Window,watermark,trigger,这三个概念是Beam最引以为豪的概念,也是这本书里花费几乎全部篇章去讲的几个概念。说实话,我刚接触到这三个概念的时候心里是抵触的,因为Heron里没有这样的东西,我踏入了知识盲区。

于是我非(囫)常(囵)耐(吞)心(枣)的把这本书里关于这三个概念的章节读了一遍,又读了官方文档上Mobile Gaming这个相对复杂的tutorial(这个tutorial很值得一读),对这三个概念大致有了了解。我这里就不赘述了,因为赘述不好,所以我还是举一个例子好了。

1.3.1 看个栗子

实践中,我找到了一个现有的use case可以用到window API。这个use case简单来说是酱婶的:

  1. 我有一个job,这个job不停地从Message Queue里读数据,这些数据的key是一个个string
  2. 对每一个key,如果是2小时内第一次出现,那就给这个key创建一个2小时的window
  3. Count这个key在这2小时里出现的次数,一旦次数超过了一个阈值,就输出一个(key,true)的tuple,如果还没超过阈值,则输出(key,false)tuple
  4. 2小时后,这个key的window自动关闭

举个例子来说,如果这个阈值是2,那对应的input和output是这样的

这怎么看都是一个标准的Session Windowing use case嘛!我翻开书,开始抄代码,但是抄着抄着,笑容逐渐消失。

首先Beam的window一般是给你用来做aggregation的,一般来说,1个window结束,输出1条结果。上述的这个use case里呢,window即用来做aggregation(count key出现过的次数),也需要不断对每个key到底是true还是false做判断,并且输出。有几条input,就得有几条output,而且从收到一条input到输出一条output之间要保证实时性,不能等2小时window结束的时候,shua的一下把所有output一下子都倒出来。

要解决这个问题就要用到Trigger,trigger是用来决定什么时候一个window要具现化(materialize)一个output,用人话说,trigger就是用来控制window啥时候可以output。Beam提供了很多Trigger选择,这里我就用了withEarlyFirings(AfterPane.elementCountAtLeast(1)))。这个trigger的意思是说,这个window每遇到一个新的element,就output一次,这样这个window在没关闭前就可以不断输出,同时还能继续做aggregation。

另一个问题是,Beam自带的SessionWindow时间长度是不固定的,而我需要的是一个定长的session window。要解决这个问题,就只能自己重写一个window了。。。这是我写的粗糙的版本,为什么用scala呢?这个不要问我,我也不喜欢scala,用scala去搞Beam这种Java的library听起来简单,其实很造作。。。

import java.util import org.apache.beam.sdk.coders.Coder import org.apache.beam.sdk.transforms.windowing.{IntervalWindow, WindowFn, WindowMappingFn} import org.joda.time.Duration import scala.collection.JavaConverters._ class MergeCandidate[T](var union: IntervalWindow = null) { var parts: Seq[IntervalWindow] = if (union == null) { Seq.empty } else { Seq(union) } def intersects(window: IntervalWindow): Boolean = union == null || union.intersects(window) def add(window: IntervalWindow, maxSize: Duration): Unit = { union = if (union == null) { window } else { // Make sure the union window is always  // the earliest start time of all its parts + the max duration  val earlierStart = if (union.start().isBefore(window.start())) { union.start() } else { window.start() } new IntervalWindow(earlierStart, maxSize) } parts = parts :+ window } def apply(c: WindowFn[T, IntervalWindow]#MergeContext): Unit = { if (parts.size > 1) { c.merge(parts.asJava, union) } } } /**  * A Fixed Size session in terms of duration  * @param maxSize the max duration a session can last  * @tparam T  */ class FixedSizeSession[T](maxSize: Duration) extends WindowFn[T, IntervalWindow] { def windowSize(window: IntervalWindow): Duration = if (window == null) { new Duration(0) } else { new Duration(window.start(), window.end()) } override def assignWindows(c: WindowFn[T, IntervalWindow]#AssignContext) : util.Collection[IntervalWindow] = Seq(new IntervalWindow(c.timestamp(), maxSize)).asJava override def mergeWindows(c: WindowFn[T, IntervalWindow]#MergeContext): Unit = { val sortedWindows = c.windows().asScala.toSeq.sorted var merges = Seq.empty[MergeCandidate[T]] var current = new MergeCandidate[T]() sortedWindows.foreach { window => if (current.intersects(window)) { current.add(window, maxSize) } else { merges = merges :+ current current = new MergeCandidate[T](window) } } merges = merges :+ current merges.foreach(_.apply(c)) } override def isCompatible(other: WindowFn[_, _]): Boolean = true override def windowCoder(): Coder[IntervalWindow] = IntervalWindow.getCoder override def getDefaultWindowMappingFn: WindowMappingFn[IntervalWindow] = throw new UnsupportedOperationException("Sessions is not allowed in side inputs") }

写完这个新的window implementation之后,又发现一个问题,那就是我还得写一个自定义的Combiner,用来count一个window里,该key出现的次数,这么简单的一个aggregation function,愣是没找到Beam原生的实现,于是乎,我又撸了一个Combiner

import java.lang import org.apache.beam.sdk.transforms.Combine.CombineFn import scala.collection.JavaConverters._ case class CountAndValue[T](count: Long, value: T) case class CountAndValueAccumulator[T](var count: Long = 0L, var latest: Option[T] = None, var ts: Long = 0L) { def update(input: T): Unit = { ts = System.currentTimeMillis() count += 1 latest = Some(input) } } class CountAndValueCombiner[T] extends CombineFn[T, CountAndValueAccumulator[T], CountAndValue[T]] { override def createAccumulator(): CountAndValueAccumulator[T] = { new CountAndValueAccumulator[T] } override def addInput(mutableAccumulator: CountAndValueAccumulator[T], input: T): CountAndValueAccumulator[T] = { mutableAccumulator.update(input) mutableAccumulator } override def mergeAccumulators(accumulators: lang.Iterable[CountAndValueAccumulator[T]]) : CountAndValueAccumulator[T] = { var totalCount = 0L var latestTs = 0L var latestValue: Option[T] = None accumulators.asScala.foreach { accumulator => totalCount += accumulator.count if (accumulator.ts > latestTs) { latestTs = accumulator.ts latestValue = accumulator.latest } } new CountAndValueAccumulator[T](totalCount, latestValue, latestTs) } override def extractOutput(accumulator: CountAndValueAccumulator[T]): CountAndValue[T] = { CountAndValue(accumulator.count, accumulator.latest.get) } }

到这里,我终于能用上Beam这些高级的概念来写我的job啦!

inputs.apply("sessionWindow", Window.into[KV[String, String]]( new FixedSizeSession[KV[String, String]](Duration.standardHours(2)) ).accumulatingFiredPanes() .triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterPane.elementCountAtLeast(1))) .withAllowedLateness(Duration.ZERO)) .apply(Combine.perKey(new CountAndValueCombiner[String]())) .apply("Output", ParDo.of({...})

然而花了好大力气写的这个实现,到了dataflow上表现并不好,之后的章节会具体细说。

1.4 Stateful API & Timer

上一个实现在dataflow上表现不好,性能并不高。经过一番对Beam官方文档的挖掘,我又找到了一个新的大杀器。。。那就是Stateful ParDo。ParDo是真的牛逼,绝对的C位,所有的大杀器都是围绕着ParDo展开的。

Stateful ParDo就是通过几个annotation和额外的method,把一个普通的ParDo变成一个stateful component,这样一个ParDo处理单元就能拥有可持续化的状态,以上面的例子来说,这个state就可以是每个key对应的出现次数。这些state会储存在对应runner里约定的存储里,比如在dataflow上,state是存在每个worker的硬盘上。

一旦一个ParDo变成了Stateful ParDo,数据从它的上游到这个Stateful ParDo之间也会自动多一步data shuffling的步骤,以确保相同的key总是落到同一个worker上。

Stateful ParDo还经常和Timer一起使用,timer可以用来控制什么时候对这个state做一些lifecycle management,比如清空state。

你细细品一下,有了这个stateful ParDo和Timer,似乎也可以完全不用window那套东西了。。。

ParDo是真的香!

2. Google Dataflow

下面来说说Dataflow,dataflow相对来说坑就多了。。。

2.1 Window Performance

上文说到,我辛苦写了一个定长的Session Windowing的job,拿到dataflow上去跑。不久就发现了一个现象:Job刚开始跑的前2小时,性能指标和业务逻辑指标一切正常。到了最初2小时结束的时间点,各项指标都开始明显变差,随着时间越来越久,各项指标变差的程度也越来越大。各种惨不忍睹。2小时又恰好是一个session window的固定时长。这肯定不是巧合。

Dataflow对用户来说几乎就是一个黑盒,暴露给你的关于出错的信息少的可怜。这时候就只能去求助Google了。好在我司有一支庞大的Google支持团队常年驻扎,在几个Google老司机的协助调查下发现,在最开始的2小时里,每一个unique的key都会对应创建一个新的window,所有的window都处于未关闭的状态(in-flight),这时候一切安好。

到了2小时这个时间点的时候,这些window都需要关闭,同时发生了太多window close的trigger,一下子给worker加了很多压力。这些瞬时的压力造成了巨大的资源占用,导致之后新一轮2小时的window们处理数据的吞吐量也变低,等新一轮的window也准备关闭的时候,又会产生新的瞬时压力。尴尬的是,这个时候,很可能第一轮2小时window关闭所带来的压力还没有被完全消化完,这就形成了一个恶性循环,性能也就呈雪崩式崩塌。

有趣的是,用Stateful ParDo写的实现并没有遇到这样的雪崩问题。你细细品,我实际上是用了Stateful ParDo实现了一个Session Window,然而却比真正用Beam Window API表现得要更好。

Google团队给我的答复是,你这个use case吧,就不适合用window那套骚操作,就stick with stateful ParDo吧。

2.2 Streaming Engine Mode Vs. Streaming Appliance Mode

当然,stateful ParDo的实现也不是完全没有问题的。我一度发现这个job无论怎么加CPU和内存资源,都没有使它的吞吐量达到我期待的水准,然鹅如果增加硬盘资源,反而能显著提高吞吐量。最后在这个文档里找到了解释。Dataflow上硬盘的吞吐量(IOPS)有一个上限,这个上限和你分配的硬盘大小正相关,硬盘分配的越多,容许的吞吐量上限也就越高。而硬盘吞吐量在dataflow里又和data shuffling息息相关。数据源输出数据的吞吐量大,对于这么一个stateful processing job,data shuffling的吞吐量也大,对于硬盘的吞吐量需求就高,对于硬盘大小的要求也就越高。所以即使这个job本身存的state并没有很大,不占用多少硬盘资源,也还是需要一个无比巨大的硬盘池来驱动它的正常运转。

说到data shuffling,就不得不说Google团队一直在鼓吹我用的一个experiment feature,名唤streaming engine。据Google团队的描述,在dataflow上,一个streaming job默认是以一个叫Streaming appliance的模式下运行,这个模式通俗点讲就是你负责花钱买了一大堆Google Compute Engine以及计算资源,Google负责把dataflow部署到你的这个GCE集群上,然后你就自己跑你的job了,你的job里的一切开销都由你分配的资源承担,这里最重要的一笔开销就是data shuffling的开销,就如我上面所说,在streaming appliance mode下,data shuffling消耗的是你自己的硬盘资源。

那么streaming engine mode是怎么一回事呢?就是你把data shuffling这部分任务交给Google管理的一个服务上去完成,不消耗你自己的GCE资源。如果选择streaming engine mode的话,你所需要分配的资源都可以大大的降低,尤其是硬盘资源。

这波安利听起来不错,我便将我的job换成了streaming engine mode。换完了之后,各项指标通通像喝醉了一般,上蹿下跳的,极其不稳定。整个今年一季度我都在和Google团队寻找streaming engine mode性能不稳的原因,至今未果。。。

2.3 Autoscaling

Dataflow有一个根据数据吞吐量,自动弹性伸缩的功能。在Dataflow的世界里,计算资源被单元化成一个个container,也就是传说中的Kubernetes container。在启动一个Dataflow job的时候,可以告诉他最少需要几个container,最多可以有几个container。就我所做的实验观察下来发现有这么2个问题:

  1. Job从启动成功,到开始做弹性伸缩,调整container数量,这之间平均需要15-30分钟左右冷启动的时间。这个时间我觉得有点略久
  2. 我从来没见过这个伸缩的“弹性”表现在哪里,基本上我所观察到的,永远是直接从配置好的最少container数量,直接跳转到配置好的最多container数量。中间没有一个平滑上升过程。

自动弹性伸缩这个点我就不深度吐槽了,因为我不是很care,本来就大概清楚一个job需要多少资源和container,也就不存在需要弹性伸缩来帮助我找到最佳的container数量。

2.4 Metrics, Stackdriver

Metrics这个点是个大坑。Beam里有一套定义的还不错的Metrics API。其中就包含最常见的Counter,Distribution,和Gauge。

  • Counter,顾名思义,是用来计数的一个metric
  • Distribution用来累加和统计一个分布,可以得到类似mean, median, 90/95/99 percentile 这类metrics
  • Gauge也是用来计数的一个metric。

Counter和Gauge的区别在于,counter的metric是一个时间序列,对于每个时间单位(每分钟,每小时)这个计数分别是多少。Gauge是一个all time aggregate,是一个绝对标量,不存在时间这个维度。

然鹅,Counter和Gauge这两个metrics到Dataflow里,就变成了一个意思。。。无论你Beam job里用的是哪个Metric,到了Stackdriver(GCP上的Metrics reporter)里,这两个都会变成Gauge。也就是说,你想计数每小时处理了多少数据,生成一个时间序列图表啥的,是做不了的。。。这就很不能忍了。Google团队说这是一个bug,

Stackdriver这个metrics产品,做的也是差强人意。

我们最后只能另辟蹊径,整合了一部分我司自己的metrics API到Beam里,并且把metrics数据千里迢迢导回到我司自己的metrics reporter里。所以这个Bug有没有被修复,我也没再去follow。

2.5 Job Dashboard

Dataflow的Job Dashboard做的乍一看很Google,仔细品就能发现很粗糙,而且充满歧义。

比如上图这个dashboard,红色箭头所指的那些数字,明显能看出是某种吞吐量,我第一反应是这个吞吐量代表这个transform step的输出吞吐量,可经过多次实验,发现这些数字代表的是当前transform的输入吞吐量。。。因此中间两个step的这两个数字是一毛一样的。

那么蓝色箭头所指的又是什么鬼呢?从官方文档上的只言片语描述,这些代表当前transform的lag,lag是怎么计算的呢?这就不知道了。。。还有右边绿色框框框起来的这两张表,又是什么鬼呢?也不明所以。更夸张的是,这几个代表lag的数字和图表,其实是不准的。我司在Dataflow跌爬滚打多年的前辈和我说这几个metrics他们从来不看。。。要测lag,我们用自己在业务逻辑里定义好的lag metrics就行了。

好在Google团队允许我使用他们新一代的Job Dashboard。

 

新的dashboard至少新增了输出吞吐量,CPU使用率,和错误量这几个还算有用的图表。比旧版的dashboard稍微有用了那么一丢丢。

2.7 成本

最后来聊聊Dataflow的成本,具体数字和对比不方便公布。有一说一,Dataflow毕竟是云托管的一项服务,可以帮一个公司节省许多人力和机房上难以计量的金钱成本和时间成本。所以你硬要比较Dataflow的成本,和自己在机房部署并管理一个Storm或者Heron集群的成本,很难做到很详细,精确,和公平。

Streaming appliance mode的计费方法算是良心的,就如我前文所说,你只需要支付你所分配的GCE资源即可,dataflow这个服务是不收取额外费用的,相当于白给。

Streaming engine mode相比appliance mode可以大幅减少你所花在GCE上的成本,但是相对应的,它会收取一份data shuffling service的费用,这笔费用是按照数据吞吐量来计算的。

3 总的来说

总的来说,Beam是一个挺有意思的project。文档还算完善。能看得出他想一统数据处理API的野心,也能看出设计者对流数据处理理解的境界,所以才能设计出我赘述不了的Window,trigger,watermark这些概念。同时Beam里又提供了ParDo这种可以用来解决一切疑难杂症的通用模块。Beam的坑不多,即使有些特性在dataflow里不支持,在flink里不支持,也都不是Beam的锅。但是有一说一,Beam的学习曲线还是算陡峭的。如果你像我一样被Storm/Heron一路带进门的话,不妨花点时间学习一下Beam,从另一个角度理解流数据处理的本质。

Dataflow是一个不断在听取用户反馈,bug report,进行改进的服务。从长远的角度我挺看好Dataflow的成长,毕竟背后有google这么一个爹的技术支持。在其他几家大的云服务商上也很难找到一个像Dataflow这么一个个性鲜明,自成体系的数据处理系统(有你也别告诉我,我用的公司经费只够用来玩Dataflow,不够玩其他的。。。)但是就目前的情况来看,Dataflow本身的核心(包括streaming engine mode,对Beam API部分特性的支持),还有Dataflow的周边服务(包括metrics,dashboard)需要提高和改进的地方还有很多。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM