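Apache DolphinScheduler is a distributed, decentralized, and easily extensible visual DAG workflow task scheduling platform. This article walks through the Apache DolphinScheduler source code to show how a task is handled across five stages: creation, construction, dispatch, execution, and completion. These stages span three services: Api-Server, Master-Server, and Worker-Server.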

Apache DolphinScheduler

Official website: https://dolphinscheduler.apache.org/

GitHub: https://github.com/apache/dolphinscheduler

Branch: dev

Api-Server

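Api-Server handles management functions and exposes the external REST API. Its responsibilities include:

- creating workflows and tasks;
- DAG validation and task parameter validation;
- schedule management and generating scheduled commands;
- managing projects, resources, security, and alerts.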

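Creating a workflow definition

The Api code path for creating a workflow definition:

Check the parameters of each task node

Check the task dependencies, build the DAG, and verify that it contains no cycle

Save the task definitions

Save the workflow definition

Save the task dependencies (relations)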

```java
public Map<String, Object> createProcessDefinition(User loginUser, long projectCode, String name,
                                                   String description, String globalParams, String locations,
                                                   int timeout, String tenantCode, String taskRelationJson,
                                                   String taskDefinitionJson, ProcessExecutionTypeEnum executionType) {
    ...
    // Parse the task relations and validate them (the DAG cycle check happens here)
    List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
    Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
    ...
    ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
            globalParams, locations, timeout, loginUser.getId(), tenantId);
    processDefinition.setExecutionType(executionType);
    // Persist the task definitions, the workflow definition and the task relations
    return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs);
}
```
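The DAG cycle check can be illustrated with Kahn's topological sort. The sketch below is a minimal stand-in and is not DolphinScheduler's own DAG utility. It treats each relation as a (preTaskCode, postTaskCode) pair of task codes, and it assumes that a preTaskCode of 0 marks a task with no upstream. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: detects cycles in task relations with Kahn's algorithm. */
final class TaskRelationCycleCheck {

    /** One edge of the workflow DAG; preTaskCode == 0 is assumed to mean "no upstream task". */
    static final class Relation {
        final long preTaskCode;
        final long postTaskCode;

        Relation(long preTaskCode, long postTaskCode) {
            this.preTaskCode = preTaskCode;
            this.postTaskCode = postTaskCode;
        }
    }

    /** Returns true if the relations contain at least one cycle. */
    static boolean hasCycle(List<Relation> relations) {
        Map<Long, Set<Long>> successors = new HashMap<>();
        Map<Long, Integer> inDegree = new HashMap<>();

        for (Relation r : relations) {
            inDegree.putIfAbsent(r.postTaskCode, 0);
            if (r.preTaskCode == 0) {
                continue; // root marker, not a real edge
            }
            inDegree.putIfAbsent(r.preTaskCode, 0);
            // Count each distinct edge once so duplicate relations do not skew in-degrees
            if (successors.computeIfAbsent(r.preTaskCode, k -> new HashSet<>()).add(r.postTaskCode)) {
                inDegree.merge(r.postTaskCode, 1, Integer::sum);
            }
        }

        // Start from every task without an upstream dependency
        Deque<Long> ready = new ArrayDeque<>();
        inDegree.forEach((task, degree) -> {
            if (degree == 0) {
                ready.add(task);
            }
        });

        int visited = 0;
        while (!ready.isEmpty()) {
            long task = ready.poll();
            visited++;
            for (long next : successors.getOrDefault(task, Set.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        // Tasks that never reach in-degree 0 sit on, or behind, a cycle
        return visited < inDegree.size();
    }
}
```

Kahn's algorithm is used here because it runs in linear time and needs no recursion. Any task left unvisited proves that the relations cannot be ordered as a DAG.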

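Bringing a workflow definition online/offline

Online: update the workflow definition's state to online.

Offline: update the workflow definition's state to offline, and at the same time set the state of its schedule to offline.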

```java
public Map<String, Object> releaseProcessDefinition(User loginUser, long projectCode, long code,
                                                    ReleaseState releaseState) {
    ...
    switch (releaseState) {
        case ONLINE:
            ...
            // Online: only the workflow definition's release state changes
            processDefinition.setReleaseState(releaseState);
            processDefinitionMapper.updateById(processDefinition);
            break;
        case OFFLINE:
            processDefinition.setReleaseState(releaseState);
            int updateProcess = processDefinitionMapper.updateById(processDefinition);
            ...
            // Offline: also remove the bound schedule
            schedulerService.deleteSchedule(project.getId(), schedule.getId());
            ...
            break;
        default:
            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
            return result;
    }
    ...
}
```
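The two branches above can be modeled in a simplified in-memory form. Going online changes only the workflow's state. Going offline also marks the bound schedule offline and unregisters its job, so that no further scheduled runs are triggered. Every class and field here is a hypothetical stand-in for the mapper and service calls in the excerpt, and the sketch throws an exception where the real code calls putMsg.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of the ONLINE/OFFLINE branches of releaseProcessDefinition. */
final class ReleaseStateSketch {

    enum ReleaseState { ONLINE, OFFLINE }

    static final class Workflow {
        ReleaseState state = ReleaseState.OFFLINE;
        Long scheduleId; // null when no schedule is bound
    }

    /** Schedule id -> schedule release state; stands in for the schedule table. */
    final Map<Long, ReleaseState> schedules = new HashMap<>();
    /** Schedule ids currently registered with the (simulated) scheduler. */
    final Set<Long> registeredJobs = new HashSet<>();

    void release(Workflow wf, ReleaseState target) {
        switch (target) {
            case ONLINE:
                wf.state = ReleaseState.ONLINE; // the schedule is left untouched
                break;
            case OFFLINE:
                wf.state = ReleaseState.OFFLINE;
                if (wf.scheduleId != null) {
                    schedules.put(wf.scheduleId, ReleaseState.OFFLINE);
                    registeredJobs.remove(wf.scheduleId); // plays the role of deleteSchedule
                }
                break;
            default:
                throw new IllegalArgumentException("unknown release state: " + target);
        }
    }
}
```

Bringing a workflow back online does not re-enable its schedule in this model. The schedule has its own online step, which is covered later in this article.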

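Manually running a workflow

The system currently supports two ways to trigger a workflow run: a one-off manual run and a scheduled run. For a manual run, Api-Server generates executable Command data. Master-Server then triggers the workflow and dispatches it to Worker-Server for execution.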

```java
public Map<String, Object> execProcessInstance(User loginUser, long projectCode, long processDefinitionCode,
                                               String cronTime, CommandType commandType,
                                               FailureStrategy failureStrategy, String startNodeList,
                                               TaskDependType taskDependType, WarningType warningType,
                                               int warningGroupId, RunMode runMode,
                                               Priority processInstancePriority, String workerGroup,
                                               Long environmentCode, Integer timeout,
                                               Map<String, String> startParams, Integer expectedParallelismNumber,
                                               int dryRun, ComplementDependentMode complementDependentMode) {
    ...
    // Record the run request as Command data for Master-Server to consume
    int create = this.createCommand(commandType, processDefinition.getCode(),
            taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
            warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams,
            expectedParallelismNumber, dryRun, complementDependentMode);
    ...
    return result;
}
```

```java
private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep,
                          FailureStrategy failureStrategy, String startNodeList, String schedule,
                          WarningType warningType, int executorId, int warningGroupId, RunMode runMode,
                          Priority processInstancePriority, String workerGroup, Long environmentCode,
                          Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun,
                          ComplementDependentMode complementDependentMode) {
    ...
    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
    command.setExecutorId(executorId);
    ...
    // determine whether to complement
    if (commandType == CommandType.COMPLEMENT_DATA) {
        // A complement (backfill) run needs both a start and an end date
        if (start == null || end == null) {
            return 0;
        }
        // The date range may be turned into several commands, depending on runMode and parallelism
        return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber, complementDependentMode);
    } else {
        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
        // Any other command type is stored as a single Command row
        return processService.createCommand(command);
    }
}
```
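When commandType is COMPLEMENT_DATA, createComplementCommandList turns one date range into one or more commands. The sketch below assumes how parallel run mode behaves; it is not the actual source. Under this assumption, the dates are split into at most expectedParallelismNumber contiguous chunks whose sizes differ by at most one, and each chunk becomes a separate command. The class and method names are hypothetical. Daily dates are used only for simplicity, whereas the real code derives the dates from the workflow's schedule.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: split complement (backfill) dates into parallel command ranges. */
final class ComplementSplitSketch {

    /** Inclusive date range that one complement command would cover. */
    static final class Range {
        final LocalDate start;
        final LocalDate end;

        Range(LocalDate start, LocalDate end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Lists every day from start to end, inclusive (daily granularity is a simplification). */
    static List<LocalDate> dailyDates(LocalDate start, LocalDate end) {
        List<LocalDate> dates = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            dates.add(d);
        }
        return dates;
    }

    /** Splits dates into at most maxParallelism contiguous chunks whose sizes differ by at most one. */
    static List<Range> split(List<LocalDate> dates, Integer maxParallelism) {
        List<Range> ranges = new ArrayList<>();
        if (dates.isEmpty()) {
            return ranges;
        }
        int count = dates.size();
        if (maxParallelism != null && maxParallelism > 0) {
            count = Math.min(count, maxParallelism);
        }
        int base = dates.size() / count;
        int remainder = dates.size() % count;
        int from = 0;
        for (int i = 0; i < count; i++) {
            int size = base + (i < remainder ? 1 : 0); // the first chunks absorb the remainder
            ranges.add(new Range(dates.get(from), dates.get(from + size - 1)));
            from += size;
        }
        return ranges;
    }
}
```

For example, ten days with a parallelism of 3 yield three commands covering 4, 3, and 3 days. Passing null or 0 yields one command per date in this sketch.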

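Creating a workflow schedule

The Api code path for creating a workflow schedule:

Check the user's permissions

Check that the workflow is currently online

Insert the schedule record

Update the workflow's alert group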

```java
public Map<String, Object> insertSchedule(User loginUser, long projectCode, long processDefineCode,
                                          String schedule, WarningType warningType, int warningGroupId,
                                          FailureStrategy failureStrategy, Priority processInstancePriority,
                                          String workerGroup, Long environmentCode) {
    ...
    // The cron expression and time zone come from the parsed schedule parameter
    scheduleObj.setCrontab(scheduleParam.getCrontab());
    scheduleObj.setTimezoneId(scheduleParam.getTimezoneId());
    ...
    scheduleMapper.insert(scheduleObj);
    ...
    // Copy the schedule's alert group onto the workflow definition
    processDefinition.setWarningGroupId(warningGroupId);
    processDefinitionMapper.updateById(processDefinition);
    ...
    return result;
}
```
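The four steps for creating a schedule consist of guard clauses followed by two writes. The minimal sketch below makes several simplifying assumptions. Permission is a single boolean, the schedule table is an in-memory list, and failures are raised as exceptions rather than reported through putMsg and Status codes. Validating the time zone with java.time.ZoneId is also added only for illustration. All names are hypothetical.

```java
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the insertSchedule flow: guard clauses, then two writes. */
final class InsertScheduleSketch {

    static final class Workflow {
        boolean online;
        int warningGroupId;
    }

    static final class Schedule {
        String crontab;
        String timezoneId;
    }

    /** Stands in for the schedule table written by scheduleMapper.insert. */
    final List<Schedule> scheduleTable = new ArrayList<>();

    Schedule insertSchedule(boolean hasPermission, Workflow workflow,
                            String crontab, String timezoneId, int warningGroupId) {
        // 1. Permission check
        if (!hasPermission) {
            throw new SecurityException("user has no permission on this project");
        }
        // 2. Only an online workflow may be scheduled
        if (!workflow.online) {
            throw new IllegalStateException("workflow is not online");
        }
        // 3. Build and insert the schedule row; ZoneId.of throws on an unknown zone id
        Schedule s = new Schedule();
        s.crontab = crontab;
        s.timezoneId = ZoneId.of(timezoneId).getId();
        scheduleTable.add(s);
        // 4. The alert group chosen for the schedule is copied onto the workflow
        workflow.warningGroupId = warningGroupId;
        return s;
    }
}
```

Running the checks before any write means a rejected request leaves both the schedule table and the workflow unchanged.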

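Bringing a workflow schedule online

The Api code path for bringing a workflow schedule online:

Check the user's permissions

Check whether the schedule is already online

Check that the workflow exists

Check that the workflow's task dependencies exist

Check that the workflow's tasks are online

Check that all of the workflow's sub-workflows are online

Check that a Master-Server is registered

Update the schedule record's state to online

Online: the Job is added to org.quartz.Scheduler, with jobName set to the scheduleId, jobGroup set to the projectId, and ProcessScheduleJob as the Job class.

Offline: the Job is deleted from org.quartz.Scheduler.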
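The checks above form a chain of preconditions that are evaluated in order, and the first failure aborts the operation. The sketch below models that ordering with hypothetical boolean inputs and message strings. In the actual service, each failure is reported through putMsg with a Status code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: evaluate the schedule-online preconditions in order. */
final class ScheduleOnlineChecks {

    /** Returns the message of the first failed check, or empty if every check passes. */
    static Optional<String> firstFailure(Map<String, BooleanSupplier> checks) {
        for (Map.Entry<String, BooleanSupplier> check : checks.entrySet()) {
            if (!check.getValue().getAsBoolean()) {
                return Optional.of(check.getKey());
            }
        }
        return Optional.empty();
    }

    /** Builds the checks in the order listed in the article; all inputs are hypothetical flags. */
    static Map<String, BooleanSupplier> checks(boolean hasPermission, boolean scheduleAlreadyOnline,
                                               boolean workflowExists, boolean dependenciesExist,
                                               boolean tasksOnline, boolean subWorkflowsOnline,
                                               int masterCount) {
        Map<String, BooleanSupplier> checks = new LinkedHashMap<>(); // keeps insertion order
        checks.put("no permission", () -> hasPermission);
        checks.put("schedule already online", () -> !scheduleAlreadyOnline);
        checks.put("workflow not found", () -> workflowExists);
        checks.put("task dependency missing", () -> dependenciesExist);
        checks.put("task not online", () -> tasksOnline);
        checks.put("sub-workflow not online", () -> subWorkflowsOnline);
        checks.put("no master server available", () -> masterCount > 0);
        return checks;
    }
}
```

A LinkedHashMap keeps the checks in declaration order, and BooleanSupplier defers each check so that later, potentially costlier lookups run only if the earlier checks pass.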

 

```java
public Map<String, Object> setScheduleState(User loginUser, long projectCode, Integer id,
                                            ReleaseState scheduleStatus) {
    ...
    // Master-Servers currently registered in the registry
    List<Server> masterServers = monitorService.getServerListFromRegistry(true);
    ...
    scheduleObj.setReleaseState(scheduleStatus);
    scheduleMapper.updateById(scheduleObj);
    ...
    switch (scheduleStatus) {
        case ONLINE:
            ...
            // Add the schedule's Job to the Quartz scheduler
            setSchedule(project.getId(), scheduleObj);
            break;
        case OFFLINE:
            ...
            // Remove the schedule's Job from the Quartz scheduler
            deleteSchedule(project.getId(), id);
            break;
        default:
            putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
            return result;
    }
    ...
    return result;
}
```
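The online and offline branches end in setSchedule and deleteSchedule, which add or remove a Quartz job identified by the pair (jobName, jobGroup) = (scheduleId, projectId). The sketch replaces org.quartz.Scheduler with a hypothetical in-memory map so that it runs without Quartz. With the real library, the identity would be an org.quartz.JobKey and the job class would be ProcessScheduleJob. The replace-on-add behavior is a property of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical in-memory stand-in for org.quartz.Scheduler job bookkeeping. */
final class ScheduleJobRegistry {

    /** Mirrors the (name, group) identity of a Quartz JobKey. */
    static final class JobKey {
        final String name;
        final String group;

        JobKey(String name, String group) {
            this.name = name;
            this.group = group;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof JobKey)) return false;
            JobKey other = (JobKey) o;
            return name.equals(other.name) && group.equals(other.group);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, group);
        }
    }

    /** Job key -> cron expression of the registered trigger. */
    private final Map<JobKey, String> jobs = new HashMap<>();

    static JobKey keyOf(int projectId, int scheduleId) {
        // jobName = scheduleId, jobGroup = projectId, as described in the article
        return new JobKey(String.valueOf(scheduleId), String.valueOf(projectId));
    }

    /** ONLINE: register (or replace) the job so that its trigger starts firing. */
    void setSchedule(int projectId, int scheduleId, String crontab) {
        jobs.put(keyOf(projectId, scheduleId), crontab);
    }

    /** OFFLINE: remove the job; returns false if it was not registered. */
    boolean deleteSchedule(int projectId, int scheduleId) {
        return jobs.remove(keyOf(projectId, scheduleId)) != null;
    }

    boolean isScheduled(int projectId, int scheduleId) {
        return jobs.containsKey(keyOf(projectId, scheduleId));
    }

    int size() {
        return jobs.size();
    }
}
```

Keying on the pair means that bringing the same schedule online twice replaces the entry instead of duplicating it. Grouping by project also keeps each project's jobs together.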

When a schedule's Quartz trigger fires, org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob generates the Command data.

```java
protected void executeInternal(JobExecutionContext context) {
    ...
    // Each firing creates a command of type SCHEDULER
    Command command = new Command();
    command.setCommandType(CommandType.SCHEDULER);
    ...
    processService.createCommand(command);
}
```
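Both trigger paths end by writing a Command. Manual runs go through createCommand, and scheduled runs go through ProcessScheduleJob. Master-Server then picks the commands up. This producer/consumer relationship can be sketched with an in-memory queue standing in for the command data. The queue, the class, the batch size, and the enum, which reuses a few CommandType names, are all hypothetical. In the real system, the handoff goes through processService.createCommand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch: Api-Server and Quartz produce commands, the master consumes them. */
final class CommandHandoffSketch {

    /** A small subset mirroring CommandType names used in this article. */
    enum CommandType { START_PROCESS, SCHEDULER, COMPLEMENT_DATA }

    static final class Command {
        final CommandType type;
        final long processDefinitionCode;

        Command(CommandType type, long processDefinitionCode) {
            this.type = type;
            this.processDefinitionCode = processDefinitionCode;
        }
    }

    /** Stands in for the command storage shared by Api-Server and Master-Server. */
    private final ConcurrentLinkedQueue<Command> commandTable = new ConcurrentLinkedQueue<>();

    /** Producer side: what both trigger paths do at the end. */
    void createCommand(Command command) {
        commandTable.offer(command);
    }

    /** Consumer side: the master takes up to batchSize commands per poll, oldest first. */
    List<Command> pollBatch(int batchSize) {
        List<Command> batch = new ArrayList<>();
        Command c;
        while (batch.size() < batchSize && (c = commandTable.poll()) != null) {
            batch.add(c);
        }
        return batch;
    }
}
```

The producers never call the master directly, so Api-Server keeps accepting run requests even while Master-Server is busy or restarting.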

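To be continued!

The next article will explain how Master-Server builds the DAG, dispatches tasks, and monitors the state of workflows and tasks.

Taking part in open source is truly worthwhile. We invite everyone to join in and feel the enthusiasm of the open-source community 🔥🔥🔥 Let's rock together!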
