一般来讲,绝大多数互联网服务都可以划分为 OLTP (On-line Transaction Processing) 与 OLAP (On-line Analytical Processing)。OLTP vs. OLAP 这篇文章对两者之间的差异给出了非常清晰的解释。如果我们只考虑 Query 这个维度,OLTP 最好只处理相当简单、耗时极短的查询或操作,而 OLAP 则处理比较复杂、耗时较长的查询(通常包含数据聚合)。
LeanCloud 提供的离线数据分析服务,大致上可以归类为 OLAP。用户提交 SQL 查询语句,我们的系统为其完成查询并返回相应结果。如果用户提交的 SQL 较为简单,系统处理时间就比较短。反过来,如果用户提交的 SQL 较为复杂(多表 join、数据聚合),那么系统处理时间会更长一些。既然如此,我们就不能像 OLTP 那样在一次请求里面给出查询结果,而是将查询拆分成「创建分析 job」与「获取分析 job 处理结果」。简言之,从 API 层面看,用户的查询是由两个接口来实现的,即 POST /jobs
与 GET /jobs/:jobId
。这样的 API 设计意味着所有的数据分析任务都会异步地处理。
从系统模块划分,LeanCloud 离线数据分析服务由 Master 与若干 Worker 构成。Master 与 Worker 之间通过分布式消息队列(Kestrel)进行通信。
Master
Master 负责处理 HTTP 请求。如果 Master 收到的是创建数据分析 job 的请求,它会将该请求进行过滤封装,然后将其转发给后面的 Worker。如果 Master 收到的是获取数据分析 job 处理结果的请求,它则从缓存(Memcached)或持久化组件获取结果。
Worker
Worker 负责消费来自分布式消息队列的消息。Worker 根据消息内容处理数据分析任务。我们的 Worker 会把更具体的查询分析交由 Spark SQL 来处理。
下面介绍离线数据分析服务核心组件。
实际上,我们的系统并非只有一种 Worker。根据具体任务的差异,我们既有负责数据分析的 QueryWorker,也有负责数据转换的 DataTransformationWorker。这些 Worker 根据自己的角色去订阅(消费)不同的消息队列,进而处理自己该处理的任务。此外,Worker 的数量也不固定,它主要取决于整个系统的负载。如果消息队列出现消息积压,那么就应该检查是否有 Worker 停止工作或者是否需要增加 Worker。
当 Worker 处理相关任务时发生某些意外故障(例如 Spark 集群意外宕机),该任务可以自动被重新提交进消息队列,由其他 Worker 接管并对其进行重新处理。这样的机制有助于提高系统的鲁棒性,确保任何任务都能足够的机会得到妥善处理。
另一方面,如果我们要处理其他类型的任务,或者希望有不同类型的 Worker 加入进来,整个系统的架构也可以不做特殊调整而得以支持新型任务。引入新 topic 队列,新的 Worker 消费该队列即可完成相关任务,而原有的 Worker 并不需要为此做任何改变。
总的来讲,「异步处理」既能保证系统的容错性,又能为我们的系统提供良好的水平扩展能力。这也是我们选择这种架构的根本目的。