MillWheel学习及持久化流式计算总结

MillWheel 是Google 2013年公开的其内部主要的流式计算平台,其与自己所在公司研发和使用的流式计算平台(TM)在设计本质上有相似之处,这里就借助对论文的解析,边阐述MillWheel的设计原理和亮点,边总结一下在设计上取“持久化”模式流式计算的技术特点。

就自己对流式计算的学习和总结,流式计算从设计本质上大致分为两类:从批处理演化而来的渐进式流式计算及另起炉灶的纯流式计算,前者多通过持久化完成容错,后者多通过事务策略完成容错。而本文主要针对前者展开,后者后续以Twitter的Heron平台为对象另写文章。

简介开篇提出流式计算在Google业务场景中的需求:容错、持久化及扩展性。容错需求是因为分布式系统存在着天然的不稳定性因素(机器节点宕机、网络故障等);持久化需求是因为在Google业务场景中,需要使用历史数据或历史数据模型;扩展性则因为业务或流量变化是可预期的,因此计算系统需要原生地支持扩展。另外,还有计算模型高度抽象需求,以减轻业务逻辑开发人员对流式计算模型的学习成本(如MapReduce模型达成的推广效果);计算系统对多语言的支持力度,以降低开发人员的使用成本。

在开篇论文便一直在强调一个概念,幂等性(idempotent),即MillWheel计算框架可以向业务逻辑开发人员提供其对流式计算中数据操作的幂等性接口。具体为,业务用户在进行计算编码时,无需关心这份数据是否会重复计算、这个数据发送出去是否会送不到目的地之类的可靠性问题。这些问题,会由MillWheel框架统一接管并处理。这里插入一个网上对幂等性的一个定义:

幂等性是系统接口对外的一种承诺(而不是具体实现),承诺只要调用接口成功,那多次该调用对系统的影响是一致的。声明为幂等的接口会认为外部调用失败是常态,并且失败后必然会有尝试。

从论文的系统整体概述一节中,提出MillWheel将流式中的数据抽象为一个个的三元组(key,timestamp,value)。由于分布式流式计算首先要解决的就是分布式计算,即完成数据的分割与并行计算,而key就是完成分割的依据;流式计算不像MR模型一样可以对待处理数据有全局理解,其只能在接触到某个动态的时间段内的数据,因此必然需要理解数据的时间相关属性,即timestamp字段的意义所在(后面的Low watermark设计就可以证明这点);而value就是数据域了,采用了如同BigTable中数据域不作任何解释一样的设计。

三元组组成的数据流在用户定义好的DAG中流动,其中DAG可进行在线编辑,即动态地调整计算拓扑,同时可自主进行计算组件负载均衡的调节(计算节点的Split与Merge)。个人认为:这两点是MillWhell的绝活之一,可惜论文没有细说其实现原理。后面会通过个人对流式的理解尝试还原一下其实现原理。

DAG的计算节点由框架部分与业务用户两部分组成,其中框架部分提供对数据处理的幂等性,用户部分只需要完成其关心的业务逻辑(像容错、数据传递保证、计算状态一致性都由框架部分完成且对用户部分透明)。在这里论文提前交待了幂等性的实现关键所在:Checkpoint机制与Exactly Once语义保证,即每个处理的三元组都可由框架自动完成Checkpoint(不丢)且保证一次传递(不重)。

在论文的关键设计概念一节中,选取了计算节点、Keys、流、持久化状态、低水位(low watermark)、定时器、API七个概念,其中重点介绍了低水位设计与API设计。

计算节点:前面说了计算,即业务用户的业务逻辑只需关心其用户逻辑,MillWheel框架会在数据来到或某类定时器(下面会提到)触发时调用用户的处理逻辑。这里论文着重强调了一下幂等性的范围,即框架只负责完成框架提供API的幂等性,至于用户代码再去与外部系统(Rdis、Mysql等带状态系统)进行交互,那其幂等性需要由用户来保证。

Keys:作为分割数据流的依据,由用户提供从数据中提取Key的计算逻辑。个人猜测,这段逻辑MillWheel会将其安装在流的发布端,而非流的订阅端。即在A若有多个订阅者,那么这些订阅者需要提供自己的Key提取方法,然后A在向订阅者们发布数据时调用对应的Key提取策略,从而完成相应的数据分割。这样做,而不是在流订阅端进行一次Key提取,然后再进行一次发布,显然节约N次(订阅者数量)数据传递消耗。代价就是发布端的处理负载与订阅者数据呈线性相关。

:在MillWheel中,流是可以随用户的拓扑配置随意流动,当然符合业务需求是前提。一句话, 计算模型灵活,对业务支撑面广。

持久化状态:论文中是状态域与每个三元组对应,存储在一行中(BigTable,Spanner中的行概念),也是不作具体解释的字符串序列,由框架(定时器等框架状态)或业务方(聚合数等业务方状态)自行解释。因为BigTable与Spanner支持行级事务,所以这行做能保证三元组与状态之间的一致性。多说一句:MillWheel计算模型之所以如此简洁和强大,低层强大的分布式表格文件系统至关重要。

低水位(low watermark):由于流式计算中数据流动的固有属性,就永远有个潜在的问题:截止到某个时间点前的数据已经全部到位还是有部分滞留在上游。MillWheel通过框架级设计优雅地解决这两个问题界定,即低水位设计,每个计算组件有其自己的低水位值,论文中定义为:

min (oldest work of A , low watermark of C : C outputs to A )

可以看出,节点的低水位值不仅与自己知道的最旧数据有关,而且与其所有上游的低水位有关,而且取其最小值(最旧)。直观上解释,即只要上游还有更旧的数据暂留,就会通过低水位机制告诉下游,以让其等待旧数据到来再移动低水位。通过递归定义,整个拓扑的源头(Injector)决定了整个计算流的低水位最低处,即由其来对整个数据流兜底。确保当外部数据都已经灌入系统(Injector得到下游的Ack),再向前推其低水位值,以将“截止到此时间点前的数据都已经发送到系统中”这一消息“传播”出去。后面会阐述MillWheel如何“传播”这一消息。

定时器:MillWheel提供了两种框架原生定时器,一个是以low watermark作为触发源,一种是以正常时钟作为触发源。且定时器都是三元组粒度的,即每个三元组都可设置自己的定时器。业务用户可根据自己的业务需求二者选一或不选。其中若将low watermark作为触发源,一旦某些三元组的timestamp与low watermark相一致时,即调用用户对应的处理逻辑计算这些三元组。比如聚合操作,可认为这一时刻的数据都已到位,可以进行聚合并产生新的三元组然后下发。而以正常时钟作为触发源,那就不关心数据流是否达到,一旦定时器时间match了当前时间,即触发相关用户逻辑处理数据。

API:MillWheel的API非常之简洁,总共有5个,分别是

  1. void ProcessRecord(Record data)  即数据从上游达到后触发
  2. void ProcessTimer(Timer timer)  即定时器触发
  3. void SetTimer(string tag, int time) 设置定时器
  4. void ProduceRecord(Record data, string stream)  发送数据到下游
  5. StateType MutablePrsistentState() 提取持久化数据

即如同TM一样,用户逻辑被封装在框架之下,由框架完成调用。用户无需要关心异常处理、持久化操作以及上面讲到的low watermark的计算也是由框架完成,用户代码只需要完成三元组中的timestamp域的填充即可。后面会通过论文中的Zeigeist来阐明MillWheel中用户如何通过上述API实现自己业务逻辑。

下面开始流式计算系统中最关键的设计-容错设计虽然在用户代码里没有对系统的幂等性要求,那是因为上述API完成了对MillWheel系统的幂等性要求。首先,我们来看上述API如何完成幂等性的第一个组成部分:不丢不重,即对任何一个三元组,框架可以保证有且仅处理一次。

MillWheel的处理流程为:

  1. 检查到来的每一个三元组,以确认其是否已处理过(通过Bloom Filter完成)。若已处理,丢弃;否则,转2 - 不重处理
  2. 运行用户代码,完成用户的各种可能API调用:设置定时器、状态修改及数据产出
  3. 其中,上面的定时器设置、状态修改及数据产出 写后端存储系统
  4. 在完成后端存储系统写入后,向对应上游发送ACK - 不丢处理
  5. 将计算处理完的数据发送给下游(可能立即发送,可能等待定时器触发后再计算发送)

论文提到,为了提高处理性能,上述操作会以多个三元组为一个批次进行checkpoint。(因为存储系统交互开销是巨大的)。由上可以看到,MillWheel结合了ACK与持久化两种机制。

由于分布式系统的不稳定性,上述操作的任何一步都有可能因各种问题而终止。我们看MillWheel具体是如何完成容错-不丢不重。

  • 第1步出问题,因为下游并未产生真正的计算及任何状态修改,因此不影响Exactly Once语义;
  • 第2步出问题时,由于还未将三元组持久化到存储中,因此当新的Worker启动后,会再次接收到上游发送来对应数据,且通过第1步的重复检查;(只是存在了一部分重复计算,消耗了计算资源)
  • 第3步出问题时,由于MillWheel的写操作的原子性,若写没成功,则退化为第2步出问题;若写成功后出问题,那么当启动新的Worker启动后,其恢复通过持久化的状态,恢复状态,由于此时该三元组已经写入存储系统,因此当上游再次发送数据来时,在第一步检查失败且同时完成第4步的ACK以防止其反复重试
  • 同样,后面的第4、5步出问题都会由于恢复持久化的状态,以完成后续未完成的步骤。

下面来说一下,MillWheel如何进行全系统的重复三元组检查。首先在产生一个三元组时,系统会为其生成一个全局唯一的ID,其后续就携带此ID在各个Worker之间游走直至其被完全Drop掉(由此可以看到三元组在系统内部还被封装了一层)。因此,谁接收到它后在进行持久化时都会将该ID持久化到存储中。与此同时,每个计算Worker都有一个Bloom Filter(本地内存中)来核实一个三元组是否已经被接收、计算且完成持久化了。(当然Bloom Filter只能验证该三元组绝对没有被接收过,若验证失败,只能通过扫描存储完成最后验证)。对于某个Worker,一旦其下游都完成对某个三元组的Ack,那么其就会将其在Bloom Filter中的位清除,以确保Filter的可持续使用。

向下游发送产出数据前,先将产出数据进行Checkpoint,MillWheel称之为Strong prodution。其中一旦产出数据发送成功后(收到下游发送来的Ack),Checkpoint即可删除,这其中论文提到使用了BigTable的blind write技术以提高Checkpoint性能。这个技术与LSM-Tree有关,可快速实现随机写(B+树)与顺序读。而产出数据由于其无序性,采用这种技术可以明显加快Chekcpoint的速度。至于具体如何Checkpoint,如何replay论文并没有给出详细解释。我想应该是按照key与timestam写入BigTable,然后数据索引信息再持久化到Worker对应的状态表中,而且这几个操作还需要是原子的。

但对于有些业务场景,并不需要不重或不丢的要求。因此上述实现带来的系统开销也就没有必要。MillWheel对Exactly Once与Strong prodution提供了选择开关。通过关闭第1步的重复性检查即可关闭Exactly Once语义;而无需将产出数据进行Checkpoint就乐观地发送给下游,即关闭Strong prodution语义。但论文中提到,一旦关闭Strong prodution后,会出现由于一旦中间一级在未收到下游的Ack而没有向上游Ack时,此就在此时crash后重启,数据会重新走一遍。这样对系统开销太大,因此MillWheel想出一个折衷的方法,即在中间Worker发送出数据后超过一定时间没有收到下游Ack,这时将产出数据进行Checkpoint并立即向上游发送Ack。以此来减少资源消耗,同时还降低的数据处理的延迟。

状态管理,即状态一致化控制,前面已经提到通过BigTable的行级事务完成了三元组相关状态的持久化。但对于在正常与异常迁移过程中出现的野worker及残留在网络中的非法请求,则通过sequencer token完成状态一致性控制。即通过授予合法worker一个token,在每次写状态前都检索该token是否合法,那些野worker或残留请求由于持有旧的token,其写请求会被拒绝。

下面论文就MillWheel的具体实现说了一些大概的情况,遗憾的是没有具体细节,可以说这个论文只介绍了一种计算模型,具体到框架实现并没有深入展现。

MillWheel的数据流传递是通过RPC完成(可见其已经完全离开了类似TM的初始设计);由replicated master完成计算节点的负载均衡,其搜集来自独立的监控计算节点的负载信息,对计算节点进行Split与Merge操作,将三元组中key域范围进行划分,分配给不同的计算节点进行计算。并再次提到持久化的状态通过BigTable完成存储。

低水位的实现采用了中心化设计,即由一个控制节点完成低水位值的收集和分发。但具体的低水值的计算是由各个计算worker计算完成,然后上报给中心控制节点。同样,低水位值作为一种状态也会持久化到存储系统中。这个与TM Server的设计,有几份相似之处,即所有的计算节点通过一个中心控制单元完成控制数据的交换。同样,由于低水位的更新是计算节点的粒度,因此考虑到僵尸节点或网络中的残余请求,低水位的更新也采取了sequncer机制来确保请求更新的为正常计算节点。

正常流程:计算节点从Central authorty(论文定义,即低水位中心分发中心)获取到上游的低水位值,然后根据自已记录的当前最“oldes"三元组的timestamp,计算出自己的低水位,然后将该值提交到Central authorty。其收到更新后,会将该最新低水位值向订阅该计算节点的下游节点分发,以驱动其完成自身低水位值的更新。

性能分析

论文中提到了Worker级别的缓存系统,对于提高CPU利用率效果明显。由于其通过动态负载均衡实现稳定性,MillWheel对于有数据倾斜的数据源,无法招架,会带来热点数据压跨某个计算节点。

TODO:

设计分析

MillWheel的 ARK + 持久化 设计与TM的 Update + 持久化 可谓异曲同工,TM通过将所有Worker之间数据传递的确认(ARK)转化为都同一个中心控制单元完成(当然其需要确保这些确认状态不丢-具体为流水+Checkpoint保证)。前者将确认分散到各个计算节点完成,后者采取集中控制完成。因此前者的在线DAG编辑与自主拆分、合并计算单元设计就比较好实现;后者要实现相应设计就很难,或者说中心控制单元设计复杂度与运行负载会加大,后者设计将状态集中在一处,即便有流水+Checkpoint机制保证,但始终摆脱不了单点所带来的各种负面影响(需热备、计算瓶颈)。

MillWheel的三元组中的Key域,不仅完成了业务上的分类与聚合处理需求,同时由于所有使用MillWheel框架的业务用户,都需要提供其数据的key域,同时为框架对各个计算根据其计算负载进行动态自主地Split与Merge提供了最基础的数据支撑。

本论文中Strong prodution、低水位的中心化、及后面提到的其checkpoint与简单的backup不同这三个问题还有些模糊。需要后续再仔细阅读相关论文完成。

还需要完成MillWheel的拓展性或者说负载均衡机制的尝试性还原工作。

MillWheel的自动负载均衡处理与KEY域有密切的关系。前文提到,理论上,KEY的提取不应该放在订阅方处,至少应该放在发布方。假设订阅方与发布中间无任何组件,直接进行RPC交互,因此,replicated master在搜集完成负载信息并决定进行负载均衡处理时,至少需要做两件事:调整发布方;调整订阅方-计算节点的数量。这里就有一个严重同步问题,合理的方式是两个调整应该同步完成才实现数据流的正常流动。但实际上这在单机中都需要通过修改+提交来完成,但在分布式系统中这种复杂逻辑更难实现。因此,猜想MillWheel在发布方与订阅方之间有一个缓存计算组件(KEY的计算),发布方将数据发送到该组件,该组件根据拓扑进行分发(根据Key)到下游的订阅方。如此拓扑信息集中到一处,这样replicated master在进行负载均衡时,只需要调整这个中间组件即可完成流的切割与组合。由于上下游的Worker已经完成持久化,因此该组件无需再做任何持久化与异常处理。上面描述的消息队列设计是一种,但添加组件后给系统的稳定性带来影响。也可以在发布端完成对下游流的分隔,Replicated master在Merge与Split不同,其操作顺序不同来完成同步。如当Merge下游Worker时,可先通知上游进行Key的Merge,然后再关闭下游无Key分配的Worker;相反,当Split时,需要先通知下游完成新Worker的启动,再通知上游Worker完成Key的分配。

要结合Zeigeist,来阐述一下MillWheel的业务用户的API使用。

 

 

 

发表评论