一个轻量级服务编排框架的实现思路
服务编排(Orchestration)是在微服务架构中常用于聚合调用的低代码实现方案,通过对下游各个领域系统原子接口的排列组合,最终对客户端或前端提供可灵活适配业务的聚合接口,用以降低业务变化带来的技术成本。本文试图讨论一种有别于传统服务编排的技术方案,通过特别设计的运行规则来实现概念简单、配置灵活的服务编排框架,为后来者在设计上打开一定的思路。
前言:服务编排(Orchestration)是在微服务架构中常用于聚合调用的低代码实现方案,通过对下游各个领域系统原子接口的排列组合,最终对客户端或前端提供可灵活适配业务的聚合接口,用以降低业务变化带来的技术成本。本文试图讨论一种有别于传统服务编排的技术方案,通过特别设计的运行规则来实现概念简单、配置灵活的服务编排框架,为后来者在设计上打开一定的思路。
前言
微服务发展到现在,几乎成了有一定技术能力公司内新业务技术架构的首选。通过对业务的水平切割,让各个领域的服务能力独立发展,以增强灵活性和易扩展性。但对业务本身而言,切割并不应该是常态,在诸如营销活动领域,往往会根据运营人员天马行空的思路,贯穿多个领域来共同支撑活动举行。为了快速适配业务,服务编排(Orchestration)往往会被技术者们挂在嘴边,并设想出一个“只需要配置就能将已有能力组合成新能力”的技术愿景。本文不讨论这种愿景是否是一个合适的发展方向,只就服务编排本身而言,确实在快速拉起业务上具备一定的优势。
目前常见的服务编排框架,例如Netflix Conductor、Kstry,甚至拓展到其他低代码编排或编舞引擎,往往会设计极其复杂的概念模型。一旦编排的策略变成重要的业务能力,复杂的配置往往会带来工程化的难题。比如模拟了类似流程图的编辑能力,再稍复杂的配置场景中,配置者就需要理解“开始节点”、“条件网关”、“执行节点”、“结束节点”等诸多概念;如果要模拟并行执行,还会要求做出“入网关节点”(FORK)、“出网关节点”(JOIN)配置策略,在数学上明明只是左右括号的概念而已;有些编排引擎会提供用于扩展的高级功能,例如子流程嵌入,那么父子流程会各自包含独立而又可以相互影响的配置。
本文在笔者以在厂设计营销活动服务编排经历的基础上,受到了devops工作流(Workflow)的启发,希望能够在服务编排上“去繁就简”,以最小的概念设计去设计服务编排的功能,文章描述了一个较为完整的服务编排框架实现思路。
本文来自笔者在厂时内部说明资料的整理,由于篇幅和精力有限,不可能贴出全部代码并加以解释,并且删去了没有特点的部分,整理上可能有不太流畅的地方,望读者们理解。
名词术语
- Pipeline - 表示一个经过编排的、可独立请求执行的流程,一个Pipeline中会通过节点连线,将多个节点以串联或并联的形式编排在一起
- 节点(Node) - Pipeline中某个较为独立的步骤,通常由两个部分组成:1)进入节点的条件(默认true,表示一定会执行节点),2)该节点需要执行的动作
- 连线(Line) - 表示两个节点先后执行的关系,多个节点通过连线有顺序的编排起来,先执行的节点叫做上行节点(lastNode),后执行的节点叫做下行节点(nextNode)。整个pipeline中,只作为上行节点的节点称为起始节点(startNode),只作为下行节点的节点成为结束节点(endNode)
- 动作(Action) - 表示某个节点中需要执行的内容,通常包括三个部分组成:1)请求的服务和参数配置,2)执行判断,3)结果渲染
- 表达式(Exp) - 在编排节点、执行动作时需要进行条件判断或参数求值,在本次实现中使用的是QLExpress表达式
- 上下文(Context)- 在进行表达式求值时,表达式中的变量始终来源于上下文,而上下文数据的来源则包括用户请求、配置数据、服务请求结果等
流程编排
串行与并行编排
在笔者设计编排逻辑里只有节点和连线的概念,上行节点全部执行成功后,才会启动执行下行节点。没有上行节点的节点都是开始节点,没有下行节点的节点都是结束节点。在编排形式上,只设计了串行(Serial)和并行(Parallel)两种编排模式。
如果数据结构的角度来看,整个pipeline就是有向图结构模式,从实现后的运行效果来看,一个pipeline对于图连接要求不太严格,任意连线方式都可支持,但推荐还是按下图按步骤依次串行或并行配置,执行链路更加清晰,对编排者更加友好。
如上图,要表达先同时执行步骤11、步骤12,全部成功后再同时执行步骤21、步骤22、步骤23,最后执行步骤31,可按上述进行编排。如果某个节点执行出现问题,那么整个pipeline会提前以报错的形式结束执行,该种模式默认隐含了“如果执行出错或错误断言为真就立即退出”的逻辑。
分支条件的支持
上述描述中没有分支选择逻辑,那么这种情况下,如何支持需要条件执行的场景呢?我们可以把条件执行可以看作并行执行的一般场景,只是默认并行分支执行条件都是true。而条件执行下,只有满足某分支的条件时,并行分支才可以执行,而未满足条件的分支需要跳过执行,也因此节点数据里会包含条件字段。
如上图,步骤31和步骤32分别实现了互斥的条件,那么执行时Pipeline只会选择步骤31进行执行,而步骤32则会跳过。这里需要注意的是,上行节点的执行条件会影响下行节点的执行。上图中,虽然步骤42也满足条件,但由于其依赖的上行节点步骤32没有执行,也没有其他上行节点可以触发步骤42,因此步骤42无法执行。
这里也可以看出来,条件的执行是具备 与 / 或 逻辑的。在节点上配置的条件并不表示错误断言,只是表示连线上是否可以继续运行。而这个条件不仅可以并行配置,也可以串行配置。简单来说,串行条件表示“与”,并行条件表示“或”。
如上图所示,对于步骤11链路,只要步骤21和步骤22有一条条件得到满足,那么步骤31就可以执行;而步骤12链路,如果步骤23和步骤24有一个条件没有满足,那么步骤32无法执行。换言之,下行节点会沿用所有上行节点的条件。
空节点的作用
笔者设计的Pipeline允许出现空节点,即节点没有配置关联动作,在推进时会直接进入空节点的下行节点。这种空节点有时会有额外的作用:
-
只带条件的空节点,那么该节点就会变成流程图中的条件节点;
-
在某些并行步骤的后面通过加入一个空节点,可以汇聚执行链路,待整体执行完成后再继续进行推进;
3. 一个无作用的占位符,例如在不允许节点跳线,但又需要跳线的场景。
注:空节点可以起到特别的作用可能是笔者设计失败的产物,见后续小结 “遗憾:动作应该在节点上还是在连线上?”
节点的推进策略
设计好Pipeline的运行规则后,那么应当如何推进整个pipline的运行呢?
这里设计的推进策略的核心规则是:
当前节点尽力尝试推进执行下行节点,收集所有下行节点的推进结果,向上行节点反馈自身包括下行节点的推进结果。
如上图所示,步骤11执行后会推进执行步骤21和步骤22,但步骤21和步骤22在执行后由于彼此并行隔离,因此两个节点都会设法执行步骤31和步骤32,但实际上只希望它们执行一次,因此最终只会有一个节点(例如步骤22)拿到下行节点的执行权,从而推进流程继续执行。
分析来看,由于多节点存在并行执行的情况,因此每个节点执行后,都应当尽可能向后推进执行,但一个节点最多只能执行一次,因此看右图修剪后的pipeline在实际执行的情况应当是一棵由图进行“修剪”而得到的树。下行节点都可能是上行节点的子节点,但每个下行节点只能作为一个上行节点的子节点,因此当一个上行节点尝试推进执行下行节点时,需要判断该支路是否是否被“修剪”。如此来看,每一个节点的下行节点都将独立的形成一棵子树,同时该节点也会作为其上行节点的子树。每个节点执行后都会尝试推进子树的执行,每棵子树都从顶层节点开始执行,由顶层节点决定支路是否执行或被修剪。由此我们得出一个重要的结论:
当前节点会收集每棵子树的执行结果,结合自身执行结果,得到当前节点所在树的执行结果,并由其上行节点收集。那么起始节点所在树的结果就是整个Pipeline执行的结果。
实现方案
本文只粘贴部分关键代码,实现上并不复杂,只看文字描述即可。
具体实现上,由于pipeline的推进具备树型的计算结构,毫无疑问,类似Java的 Fork/Join 的框架是比较适合的实现方式,可以方便地进行并发线程的管理。
public class PipeRecursiveTask extends RecursiveTask<PipelineRunningState> {
private PipeTask pipeTask;
public PipeRecursiveTask(PipeTask pipeTask) {
this.pipeTask = pipeTask;
}
@Override
protected PipelineRunningState compute() {
return pipeTask.compute();
}
}
public class PipeTask {
// ... ...
protected PipelineRunningState compute() {
// 开始节点
PipelineRunningState runningState;
List<String> nextUniqueIds;
/* 执行节点,得到执行状态 */
if (StringUtils.isEmpty(nodeUniqueId)) {
runningState = new PipelineRunningState(pipelineContext.getPipelineUniqueId(),"");
runningState.setExecuteState(PipeStateEnum.SUCCESS);
runningState.setExecuted(false);
nextUniqueIds = pipelineContext.getPipeline().getStartNodeUniqueIdList();
} else {
runningState = nodeExecutor.apply(this);
nextUniqueIds = pipelineContext.getPipeline().getNextUniqueIdsMap().get(runningState.getNodeUniqueId());
}
runningState.setPipeState(runningState.getExecuteState());
if (runningState.getExecuteState().equals(PipeStateEnum.ERROR)) {
runningState.setErrorNodeUniqueId(nodeUniqueId);
}
/* 获取后向节点,如果没有则直接返回当前状态 */
if (CollectionUtils.isEmpty(nextUniqueIds)) {
runningState.setPipe("arriveEnd");
return runningState;
}
/* 节点不需要推进,则直接返回当前状态 */
if (! runningState.getExecuteState().isNeedContinue()) {
runningState.setPipe("discontinue");
return runningState;
}
/* 并行执行所有后向节点,透出推进状态 */
long nextStartTime = System.currentTimeMillis();
List<PipelineRunningState> nextRunningStateList = Lists.newArrayList();
if (nextUniqueIds.size() == 1) {
PipelineRunningState nextRunningState = new PipeTask(nextUniqueIds.get(0), nextStartTime, pipelineContext, nodeExecutor).compute();
nextRunningStateList.add(nextRunningState);
} else {
Map<String, PipeRecursiveTask> recursiveTaskMap = Maps.newHashMap();
for (String nodeUniqueId : nextUniqueIds) {
PipeTask nodeTask = new PipeTask(nodeUniqueId, nextStartTime, pipelineContext, nodeExecutor);
PipeRecursiveTask recursiveTask = new PipeRecursiveTask(nodeTask);
recursiveTaskMap.put(nodeUniqueId, recursiveTask);
recursiveTask.fork();
}
recursiveTaskMap.forEach((currentNodeUniqueId, recursiveTask) -> nextRunningStateList.add(recursiveTask.join()));
}
PipelineRunningState pipeState = calNextRunningState(runningState, nextRunningStateList);
runningState.setPipe("nextForward");
runningState.setPipeState(pipeState.getPipeState());
if (StringUtils.isNotEmpty(pipeState.getErrorNodeUniqueId())) {
runningState.setErrorNodeUniqueId(pipeState.getErrorNodeUniqueId());
}
runningState.setNextRunningStateList(nextRunningStateList);
return runningState;
}
注:树形计算结构都可以使用Fork/Join。
这里我们设计节点有如下运行状态:
public enum PipeStateEnum {
/* 不具备执行条件 */
NONE("NONE", "无", false, false),
/* 运行 */
RUNNING("RUNNING", "运行",false, false),
/* 错误 */
ERROR("ERROR", "错误", true, false),
/* 成功 */
SUCCESS("SUCCESS", "成功", true, true),
/* 修剪 */
PRUNING("PRUNING", "修剪", true, true),
;
private String code;
private String msg;
private boolean finished;
private boolean needContinue;
// ... ...
这里有两个核心处理逻辑:
- 当前节点如何判断所推进的支路是否需要执行:当前节点会被所有的上行节点尝试推进执行,因此当前节点每次被触发推进的时候,会收集所有其上行节点的执行结果,只有在所有上行节点执行成功后,并且自身成功抢占了执行权,才可以执行。
关键代码:
public class NodeExecutorImpl implements NodeExecutor {
// ... ...
/** 抢占状态 */
public PipeStateEnum calAndOccupiedNodeState(PipelineNode node, List<String> lastNodeUniqueIds, PipelineContext pc) {
synchronized (node.getUniqueId()) {
if (pc.getNodeState(node.getUniqueId()) != null) {
return PipeStateEnum.NONE;
}
PipeStateEnum nodeState = calCurrentNodeState(lastNodeUniqueIds, pc);
if (nodeState.equals(PipeStateEnum.NONE)) {
return PipeStateEnum.NONE;
}
pc.putNodeState(node.getUniqueId(), nodeState);
return nodeState;
}
}
/* 检查节点 */
/* 有一个为NONE、RUNNING --> NONE : 上游有未执行完的节点,本节点不执行 */
/* 有一个ERROR --> NONE : 上游有执行失败的节点,本节点不再执行 */
/* 全部为PRUNING --> PRUNING : 上游有不满足执行调节的节点,本节点也需要跳过 */
/* 其余 --> RUNNING : 上游有执行成功的节点,本节点需要继续执行 */
private PipeStateEnum calCurrentNodeState(List<String> lastNodeUniqueIds, PipelineContext pc) {
if (CollectionUtils.isEmpty(lastNodeUniqueIds)) {
return PipeStateEnum.RUNNING;
}
boolean none = false;
boolean error = false;
boolean pruning = true;
for (String nodeUniqueId : lastNodeUniqueIds) {
PipeStateEnum nodeState = pc.getNodeState(nodeUniqueId);
if (nodeState == null || nodeState.equals(PipeStateEnum.NONE) || nodeState.equals(PipeStateEnum.RUNNING)) {
none = true;
break;
}
if (nodeState.equals(PipeStateEnum.ERROR)) {
error = true;
break;
}
if (! nodeState.equals(PipeStateEnum.PRUNING)) {
pruning = false;
}
}
return none ? PipeStateEnum.NONE :
error ? PipeStateEnum.NONE :
pruning ? PipeStateEnum.PRUNING :
PipeStateEnum.RUNNING;
}
// ... ...
- 当前节点如何根据自身和下行的执行结果,向上行返回执行结果:在当前节点执行成功的前提下,如果有下行节点执行失败,则以该下行节点作为本节点执行结果向上行节点反馈;如果下游没有执行失败的节点,则以当前节点结果向上行节点反馈。
关键代码:
public class PipeTask {
// ... ...
/* 检查节点 */
/* 有一个ERROR --> ERROR 节点 : 后向有节点报错 */
/* 有一个SUCCESS --> SUCCESS 节点 : 后向有节点成功 */
/* 全部PRUNING --> 当前节点 : 后续节点都未执行*/
/* 其余 --> 当前节点 */
public static PipelineRunningState calNextRunningState(PipelineRunningState currentState, List<PipelineRunningState> nextRunningStateList) {
PipelineRunningState errorState = null;
PipelineRunningState successState = null;
for (PipelineRunningState nextRunningState : nextRunningStateList) {
if (nextRunningState.getPipeState().equals(PipeStateEnum.ERROR)) {
errorState = nextRunningState;
}
if (nextRunningState.getPipeState().equals(PipeStateEnum.SUCCESS)) {
successState = nextRunningState;
}
}
return errorState != null ? errorState :
//successState != null ? successState :
currentState ;
}
整个pipeline的核心逻辑只有三处,一个Fork/Join的实现,两处推进策略
扩展能力:编排组合
Pipeline内部的联系仅依赖线进行组织,而运行时不依赖节点的实际归属Pipeline,因此通过增加线、增加起始点,可以实现串接、并联、替换等组合,可具备类似父子流程嵌套的能力,而不需要额外增加概念设计。下图展示两个Pipeline串接、并联、替换某节点后的有向图形式。
从理论上而言,可以通过设计子Pipeline,然后通过一定规则拼接的形式,从而实现功能扩展。但读者们可以思考以下几个问题:
- 组合场景应用于业务,实用性未必很强;
- 由于节点可以任意组合,因此每个节点的执行动作不能存在冲突,这一点对配置数据和规范的要求比较高;
- 多个Pipeline组合后依然是一个Pipeline,最后只能生成一份结果,对于会生成结果的节点需要额外关注
遗憾:动作应该在节点上还是在连线上?
本次设计上,节点负责所有的能力,而连线仅负责连接。这未必是一个好的设计形式。系统首次上线后,笔者也反复思考,可能实现目标上不会有差别,但是在连线上配置动作执行应该更合适。只是已然上线,只能一条路走到底。望读到此处的朋友们有类似设计需求时可以注意一下。
动作执行
该部分整体并不复杂,调用形式也很简单。
基本流程
每个节点会关联一个执行动作,该执行动作是节点执行的本体。每个动作包含三个部分:1)服务调用,2)执行判断,3)结果渲染。并且这三个部分是可以独立配置在动作里的,也就意味着可以有专门的节点负责服务调用、专门的节点负责结果渲染,也可以合并配置。
执行判断
动作执行时可以针对当前上下文进行判断,判断时会依次进行对规则进行求值,一旦匹配规则便立即退出,并按配置的失败方式返回,如果匹配不到则默认判断为真。因此对于规则配置的要求是需要错误条件要覆盖完整。
在执行判断上,也可以支持全真/全假/单真/单假的多条件配置,在前期使用上可能并不比单表达式直接判断更方便,这里不做描述。
结果合并
如果一个Pipeline中有多个Action产生执行结果,那么最终Pipeline将尝试对结果进行合并。如果是对结果参数进行渲染,那么渲染后的结果会和当前pipeline已有的结果尝试进行合并。对于无法合并的情况,新渲染的值会覆盖已有的值,因此如果没有特别要求,建议结果渲染最外层配置使用结构体配置,防止覆盖丢失。这种合并只支持一级合并,无法对子项合并。在合并结果中,只分为三种类型:
- 基本类型,包括布尔型、整形、字符串等无结构类型;
- 结构体类型,主要带字段的对象或Map类型;
- 列表类型,主要指数组、列表等集合类型。
旧值类型 ↓ | 新值类型 → | 基本类型 | 结构体类型 | 列表类型 |
---|---|---|---|
列表类型 | 新值覆盖 | 新值覆盖 | 两个列表按第一层进行合并 |
基本类型 | 新值覆盖 | 新值覆盖 | 新值覆盖 |
结构体类型 | 新值覆盖 | 两结构体按字段合并,字段同名时,新值覆盖旧值 | 新值覆盖 |
实现代码
public class ActionBizServiceImpl implements ActionBizService {
// ... ...
@Override
public ActionResult doAction(Action action, PipelineContext pc) {
ActionResult actionResult = new ActionResult(action);
actionResult.setStartTime(System.currentTimeMillis());
try {
doAction(action, actionResult, pc);
} catch (BffBizException be) {
actionResult.setEndTime(System.currentTimeMillis());
actionResult.setCode(be.getCode());
actionResult.setMsg(be.getMessage());
} catch (Exception e) {
actionResult.setEndTime(System.currentTimeMillis());
actionResult.setCode(BizErrorEnum.UNKNOWN_EXCEPTION.getCode());
actionResult.setMsg(e.getMessage());
}
return actionResult;
}
@Override
public void doAction(Action action, ActionResult actionResult, PipelineContext pc) {
/* 请求服务 */
ServiceResult serviceResult = invokeService(action, pc);
actionResult.setServiceResult(serviceResult);
pc.saveActionResult(actionResult);
if ((action.getServiceErrorIgnore() == null || !action.getServiceErrorIgnore()) && serviceResult != null) {
if (! serviceResult.isSuccess() || serviceResult.getCode() != 0) {
actionResult.setCode(serviceResult.getCode());
actionResult.setMsg(serviceResult.getMsg());
actionResult.setEndTime(System.currentTimeMillis());
return;
}
}
/* 执行判断 */
boolean judge = judgeStrategy(action, actionResult, pc);
if (! judge) {
actionResult.setEndTime(System.currentTimeMillis());
return;
}
/* 组装结果 */
constructResult(action, pc);
actionResult.setSuccess(true);
actionResult.setCode(BizErrorEnum.SUCCESS.getCode());
actionResult.setMsg(BizErrorEnum.SUCCESS.getMsg());
actionResult.setEndTime(System.currentTimeMillis());
}
private ServiceResult invokeService(Action action, PipelineContext pc) {
if (StringUtils.isEmpty(action.getService())) {
return null;
}
Object params = evalBizService.render(action.getServiceParamField(), pc);
return serviceProviderBizService.invoke(action.getService(), action.getServiceProtocol(), params, pc, action.getServiceCacheObj());
}
private boolean judgeStrategy(Action action, ActionResult actionResult, PipelineContext pc) {
if (CollectionUtils.isEmpty(action.getJudgeStrategyList())) {
return true;
}
for (JudgeStrategy strategy : action.getJudgeStrategyList()) {
boolean judge = evalBizService.judge(strategy.getRule(), pc);
if (judge && strategy.isSuccessFlag()) {
return true;
} else if (judge) {
actionResult.setCode(strategy.getErrorCode());
actionResult.setMsg(strategy.getErrorMsg());
return false;
}
}
return true;
}
private void constructResult(Action action, PipelineContext pc) {
if (action.getResultParamField() != null) {
Object result = evalBizService.render(action.getResultParamField(), pc);
pc.putResult(result);
}
}
}
Action编排形式讨论
Action的三个步骤:服务调用、执行判断、结果渲染,是可以独立生效的,一个Action甚至可以拆成三个Action分别出现在不同的节点上。这种设计除了考虑灵活性外,更多的是对用户行为模式的推测。很有可能用户在配置Pipeline时采用以下固定的配置模式
实际使用中,大家更喜欢在最后一步配置结果渲染。
泛化调用
泛化调用主要用于向上游屏蔽不同服务的调用差异,在网关类系统设计中是比较常见的功能。这里没有过多描述,参考其他网关实现即可。
上层动作执行时,必须要服务调用器传入三种参数:调用的服务接口、调用的服务协议(HTTP/RPC)、调用服务的参数,服务调用器会根据协议,会生成一对对象,一个是请求对象,一个是实际调用器。通过调用器调用请求对象,则拿到调用结果。服务调用器catch所有的调用异常,统一以success和code的形式向动作执行器报告执行结果。
表达式求值
在整个Pipeline运行过程中,需要组织各处参数,计算数据结果,如果全部采用配置模式,功能受限、设计复杂、配置繁琐。这里引用了求值器,根据配置的表达式进行求值。目前使用求值器的地方有两处:
- 谓词判断:用在Pipeline节点的分支条件判断和动作执行判断时的结果断言上。该表达式必须是一个布尔表达式,通过计算上下文参数的真假,来判断条件是否成立;
- 参数渲染:用在动作执行请求服务参数渲染以及结果渲染上,该表达式最终需要生成一个对象,作为运行时的参数。
笔者在实现过程中,没有找到特别满意的求值器,即便是自己设计的也不是很满意。最终笔者采用了工厂模式,提供多个求值器,允许用户自己选择合适的求值器运行脚本,也允许开发者扩展新的求置器:
- QLEXPRESS - 基于QlExpress规则引擎而扩展的求值器,功能范围支持较广,也是默认的求值器;
- VELOCITY - 基于Velocity模板渲染的求值器,可以根据参数变量渲染常规字符串或JSON字符串。该求值器可以实现和QlExpress一样的功能范围,优点在于简单场景配置简单,但对于复杂场景配置过于复杂;
- JSON - 基于Json反序列化的求值器,可以配置对象或JSON字符串。该求值器只能做常量配置;
- FAST - 笔者设计用于实现快速参数赋值的求值器。
后记
本次服务编排在设计上有很多笔者觉得可以改进或提升的地方,后续如果有机会进行调整,会把最新的思路分享给大家。
更多推荐
所有评论(0)