更新一段我在linkedin上对这个项目的描述,目前项目已经开发完在使用了。本文并不是最新的设计。

背景
解决HDFS/Hive/RDBMS/FTP/MongoDB等数据源之间的批量数据同步问题


特性
跨机房场景下的链路优化;多路输入和输出的任务模型;数据容错和可持久化;任务失败恢复


任务调度
把任务配置解析为物理执行计划,Master控制任务的调度和失败恢复,基于Mesos完成资源分配和任务调度。Slave分布在各个数据中心,具体传输任务的调起做到链路优化选择。高并发场景下,增加Mesos Slave节点来保证可扩展性(CPU和MEM资源),Master将元数据记录在ZK上,并通过争抢ZK锁实现互备。


数据传输
传输组件分为Input、Cache和Output三种Executor,各自进程内通过双队列优化传输速度。数据以bundle为单位传输,通常上百行为一个bundle且可压缩,Netty作网络通信。Input端异步备份一份数据在BookKeeper,Cache使用Beanstalkd做消息队列,Output端处理Bundle成功或失败,会有守护线程异步删除或更新beanstalkd内的Message(类似Storm Topology里的Ack),Executors会把bundle传输状态更新在ZookKeeper上,某一Executor挂掉都可以在一台slave上重新调起并恢复任务继续进行。Input和Output端的Reader和Writer是插件化的。


====================================  我是更新线 ====================================


之前在最近分布式系统开发小结里,提到了一个在开发中的系统的大致设计,本文是我负责部分的一个详细设计。在阅读本文前可以先浏览下之前那篇文章,对于系统的功能和概况有个基本了解。


1. Slave总体设计


Slave模块主要需要实现不同的Mesos Executors,包括Input, MemoryStorage和Output三种Executor。每个Dpump任务会由Scheduler Manager经过逻辑执行计划和物理执行计划的拆分,从Knowledge Center获取知识,最终将切分后的Task分配给相应的Slaves执行,并通过Mesos Master,分配资源并调起Slave上的各自的Executor。三种Executors的执行逻辑图如下。


数据通过Bundle形式在三种Executor之间的流通,每个Bundle有唯一ID、一个String[]、以及一个Index。Index用于标记每个Bundle最后数据输出的最新成功行,即我们容错粒度控制在行级别。对Input、Cache、Output作一个简单介绍:
  •  Input,也叫Reader。每个Task内只有一个Input Executor,负责从数据源(HDFS、FTP、MySQL、MongoDB等)读出数据,将数据经过切分、处理、压缩后通过Netty流式传输给MemoryStorage。

  • Cache,也叫MemoryStorage。每个Task内只有一个Cache Executor,负责从Input端接收Bundle,将Bundle存取往一个队列内,当有Output连接的时候,将Bundle取出输送给Output

  • Output,也叫Writer。每个Task可能有多个Output Executor,负责将数据最终输出到数据目的源。Output从Cache端得到Bundle的过程也是流式的。

整个Task的流通都是流式的,且Slave之间的网络通信使用的是Netty这个NIO框架,传输过程中还涉及到Bundle高效的正反序列化和压缩、解压缩。最重要的一点是Input、Cache、Output三个部分各自都有容错设计,其中Input和Output通过向Zookeeper记录和获取Bundle状态保证处理Bundle的不重不漏,而Cache通过对队列内消息内容的钝化,保证自身已保存的Bundle不丢失,并能在新的Cache Executor起来后,可以继续为Output提供Bundle输出。


2. Slave 详细设计

下面详细介绍三种Executor的设计,阅读过程中请参考这张Task进程图。



2.1 Input设计


2.1.1 数据流通

每个Input负责一次Job(每个Job对应多个Tasks)内最小粒度的文件块读取,比如可能是一个HDFS Block,一张Hive表的一个分区甚至是一张MySQL表。

Input内还分有Writer、Buffer(双队列)和Reader。Writer是一个单线程,从数据源获取数据并切分好Bundle,每个Bundle有唯一ID和定长的字符串数组,然后将Bundle存入双队列的输入头,在双队列的读出头有若干个Reader线程抢占Bundle,每个Reader获取到Bundle后释放锁并做二次处理、压缩,最终Reader通过Netty Client将Bundle包装成一个传输格式,以二进制流的方式通过Channel流向Cache。


2.1.2 容错

Writer端切分Bundle保证了从同一个数据源的同份文件块读取数据生成Bundle是有序的,每次Netty往Channel里写入一份Bundle的时候,会通过Companion线程异步更新此Task下znode内的Bitmap,该Bitmap标记每个Bundle在Input端是否被传输。每次Input启动的时候,Netty会读取znode上的Bitmap缓存在内存里,发送Bundle前根据id作一次校对。所以当Input挂掉或重启时,可以保证发送给Cache的Bundle不重不漏。

2.2 Cache设计


2.2.1 消息队列

Cache本身是一个Netty Server,接收Input和Output多个Netty Client的连接,并对不同的Channel做不同Event处理。Cache Executor需要一个多状态的消息队列,这里采用的是Beanstalkd队列,下图为该Beanstalkd内消息(job)的状态变化图。



 每次Cache将新的Bundle put进Beanstalkd的时候需要选择一个tube(管道),Beanstalkd可以开启多个独立的tube,tube内存放jobs,每个job有自己唯一的job id,而job消息体就是我们的bundleBytes(Bundle存入Cache直接存的就是序列化后的byte[])。

每个job存入queue后是ready状态,被reserve之后,就不能被客户端再次获取到,即Cache每次会从每个tube里按顺序reserve一个job,并发送消息体给Output(一个output对应一个tube),这个过程保证每个job被消费一次,且只能被一个Output消费。如果Output端消费成功,则该job会被delete掉;如果该job消费失败,则会被重新置为ready,重新置为ready可能是因为超时(每个job被reserve的时候都有一个Time-To-Run时间设置)了,也可以是客户端release掉该job。

2.2.2 Acker设计

这里,对于tube内job的后续处理交给Acker这个线程来做。Acker的设计灵感来源于Storm。Storm Topology内每个bolt对tuple的执行和处理最终都会给Spout一个ack响应,而拓扑过程中整棵Tuple树的成功/失败执行状态会由Acker守护进程进行跟踪,以此来保证每个tuple被完全处理,而acker对tuple的跟踪算法是Storm的主要突破之一。

Cache端的Acker线程会监听zookeeper上znode树上各个节点的事件变化,从而掌握被Output消费的所有Bundle的最后状态,对应地删除、释放,或者更新Queue里的job。需要注意的是这里还涉及到一个更新job的过程。前面提到Bundle内维护了一个index,而Output消费bundle的时候,如果是数据行写了一半出现了异常或者挂掉了,我们需要记录bundle内数据行的最新index并将此信息也记录在znode上。对于这种最坏情况,Acker负责将该fail的job从queue里delete掉,并更改job内bundle bytes内容,重置新的index,再把新的job put进queue里。这是我们最不希望看到的情况,同时也是我们对Bundle能做的最细粒度的容错设计。

2.2.3 容错

Beanstalkd启动之后可以打开binlog开关,binlog是Beanstalkd容错恢复的机制,将内存里的消息队列结构映射到硬盘上。对于Cache的容错设计,直观的办法在于将这份binlog存在NFS或HDFS上,来保证Cache挂掉重启后,能获取到之前保存的Bundle数据,继续提供服务。

2.3 Output设计


Output在最终的Bundle消费阶段,会把数据导向新的数据源。每个Output获取的Bundle来自于Cache里的一个tube,而每个Bundle的执行情况也会由Companion线程异步更新到Zookeeper上。

 对于Output来说,它只需要关心从Cache端获取的每个Bundle都照常处理就可以了,不需要关心这个Bundle之前是否被消费过,被消费到哪里。原因在于,Cache端的job状态的变更和job的更新可以由Acker保障,而Acker是从zk上得到这些job的状态并对Queue异步更新。如果Acker挂了,只要重新起一个线程获取znode上最新的状态就可以了。对于Output来说,能传过来的Bundle,对应到queue里就是ready状态的job,这个job可能被消费过了,但是他的index也因此得到了更新,Output端对于所有Bundle的处理是一致的,唯一需要关心的是Output需要把Bundle的信息异步更新给zk,如果Output挂了,重新起一个Output接着从Cache读Bundle就可以了。


3. Slave模块总结

Slave模块三种Executor的设计,主要考虑的是各个Executor挂掉之后,怎样保证数据处理的不重复和不遗漏。我们依赖Zookeeper的可靠性,记录、更新、判断Bundle的状态,做到Input、Cache、Output各司其职,最到最小粒度的容错。Executor本身的失败和重启则由Mesos保障,Mesos作为资源管理系统,由Master监控Slave上各个Executor的执行状况,通过回调,可以在合适的Slave上再次启动挂掉的Executor进程,保证业务Task的顺利进行。



(全文完)


Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐