前言:服务编排(Orchestration)是在微服务架构中常用于聚合调用的低代码实现方案,通过对下游各个领域系统原子接口的排列组合,最终对客户端或前端提供可灵活适配业务的聚合接口,用以降低业务变化带来的技术成本。本文试图讨论一种有别于传统服务编排的技术方案,通过特别设计的运行规则来实现概念简单、配置灵活的服务编排框架,为后来者在设计上打开一定的思路。

前言

微服务发展到现在,几乎成了有一定技术能力公司内新业务技术架构的首选。通过对业务的水平切割,让各个领域的服务能力独立发展,以增强灵活性和易扩展性。但对业务本身而言,切割并不应该是常态,在诸如营销活动领域,往往会根据运营人员天马行空的思路,贯穿多个领域来共同支撑活动举行。为了快速适配业务,服务编排(Orchestration)往往会被技术者们挂在嘴边,并设想出一个“只需要配置就能将已有能力组合成新能力”的技术愿景。本文不讨论这种愿景是否是一个合适的发展方向,只就服务编排本身而言,确实在快速拉起业务上具备一定的优势。

目前常见的服务编排框架,例如Netflix Conductor、Kstry,甚至拓展到其他低代码编排或编舞引擎,往往会设计极其复杂的概念模型。一旦编排的策略变成重要的业务能力,复杂的配置往往会带来工程化的难题。比如模拟了类似流程图的编辑能力,再稍复杂的配置场景中,配置者就需要理解“开始节点”、“条件网关”、“执行节点”、“结束节点”等诸多概念;如果要模拟并行执行,还会要求做出“入网关节点”(FORK)、“出网关节点”(JOIN)配置策略,在数学上明明只是左右括号的概念而已;有些编排引擎会提供用于扩展的高级功能,例如子流程嵌入,那么父子流程会各自包含独立而又可以相互影响的配置。
常见的服务编排形式
本文在笔者以在厂设计营销活动服务编排经历的基础上,受到了devops工作流(Workflow)的启发,希望能够在服务编排上“去繁就简”,以最小的概念设计去设计服务编排的功能,文章描述了一个较为完整的服务编排框架实现思路。

本文来自笔者在厂时内部说明资料的整理,由于篇幅和精力有限,不可能贴出全部代码并加以解释,并且删去了没有特点的部分,整理上可能有不太流畅的地方,望读者们理解。

名词术语

工作流的编排形式

  • Pipeline - 表示一个经过编排的、可独立请求执行的流程,一个Pipeline中会通过节点连线,将多个节点以串联或并联的形式编排在一起
  • 节点(Node) - Pipeline中某个较为独立的步骤,通常由两个部分组成:1)进入节点的条件(默认true,表示一定会执行节点),2)该节点需要执行的动作
  • 连线(Line) - 表示两个节点先后执行的关系,多个节点通过连线有顺序的编排起来,先执行的节点叫做上行节点(lastNode),后执行的节点叫做下行节点(nextNode)。整个pipeline中,只作为上行节点的节点称为起始节点(startNode),只作为下行节点的节点成为结束节点(endNode)
  • 动作(Action) - 表示某个节点中需要执行的内容,通常包括三个部分组成:1)请求的服务和参数配置,2)执行判断,3)结果渲染
  • 表达式(Exp) - 在编排节点、执行动作时需要进行条件判断或参数求值,在本次实现中使用的是QLExpress表达式
  • 上下文(Context)- 在进行表达式求值时,表达式中的变量始终来源于上下文,而上下文数据的来源则包括用户请求、配置数据、服务请求结果等

流程编排

串行与并行编排

在笔者设计编排逻辑里只有节点和连线的概念,上行节点全部执行成功后,才会启动执行下行节点。没有上行节点的节点都是开始节点,没有下行节点的节点都是结束节点。在编排形式上,只设计了串行(Serial)和并行(Parallel)两种编排模式。
如果数据结构的角度来看,整个pipeline就是有向图结构模式,从实现后的运行效果来看,一个pipeline对于图连接要求不太严格,任意连线方式都可支持,但推荐还是按下图按步骤依次串行或并行配置,执行链路更加清晰,对编排者更加友好。

在这里插入图片描述
如上图,要表达先同时执行步骤11、步骤12,全部成功后再同时执行步骤21、步骤22、步骤23,最后执行步骤31,可按上述进行编排。如果某个节点执行出现问题,那么整个pipeline会提前以报错的形式结束执行,该种模式默认隐含了“如果执行出错或错误断言为真就立即退出”的逻辑。

分支条件的支持

上述描述中没有分支选择逻辑,那么这种情况下,如何支持需要条件执行的场景呢?我们可以把条件执行可以看作并行执行的一般场景,只是默认并行分支执行条件都是true。而条件执行下,只有满足某分支的条件时,并行分支才可以执行,而未满足条件的分支需要跳过执行,也因此节点数据里会包含条件字段。
在这里插入图片描述
如上图,步骤31和步骤32分别实现了互斥的条件,那么执行时Pipeline只会选择步骤31进行执行,而步骤32则会跳过。这里需要注意的是,上行节点的执行条件会影响下行节点的执行。上图中,虽然步骤42也满足条件,但由于其依赖的上行节点步骤32没有执行,也没有其他上行节点可以触发步骤42,因此步骤42无法执行。

这里也可以看出来,条件的执行是具备 与 / 或 逻辑的。在节点上配置的条件并不表示错误断言,只是表示连线上是否可以继续运行。而这个条件不仅可以并行配置,也可以串行配置。简单来说,串行条件表示“”,并行条件表示“”。
在这里插入图片描述
如上图所示,对于步骤11链路,只要步骤21和步骤22有一条条件得到满足,那么步骤31就可以执行;而步骤12链路,如果步骤23和步骤24有一个条件没有满足,那么步骤32无法执行。换言之,下行节点会沿用所有上行节点的条件。

空节点的作用

笔者设计的Pipeline允许出现空节点,即节点没有配置关联动作,在推进时会直接进入空节点的下行节点。这种空节点有时会有额外的作用:

  1. 只带条件的空节点,那么该节点就会变成流程图中的条件节点;
    在这里插入图片描述

  2. 在某些并行步骤的后面通过加入一个空节点,可以汇聚执行链路,待整体执行完成后再继续进行推进;
    在这里插入图片描述3. 一个无作用的占位符,例如在不允许节点跳线,但又需要跳线的场景。在这里插入图片描述

注:空节点可以起到特别的作用可能是笔者设计失败的产物,见后续小结 “遗憾:动作应该在节点上还是在连线上?”

节点的推进策略

设计好Pipeline的运行规则后,那么应当如何推进整个pipline的运行呢?
这里设计的推进策略的核心规则是:

当前节点尽力尝试推进执行下行节点,收集所有下行节点的推进结果,向上行节点反馈自身包括下行节点的推进结果。

在这里插入图片描述

如上图所示,步骤11执行后会推进执行步骤21和步骤22,但步骤21和步骤22在执行后由于彼此并行隔离,因此两个节点都会设法执行步骤31和步骤32,但实际上只希望它们执行一次,因此最终只会有一个节点(例如步骤22)拿到下行节点的执行权,从而推进流程继续执行。

分析来看,由于多节点存在并行执行的情况,因此每个节点执行后,都应当尽可能向后推进执行,但一个节点最多只能执行一次,因此看右图修剪后的pipeline在实际执行的情况应当是一棵由图进行“修剪”而得到的树。下行节点都可能是上行节点的子节点,但每个下行节点只能作为一个上行节点的子节点,因此当一个上行节点尝试推进执行下行节点时,需要判断该支路是否是否被“修剪”。如此来看,每一个节点的下行节点都将独立的形成一棵子树,同时该节点也会作为其上行节点的子树。每个节点执行后都会尝试推进子树的执行,每棵子树都从顶层节点开始执行,由顶层节点决定支路是否执行或被修剪。由此我们得出一个重要的结论:

当前节点会收集每棵子树的执行结果,结合自身执行结果,得到当前节点所在树的执行结果,并由其上行节点收集。那么起始节点所在树的结果就是整个Pipeline执行的结果。

实现方案

本文只粘贴部分关键代码,实现上并不复杂,只看文字描述即可。

具体实现上,由于pipeline的推进具备树型的计算结构,毫无疑问,类似Java的 Fork/Join 的框架是比较适合的实现方式,可以方便地进行并发线程的管理。

public class PipeRecursiveTask extends RecursiveTask<PipelineRunningState> {
    private PipeTask pipeTask;

    public PipeRecursiveTask(PipeTask pipeTask) {
        this.pipeTask = pipeTask;
    }

    @Override
    protected PipelineRunningState compute() {
        return pipeTask.compute();
    }
}

public class PipeTask {
    // ... ...
    protected PipelineRunningState compute() {
        // 开始节点
        PipelineRunningState runningState;
        List<String> nextUniqueIds;

        /* 执行节点,得到执行状态 */
        if (StringUtils.isEmpty(nodeUniqueId)) {
            runningState = new PipelineRunningState(pipelineContext.getPipelineUniqueId(),"");
            runningState.setExecuteState(PipeStateEnum.SUCCESS);
            runningState.setExecuted(false);

            nextUniqueIds = pipelineContext.getPipeline().getStartNodeUniqueIdList();
        } else {
            runningState = nodeExecutor.apply(this);
            nextUniqueIds = pipelineContext.getPipeline().getNextUniqueIdsMap().get(runningState.getNodeUniqueId());
        }

        runningState.setPipeState(runningState.getExecuteState());
        if (runningState.getExecuteState().equals(PipeStateEnum.ERROR)) {
            runningState.setErrorNodeUniqueId(nodeUniqueId);
        }

        /* 获取后向节点,如果没有则直接返回当前状态 */
        if (CollectionUtils.isEmpty(nextUniqueIds)) {
            runningState.setPipe("arriveEnd");
            return runningState;
        }
        /* 节点不需要推进,则直接返回当前状态 */
        if (! runningState.getExecuteState().isNeedContinue()) {
            runningState.setPipe("discontinue");
            return runningState;
        }
        /* 并行执行所有后向节点,透出推进状态 */
        long nextStartTime = System.currentTimeMillis();
        List<PipelineRunningState> nextRunningStateList = Lists.newArrayList();
        if (nextUniqueIds.size() == 1) {
            PipelineRunningState nextRunningState = new PipeTask(nextUniqueIds.get(0), nextStartTime, pipelineContext, nodeExecutor).compute();
            nextRunningStateList.add(nextRunningState);
        } else {
            Map<String, PipeRecursiveTask> recursiveTaskMap = Maps.newHashMap();
            for (String nodeUniqueId : nextUniqueIds) {
                PipeTask nodeTask = new PipeTask(nodeUniqueId, nextStartTime, pipelineContext, nodeExecutor);
                PipeRecursiveTask recursiveTask = new PipeRecursiveTask(nodeTask);
                recursiveTaskMap.put(nodeUniqueId, recursiveTask);
                recursiveTask.fork();
            }
            recursiveTaskMap.forEach((currentNodeUniqueId, recursiveTask) -> nextRunningStateList.add(recursiveTask.join()));
        }
        PipelineRunningState pipeState = calNextRunningState(runningState, nextRunningStateList);
        runningState.setPipe("nextForward");
        runningState.setPipeState(pipeState.getPipeState());
        if (StringUtils.isNotEmpty(pipeState.getErrorNodeUniqueId())) {
            runningState.setErrorNodeUniqueId(pipeState.getErrorNodeUniqueId());
        }
        runningState.setNextRunningStateList(nextRunningStateList);
        return runningState;
    }

注:树形计算结构都可以使用Fork/Join。

这里我们设计节点有如下运行状态:

public enum PipeStateEnum {
    /* 不具备执行条件 */
    NONE("NONE", "无", false, false),
    /* 运行 */
    RUNNING("RUNNING", "运行",false, false),
    /* 错误 */
    ERROR("ERROR", "错误", true, false),
    /* 成功 */
    SUCCESS("SUCCESS", "成功", true, true),
    /* 修剪 */
    PRUNING("PRUNING", "修剪", true, true),
    ;

    private String code;
    private String msg;
    private boolean finished;
    private boolean needContinue;
    // ... ...

在这里插入图片描述
这里有两个核心处理逻辑

  1. 当前节点如何判断所推进的支路是否需要执行:当前节点会被所有的上行节点尝试推进执行,因此当前节点每次被触发推进的时候,会收集所有其上行节点的执行结果,只有在所有上行节点执行成功后,并且自身成功抢占了执行权,才可以执行。

关键代码:

public class NodeExecutorImpl implements NodeExecutor {
    // ... ...
    /** 抢占状态 */
    public PipeStateEnum calAndOccupiedNodeState(PipelineNode node, List<String> lastNodeUniqueIds, PipelineContext pc) {
        synchronized (node.getUniqueId()) {
            if (pc.getNodeState(node.getUniqueId()) != null) {
                return PipeStateEnum.NONE;
            }
            PipeStateEnum nodeState = calCurrentNodeState(lastNodeUniqueIds, pc);
            if (nodeState.equals(PipeStateEnum.NONE)) {
                return PipeStateEnum.NONE;
            }
            pc.putNodeState(node.getUniqueId(), nodeState);
            return nodeState;
        }
    }
    /* 检查节点 */
    /* 有一个为NONE、RUNNING --> NONE : 上游有未执行完的节点,本节点不执行 */
    /* 有一个ERROR --> NONE : 上游有执行失败的节点,本节点不再执行 */
    /* 全部为PRUNING --> PRUNING : 上游有不满足执行调节的节点,本节点也需要跳过 */
    /* 其余 --> RUNNING : 上游有执行成功的节点,本节点需要继续执行 */
    private PipeStateEnum calCurrentNodeState(List<String> lastNodeUniqueIds, PipelineContext pc) {
        if (CollectionUtils.isEmpty(lastNodeUniqueIds)) {
            return PipeStateEnum.RUNNING;
        }

        boolean none = false;
        boolean error = false;
        boolean pruning = true;

        for (String nodeUniqueId : lastNodeUniqueIds) {
            PipeStateEnum nodeState = pc.getNodeState(nodeUniqueId);
            if (nodeState == null || nodeState.equals(PipeStateEnum.NONE) || nodeState.equals(PipeStateEnum.RUNNING)) {
                none = true;
                break;
            }
            if (nodeState.equals(PipeStateEnum.ERROR)) {
                error = true;
                break;
            }
            if (! nodeState.equals(PipeStateEnum.PRUNING)) {
                pruning = false;
            }
        }

        return none    ? PipeStateEnum.NONE :
               error   ? PipeStateEnum.NONE :
               pruning ? PipeStateEnum.PRUNING :
                         PipeStateEnum.RUNNING;
    }
    // ... ...
  1. 当前节点如何根据自身和下行的执行结果,向上行返回执行结果:在当前节点执行成功的前提下,如果有下行节点执行失败,则以该下行节点作为本节点执行结果向上行节点反馈;如果下游没有执行失败的节点,则以当前节点结果向上行节点反馈。
    关键代码:
public class PipeTask {
    // ... ...
    /* 检查节点 */
    /* 有一个ERROR --> ERROR 节点 : 后向有节点报错 */
    /* 有一个SUCCESS --> SUCCESS 节点 : 后向有节点成功 */
    /* 全部PRUNING --> 当前节点 : 后续节点都未执行*/
    /* 其余 --> 当前节点 */
    public static PipelineRunningState calNextRunningState(PipelineRunningState currentState, List<PipelineRunningState> nextRunningStateList) {
        PipelineRunningState errorState = null;
        PipelineRunningState successState = null;

        for (PipelineRunningState nextRunningState : nextRunningStateList) {
            if (nextRunningState.getPipeState().equals(PipeStateEnum.ERROR)) {
                errorState = nextRunningState;
            }
            if (nextRunningState.getPipeState().equals(PipeStateEnum.SUCCESS)) {
                successState = nextRunningState;
            }
        }

        return errorState != null   ? errorState   :
               //successState != null ? successState :
                                      currentState ;
    }

整个pipeline的核心逻辑只有三处,一个Fork/Join的实现,两处推进策略

扩展能力:编排组合

Pipeline内部的联系仅依赖线进行组织,而运行时不依赖节点的实际归属Pipeline,因此通过增加线、增加起始点,可以实现串接、并联、替换等组合,可具备类似父子流程嵌套的能力,而不需要额外增加概念设计。下图展示两个Pipeline串接、并联、替换某节点后的有向图形式。
在这里插入图片描述
从理论上而言,可以通过设计子Pipeline,然后通过一定规则拼接的形式,从而实现功能扩展。但读者们可以思考以下几个问题:

  • 组合场景应用于业务,实用性未必很强;
  • 由于节点可以任意组合,因此每个节点的执行动作不能存在冲突,这一点对配置数据和规范的要求比较高;
  • 多个Pipeline组合后依然是一个Pipeline,最后只能生成一份结果,对于会生成结果的节点需要额外关注

遗憾:动作应该在节点上还是在连线上?

本次设计上,节点负责所有的能力,而连线仅负责连接。这未必是一个好的设计形式。系统首次上线后,笔者也反复思考,可能实现目标上不会有差别,但是在连线上配置动作执行应该更合适。只是已然上线,只能一条路走到底。望读到此处的朋友们有类似设计需求时可以注意一下。
请添加图片描述

动作执行

该部分整体并不复杂,调用形式也很简单。

基本流程

每个节点会关联一个执行动作,该执行动作是节点执行的本体。每个动作包含三个部分:1)服务调用,2)执行判断,3)结果渲染。并且这三个部分是可以独立配置在动作里的,也就意味着可以有专门的节点负责服务调用、专门的节点负责结果渲染,也可以合并配置。
在这里插入图片描述

执行判断

动作执行时可以针对当前上下文进行判断,判断时会依次进行对规则进行求值,一旦匹配规则便立即退出,并按配置的失败方式返回,如果匹配不到则默认判断为真。因此对于规则配置的要求是需要错误条件要覆盖完整。

在这里插入图片描述
在执行判断上,也可以支持全真/全假/单真/单假的多条件配置,在前期使用上可能并不比单表达式直接判断更方便,这里不做描述。

结果合并

如果一个Pipeline中有多个Action产生执行结果,那么最终Pipeline将尝试对结果进行合并。如果是对结果参数进行渲染,那么渲染后的结果会和当前pipeline已有的结果尝试进行合并。对于无法合并的情况,新渲染的值会覆盖已有的值,因此如果没有特别要求,建议结果渲染最外层配置使用结构体配置,防止覆盖丢失。这种合并只支持一级合并,无法对子项合并。在合并结果中,只分为三种类型:

  • 基本类型,包括布尔型、整形、字符串等无结构类型;
  • 结构体类型,主要带字段的对象或Map类型;
  • 列表类型,主要指数组、列表等集合类型。
旧值类型 ↓ | 新值类型 →基本类型结构体类型列表类型
列表类型新值覆盖新值覆盖两个列表按第一层进行合并
基本类型新值覆盖新值覆盖新值覆盖
结构体类型新值覆盖两结构体按字段合并,字段同名时,新值覆盖旧值新值覆盖

实现代码

public class ActionBizServiceImpl implements ActionBizService {
    // ... ...
    @Override
    public ActionResult doAction(Action action, PipelineContext pc) {
        ActionResult actionResult = new ActionResult(action);
        actionResult.setStartTime(System.currentTimeMillis());
        try {
            doAction(action, actionResult, pc);
        } catch (BffBizException be) {
            actionResult.setEndTime(System.currentTimeMillis());
            actionResult.setCode(be.getCode());
            actionResult.setMsg(be.getMessage());
        } catch (Exception e) {
            actionResult.setEndTime(System.currentTimeMillis());
            actionResult.setCode(BizErrorEnum.UNKNOWN_EXCEPTION.getCode());
            actionResult.setMsg(e.getMessage());
        }
        return actionResult;
    }

    @Override
    public void doAction(Action action, ActionResult actionResult, PipelineContext pc) {
        /* 请求服务 */
        ServiceResult serviceResult = invokeService(action, pc);
        actionResult.setServiceResult(serviceResult);
        pc.saveActionResult(actionResult);
        if ((action.getServiceErrorIgnore() == null || !action.getServiceErrorIgnore()) && serviceResult != null) {
            if (! serviceResult.isSuccess() || serviceResult.getCode() != 0) {
                actionResult.setCode(serviceResult.getCode());
                actionResult.setMsg(serviceResult.getMsg());
                actionResult.setEndTime(System.currentTimeMillis());
                return;
            }
        }

        /* 执行判断 */
        boolean judge = judgeStrategy(action, actionResult, pc);
        if (! judge) {
            actionResult.setEndTime(System.currentTimeMillis());
            return;
        }

        /* 组装结果 */
        constructResult(action, pc);

        actionResult.setSuccess(true);
        actionResult.setCode(BizErrorEnum.SUCCESS.getCode());
        actionResult.setMsg(BizErrorEnum.SUCCESS.getMsg());
        actionResult.setEndTime(System.currentTimeMillis());
    }

    private ServiceResult invokeService(Action action, PipelineContext pc) {
        if (StringUtils.isEmpty(action.getService())) {
            return null;
        }
        Object params = evalBizService.render(action.getServiceParamField(), pc);
        return serviceProviderBizService.invoke(action.getService(), action.getServiceProtocol(), params, pc, action.getServiceCacheObj());
    }

    private boolean judgeStrategy(Action action, ActionResult actionResult, PipelineContext pc) {
        if (CollectionUtils.isEmpty(action.getJudgeStrategyList())) {
            return true;
        }
        for (JudgeStrategy strategy : action.getJudgeStrategyList()) {
            boolean judge = evalBizService.judge(strategy.getRule(), pc);
            if (judge && strategy.isSuccessFlag()) {
                return true;
            } else if (judge) {
                actionResult.setCode(strategy.getErrorCode());
                actionResult.setMsg(strategy.getErrorMsg());
                return false;
            }
        }
        return true;
    }

    private void constructResult(Action action, PipelineContext pc) {
        if (action.getResultParamField() != null) {
            Object result = evalBizService.render(action.getResultParamField(), pc);
            pc.putResult(result);
        }
    }
}

Action编排形式讨论

Action的三个步骤:服务调用、执行判断、结果渲染,是可以独立生效的,一个Action甚至可以拆成三个Action分别出现在不同的节点上。这种设计除了考虑灵活性外,更多的是对用户行为模式的推测。很有可能用户在配置Pipeline时采用以下固定的配置模式

在这里插入图片描述

实际使用中,大家更喜欢在最后一步配置结果渲染。

泛化调用

泛化调用主要用于向上游屏蔽不同服务的调用差异,在网关类系统设计中是比较常见的功能。这里没有过多描述,参考其他网关实现即可。

上层动作执行时,必须要服务调用器传入三种参数:调用的服务接口、调用的服务协议(HTTP/RPC)、调用服务的参数,服务调用器会根据协议,会生成一对对象,一个是请求对象,一个是实际调用器。通过调用器调用请求对象,则拿到调用结果。服务调用器catch所有的调用异常,统一以success和code的形式向动作执行器报告执行结果。

在这里插入图片描述

表达式求值

在整个Pipeline运行过程中,需要组织各处参数,计算数据结果,如果全部采用配置模式,功能受限、设计复杂、配置繁琐。这里引用了求值器,根据配置的表达式进行求值。目前使用求值器的地方有两处:

  • 谓词判断:用在Pipeline节点的分支条件判断和动作执行判断时的结果断言上。该表达式必须是一个布尔表达式,通过计算上下文参数的真假,来判断条件是否成立;
  • 参数渲染:用在动作执行请求服务参数渲染以及结果渲染上,该表达式最终需要生成一个对象,作为运行时的参数。

笔者在实现过程中,没有找到特别满意的求值器,即便是自己设计的也不是很满意。最终笔者采用了工厂模式,提供多个求值器,允许用户自己选择合适的求值器运行脚本,也允许开发者扩展新的求置器:

  • QLEXPRESS - 基于QlExpress规则引擎而扩展的求值器,功能范围支持较广,也是默认的求值器;
  • VELOCITY - 基于Velocity模板渲染的求值器,可以根据参数变量渲染常规字符串或JSON字符串。该求值器可以实现和QlExpress一样的功能范围,优点在于简单场景配置简单,但对于复杂场景配置过于复杂;
  • JSON - 基于Json反序列化的求值器,可以配置对象或JSON字符串。该求值器只能做常量配置;
  • FAST - 笔者设计用于实现快速参数赋值的求值器。

后记

本次服务编排在设计上有很多笔者觉得可以改进或提升的地方,后续如果有机会进行调整,会把最新的思路分享给大家。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐