工作流系统(Workflow System)是 Dify 的核心组件,它支持通过可视化编程界面创建复杂的 AI 应用程序。它允许用户通过将不同的功能块连接在一起来设计工作流,以处理数据、与 AI 模型交互、管理条件并执行各种操作。

接下来将详细介绍 Dify 工作流的实现机制,通过分析代码实现、数据流动和执行过程,充分理解工作流的实现原理。

一、工作流系统概述

1.1 核心概念

Dify 工作流系统是一个基于图(Graph)的执行引擎,允许用户通过可视化界面设计和执行复杂的 AI 工作流。工作流由多种类型的节点(Node)组成,这些节点通过边(Edge)连接,形成有向图结构。

1.2 系统架构

工作流系统主要由以下几个部分组成:

  • 图引擎:负责解析工作流配置,构建执行图,并控制节点的执行顺序
  • 节点实现:各种类型节点的具体实现,如 LLM、知识检索、条件分支等
  • 变量管理:管理工作流执行过程中的变量传递和存储
  • 执行记录:记录工作流和节点的执行状态、输入输出和性能指标

二、数据模型设计

2.1 工作流数据模型

Dify 使用多个模型来表示工作流及其执行状态:

  • WorkflowModel:工作流的基本信息,包括 ID、名称、描述、配置等
  • WorkflowRunModel:工作流的执行记录,包括执行状态、开始时间、结束时间等
  • WorkflowNodeExecutionModel:节点的执行记录,包括节点类型、输入、输出、状态等
  • ConversationVariable:存储会话变量,包括名称、值类型、值等
  • WorkflowDraftVariable:存储草稿工作流中的变量,包括会话变量、系统变量和节点变量
2.2 工作流节点类型

Dify 工作流支持多种类型的节点,每种节点有不同的功能和配置:

Dify 支持多种类型的节点,包括:

  • START:工作流的起始节点
  • END:工作流的结束节点
  • LLM:大语言模型节点,用于生成文本
  • KNOWLEDGE_RETRIEVAL:知识检索节点,用于从知识库中检索信息
  • IF_ELSE:条件分支节点,根据条件选择执行路径
  • CODE:代码执行节点,执行自定义代码
  • HTTP_REQUEST:HTTP 请求节点,与外部 API 交互
  • TOOL:工具节点,调用预定义的工具
  • AGENT:代理节点,执行复杂的任务

三、工作流执行机制

3.1 工作流执行流程

  1. 初始化工作流运行记录
  2. 解析工作流配置,构建执行图
  3. 从起始节点开始执行
  4. 根据图的边定义,确定下一个要执行的节点
  5. 执行节点,记录执行结果
  6. 重复步骤 4-5,直到达到结束节点或出现错误
  7. 完成工作流执行,更新运行记录
3.2 图引擎执行机制

图引擎是工作流执行的核心,负责:

  • 解析节点和边配置
  • 构建边映射和反向边映射
  • 识别根节点和叶子节点
  • 检查节点连接性和循环
  • 管理并行执行
  • 控制执行流程

四、变量管理机制

4.1 变量池设计

Dify 工作流使用变量池(VariablePool)管理工作流执行过程中的变量。变量池包含以下几类变量:

  1. 系统变量:以 sys. 为前缀,如 sys.query(用户输入)、sys.files(用户上传文件)
  2. 环境变量:工作流级别的配置变量
  3. 会话变量:在会话中持久化的变量
  4. 节点变量:各节点的输入输出变量

4.2 变量传递机制

节点之间通过变量池传递数据。每个节点执行时:

  1. 节点执行后,将输出添加到变量池中
  2. 下一个节点从变量池中获取所需的输入变量
  3. 支持通过选择器和模板字符串引用变量
  4. 支持文件类型变量的传递

变量的引用使用 {{#node_id.variable_name#}} 的模板语法。

五、节点实现机制

5.1 基础节点结构

所有节点都继承自 BaseNode 抽象类,实现自己的 _run 方法:

所有节点都继承自 BaseNode 类,实现以下方法:

  • _run:节点的具体执行逻辑
  • _get_inputs:获取节点的输入变量
  • _get_outputs:处理节点的输出变量

以下是 BaseNode 类的核心实现:

class BaseNode(Generic[GenericNodeData]):
    _node_data_cls: type[GenericNodeData]
    _node_type: NodeType

    def __init__(
        self,
        id: str,
        config: Mapping[str, Any],
        graph_init_params: "GraphInitParams",
        graph: "Graph",
        graph_runtime_state: "GraphRuntimeState",
        previous_node_id: Optional[str] = None,
        thread_pool_id: Optional[str] = None,
    ) -> None:
        self.id = id
        self.tenant_id = graph_init_params.tenant_id
        self.app_id = graph_init_params.app_id
        self.workflow_type = graph_init_params.workflow_type
        self.workflow_id = graph_init_params.workflow_id
        self.graph_config = graph_init_params.graph_config
        self.user_id = graph_init_params.user_id
        self.user_from = graph_init_params.user_from
        self.invoke_from = graph_init_params.invoke_from
        self.workflow_call_depth = graph_init_params.call_depth
        self.graph = graph
        self.graph_runtime_state = graph_runtime_state
        self.previous_node_id = previous_node_id
        self.thread_pool_id = thread_pool_id

        node_id = config.get("id")
        ifnot node_id:
            raise ValueError("Node ID is required.")

        self.node_id = node_id

        node_data = self._node_data_cls.model_validate(config.get("data", {}))
        self.node_data = node_data

    @abstractmethod
    def _run(self) -> NodeRunResult | Generator[Union[NodeEvent, "InNodeEvent"], None, None]:
        """
        Run node
        :return:
        """
        raise NotImplementedError

    def run(self) -> Generator[Union[NodeEvent, "InNodeEvent"], None, None]:
        try:
            result = self._run()
        except Exception as e:
            logger.exception(f"Node {self.node_id} failed to run")
            result = NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                error=str(e),
                error_type="WorkflowNodeError",
            )

        if isinstance(result, NodeRunResult):
            yield RunCompletedEvent(run_result=result)
        else:
            yieldfrom result

BaseNode 类是所有节点的基类,它定义了节点的基本属性和方法:

  1. 初始化方法:接收节点 ID、配置、图引擎参数等,初始化节点的基本属性
  2. 抽象方法 _run:子类必须实现的方法,包含节点的具体执行逻辑
  3. run 方法:调用 _run 方法并处理异常,将结果包装为事件返回
5.2 节点类型实现
5.2.1 StartNode 实现

StartNode 是工作流的起始节点,负责将用户输入和系统变量作为节点的输出:

class StartNode(BaseNode[StartNodeData]):
    _node_data_cls = StartNodeData
    _node_type = NodeType.START

    def _run(self) -> NodeRunResult:
        node_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs)
        system_inputs = self.graph_runtime_state.variable_pool.system_variables

        # TODO: System variables should be directly accessible, no need for special handling
        # Set system variables as node outputs.
        for var in system_inputs:
            node_inputs[SYSTEM_VARIABLE_NODE_ID + "." + var] = system_inputs[var]

        return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=node_inputs, outputs=node_inputs)

StartNode 的实现非常简单,它主要完成以下工作:

  1. 从变量池中获取用户输入和系统变量
  2. 将系统变量添加到节点输入中,以 SYSTEM_VARIABLE_NODE_ID.var 的形式作为键
  3. 返回包含这些输入和输出的 NodeRunResult,状态为成功
5.2.2 IfElseNode 实现

IfElseNode 是条件分支节点,根据条件选择执行路径:

class IfElseNode(BaseNode[IfElseNodeData]):
    _node_data_cls = IfElseNodeData
    _node_type = NodeType.IF_ELSE

    def _run(self) -> NodeRunResult:
        """
        Run node
        :return:
        """
        node_inputs: dict[str, list] = {"conditions": []}

        process_data: dict[str, list] = {"condition_results": []}

        input_conditions = []
        final_result = False
        selected_case_id = None
        condition_processor = ConditionProcessor()
        try:
            # Check if the new cases structure is used
            if self.node_data.cases:
                for case in self.node_data.cases:
                    input_conditions, group_result, final_result = condition_processor.process_conditions(
                        variable_pool=self.graph_runtime_state.variable_pool,
                        conditions=case.conditions,
                        operator=case.logical_operator,
                    )

                    process_data["condition_results"].append(
                        {
                            "group": case.model_dump(),
                            "results": group_result,
                            "final_result": final_result,
                        }
                    )

                    # Break if a case passes (logical short-circuit)
                    if final_result:
                        selected_case_id = case.case_id  # Capture the ID of the passing case
                        break

            else:
                # Fallback to old structure if cases are not defined
                input_conditions, group_result, final_result = _should_not_use_old_function(
                    condition_processor=condition_processor,
                    variable_pool=self.graph_runtime_state.variable_pool,
                    conditions=self.node_data.conditions or [],
                    operator=self.node_data.logical_operator or"and",
                )

                selected_case_id = "true"if final_result else"false"

                process_data["condition_results"].append(
                    {"group": "default", "results": group_result, "final_result": final_result}
                )

            node_inputs["conditions"] = input_conditions

        except Exception as e:
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED, inputs=node_inputs, process_data=process_data, error=str(e)
            )

        outputs = {"result": final_result, "selected_case_id": selected_case_id}

        data = NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            inputs=node_inputs,
            process_data=process_data,
            edge_source_handle=selected_case_id or"false",  # Use case ID or 'default'
            outputs=outputs,
        )

        return data

IfElseNode 的实现主要完成以下工作:

  1. 使用 ConditionProcessor 处理条件逻辑
  2. 遍历 cases 结构中的条件组,并根据结果确定 selected_case_id
  3. 如果使用旧的结构,则调用 _should_not_use_old_function 进行兼容处理
  4. 返回包含条件结果的 NodeRunResult,并设置 edge_source_handle 以指示下一个要执行的节点
5.2.3 LLM 节点实现

LLM 节点是工作流中最核心的节点之一,它负责调用大语言模型生成文本。LLM 节点的执行流程:

以下是 LLMNode 类的部分实现:

class LLMNode(BaseNode[LLMNodeData]):
    _node_data_cls = LLMNodeData
    _node_type = NodeType.LLM

    # Instance attributes specific to LLMNode.
    # Output variable for file
    _file_outputs: list["File"]
    _llm_file_saver: LLMFileSaver

    def __init__(
        self,
        id: str,
        config: Mapping[str, Any],
        graph_init_params: "GraphInitParams",
        graph: "Graph",
        graph_runtime_state: "GraphRuntimeState",
        previous_node_id: Optional[str] = None,
        thread_pool_id: Optional[str] = None,
        *,
        llm_file_saver: LLMFileSaver | None = None,
    ) -> None:
        super().__init__(
            id=id,
            config=config,
            graph_init_params=graph_init_params,
            graph=graph,
            graph_runtime_state=graph_runtime_state,
            previous_node_id=previous_node_id,
            thread_pool_id=thread_pool_id,
        )
        # LLM file outputs, used for MultiModal outputs.
        self._file_outputs: list[File] = []

        if llm_file_saver isNone:
            llm_file_saver = FileSaverImpl(
                user_id=graph_init_params.user_id,
                tenant_id=graph_init_params.tenant_id,
            )
        self._llm_file_saver = llm_file_saver

    def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
        def process_structured_output(text: str) -> Optional[dict[str, Any]]:
            """Process structured output if enabled"""
            ifnot self.node_data.structured_output_enabled ornot self.node_data.structured_output:
                returnNone
            return self._parse_structured_output(text)

        node_inputs: Optional[dict[str, Any]] = None
        process_data = None
        result_text = ""
        usage = LLMUsage.empty_usage()
        finish_reason = None

        try:
            # init messages template
            self.node_data.prompt_template = self._transform_chat_messages(self.node_data.prompt_template)

            # fetch variables and fetch values from variable pool
            inputs = self._fetch_inputs(node_data=self.node_data)

            # fetch jinja2 inputs
            jinja_inputs = self._fetch_jinja_inputs(node_data=self.node_data)

            # merge inputs
            inputs.update(jinja_inputs)

            node_inputs = {}

            # fetch files
            files = (
                self._fetch_files(selector=self.node_data.vision.configs.variable_selector)
                if self.node_data.vision.enabled
                else []
            )

            if files:
                node_inputs["#files#"] = [file.to_dict() for file in files]

            # fetch context value
            generator = self._fetch_context(node_data=self.node_data)
            context = None
            for event in generator:
                if isinstance(event, RunRetrieverResourceEvent):
                    context = event.context
                    yield event
            if context:
                node_inputs["#context#"] = context

            # fetch model config
            model_instance, model_config = self._fetch_model_config(self.node_data.model)
            
            # ... 更多代码 ...

LLMNode 是一个典型的节点实现,负责调用大语言模型:

  1. 初始化节点参数和模型配置
  2. 处理输入变量和文件
  3. 构建提示消息
  4. 调用 LLM 模型
  5. 处理模型返回的结果
  6. 生成节点执行结果
5.2.4 ToolNode 实现

ToolNode 是工具节点,负责调用预定义的工具:

class ToolNode(BaseNode[ToolNodeData]):
    """
    Tool Node
    """

    _node_data_cls = ToolNodeData
    _node_type = NodeType.TOOL

    def _run(self) -> Generator:
        """
        Run the tool node
        """

        node_data = cast(ToolNodeData, self.node_data)

        # fetch tool icon
        tool_info = {
            "provider_type": node_data.provider_type.value,
            "provider_id": node_data.provider_id,
            "plugin_unique_identifier": node_data.plugin_unique_identifier,
        }

        # get tool runtime
        try:
            from core.tools.tool_manager import ToolManager

            tool_runtime = ToolManager.get_workflow_tool_runtime(
                self.tenant_id, self.app_id, self.node_id, self.node_data, self.invoke_from
            )
        except ToolNodeError as e:
            yield RunCompletedEvent(
                run_result=NodeRunResult(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs={},
                    metadata={WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info},
                    error=f"Failed to get tool runtime: {str(e)}",
                    error_type=type(e).__name__,
                )
            )
            return

        # get parameters
        tool_parameters = tool_runtime.get_merged_runtime_parameters() or []
        parameters = self._generate_parameters(
            tool_parameters=tool_parameters,
            variable_pool=self.graph_runtime_state.variable_pool,
            node_data=self.node_data,
        )
        
        # get conversation id
        conversation_id = self.graph_runtime_state.variable_pool.get_system_variable("conversation_id")

        # invoke tool
        try:
            from core.tools.entities.tool_entities import ToolInvokeMessage
            from core.tools.tool_engine import ToolEngine

            # invoke tool
            tool_invoke_message = ToolInvokeMessage(
                conversation_id=conversation_id,
                tool_parameters=parameters,
            )

            # invoke tool
            tool_response = ToolEngine.generic_invoke(
                tenant_id=self.tenant_id,
                app_id=self.app_id,
                tool_runtime=tool_runtime,
                tool_invoke_message=tool_invoke_message,
                user_id=self.user_id,
                invoke_from=self.invoke_from,
            )

            # ... 处理工具响应 ...
        except Exception as e:
            # ... 处理异常 ...

ToolNode 的实现主要完成以下工作:

  1. 获取工具信息和工具运行时
  2. 生成工具参数
  3. 获取会话 ID
  4. 通过 ToolEngine.generic_invoke 调用工具
  5. 处理工具返回的结果
  6. 生成节点执行结果
5.2.5 KnowledgeRetrievalNode 实现

KnowledgeRetrievalNode 是知识检索节点,负责从知识库中检索相关信息:

class KnowledgeRetrievalNode(LLMNode):
    """
    Knowledge Retrieval Node
    """

    _node_data_cls = KnowledgeRetrievalNodeData
    _node_type = NodeType.KNOWLEDGE_RETRIEVAL

    def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
        """
        Run node
        """
        node_data = cast(KnowledgeRetrievalNodeData, self.node_data)

        # get query variable
        query_variable = node_data.query_variable
        ifnot query_variable:
            yield RunCompletedEvent(
                run_result=NodeRunResult(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs={},
                    error="Query variable is not set",
                )
            )
            return

        # get query from variable pool
        query = self.graph_runtime_state.variable_pool.get_variable(query_variable)
        ifnot query:
            yield RunCompletedEvent(
                run_result=NodeRunResult(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs={},
                    error=f"Query variable {query_variable} is empty",
                )
            )
            return

        # check rate limit
        ifnot self._check_rate_limit():
            yield RunCompletedEvent(
                run_result=NodeRunResult(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs={},
                    error="Rate limit exceeded",
                )
            )
            return

        # get retrieval model config
        retrieval_model_config = {
            "search_method": node_data.search_method,
            "reranking_enable": node_data.reranking_enable,
            "reranking_model": node_data.reranking_model,
            "top_k": node_data.top_k,
            "score_threshold": node_data.score_threshold,
        }

        # ... 执行知识检索逻辑 ...

KnowledgeRetrievalNode 的实现主要完成以下工作:

  1. 从变量池中提取查询变量
  2. 检查查询是否为空
  3. 进行速率限制检查
  4. 定义检索模型配置
  5. 执行知识检索
  6. 处理检索结果
  7. 生成节点执行结果
5.2.6 CodeNode 实现

CodeNode 是代码执行节点,负责执行用户定义的代码:

class CodeNode(BaseNode[CodeNodeData]):
    """
    Code Node
    """

    _node_data_cls = CodeNodeData
    _node_type = NodeType.CODE

    def _run(self) -> NodeRunResult:
        """
        Run node
        """
        node_data = cast(CodeNodeData, self.node_data)

        # get code language and content
        code_language = node_data.code_language
        code_content = node_data.code_content

        # get input variables
        input_variables = {}
        for input_variable in node_data.input_variables:
            variable_name = input_variable.variable_name
            variable_value = self.graph_runtime_state.variable_pool.get_variable(input_variable.variable_selector)
            input_variables[variable_name] = variable_value

        # execute code
        try:
            from core.workflow.nodes.code.code_executor import CodeExecutor

            result = CodeExecutor.execute_workflow_code_template(
                code_language=code_language,
                code_content=code_content,
                input_variables=input_variables,
            )

            # check output variables
            outputs = {}
            for output_variable in node_data.output_variables:
                variable_name = output_variable.variable_name
                if variable_name notin result:
                    return NodeRunResult(
                        status=WorkflowNodeExecutionStatus.FAILED,
                        inputs=input_variables,
                        error=f"Output variable {variable_name} not found in code execution result",
                    )

                variable_value = result[variable_name]
                variable_type = output_variable.variable_type

                # check variable type
                if variable_type == "string":
                    ifnot self._check_string(variable_value, output_variable.max_length):
                        return NodeRunResult(
                            status=WorkflowNodeExecutionStatus.FAILED,
                            inputs=input_variables,
                            error=f"Output variable {variable_name} is not a valid string or exceeds max length",
                        )
                elif variable_type == "number":
                    ifnot self._check_number(variable_value):
                        return NodeRunResult(
                            status=WorkflowNodeExecutionStatus.FAILED,
                            inputs=input_variables,
                            error=f"Output variable {variable_name} is not a valid number",
                        )
                # ... 其他类型检查 ...

                outputs[variable_name] = variable_value

            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.SUCCEEDED,
                inputs=input_variables,
                outputs=outputs,
            )

        except (CodeExecutionError, CodeNodeError) as e:
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                inputs=input_variables,
                error=str(e),
            )

CodeNode 的实现主要完成以下工作:

  1. 获取代码语言和代码内容
  2. 从变量池中获取输入变量
  3. 通过 CodeExecutor.execute_workflow_code_template 执行代码
  4. 检查输出变量的类型和长度
  5. 处理执行结果和潜在的异常
  6. 生成节点执行结果
5.2.7 AgentNode 实现

AgentNode 是代理节点,负责调用 AI 代理执行复杂任务:

class AgentNode(ToolNode):
    """
    Agent Node
    """

    _node_data_cls = AgentNodeData
    _node_type = NodeType.AGENT

    def _run(self) -> Generator:
        """
        Run the agent node
        """
        node_data = cast(AgentNodeData, self.node_data)

        # get agent strategy
        try:
            from core.agent.strategy.strategy_factory import StrategyFactory

            strategy = StrategyFactory.create_strategy(
                tenant_id=self.tenant_id,
                app_id=self.app_id,
                strategy_mode=node_data.strategy_mode,
                strategy_config=node_data.strategy_config,
                user_id=self.user_id,
                invoke_from=self.invoke_from,
            )
        except Exception as e:
            yield RunCompletedEvent(
                run_result=NodeRunResult(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs={},
                    error=f"Failed to create agent strategy: {str(e)}",
                )
            )
            return

        # generate agent parameters
        agent_parameters = self._generate_parameters(
            tool_parameters=node_data.parameters,
            variable_pool=self.graph_runtime_state.variable_pool,
            node_data=node_data,
        )

        # get conversation id
        conversation_id = self.graph_runtime_state.variable_pool.get_system_variable("conversation_id")

        # invoke agent
        try:
            agent_response = strategy.invoke(
                conversation_id=conversation_id,
                inputs=agent_parameters,
                files=[],
            )

            # ... 处理代理响应 ...
        except Exception as e:
            # ... 处理异常 ...

AgentNode 的实现主要完成以下工作:

  1. 获取代理策略
  2. 生成代理参数
  3. 获取会话 ID
  4. 通过 strategy.invoke 调用代理
  5. 处理代理返回的结果
  6. 生成节点执行结果
5.2.8 HttpRequestNode 实现

HttpRequestNode 是 HTTP 请求节点,负责发送 HTTP 请求并处理响应:

class HttpRequestNode(BaseNode[HttpRequestNodeData]):
    """
    Http Request Node
    """

    _node_data_cls = HttpRequestNodeData
    _node_type = NodeType.HTTP_REQUEST

    def _run(self) -> NodeRunResult:
        """
        Run node
        """
        node_data = cast(HttpRequestNodeData, self.node_data)

        # get default config
        default_config = {
            "method": node_data.method,
            "url": node_data.url,
            "headers": node_data.headers,
            "params": node_data.params,
            "body": node_data.body,
            "timeout": node_data.timeout,
            "retry_count": node_data.retry_count,
            "retry_interval": node_data.retry_interval,
        }

        # init executor
        executor = HttpRequestExecutor(
            tenant_id=self.tenant_id,
            app_id=self.app_id,
            user_id=self.user_id,
            variable_pool=self.graph_runtime_state.variable_pool,
            default_config=default_config,
        )

        # execute http request
        try:
            response = executor.execute()

            # extract files
            files = []
            ifresponse.files:
                for file inresponse.files:
                    files.append(file.to_dict())

            # success
            ifresponse.success:
                return NodeRunResult(
                    status=WorkflowNodeExecutionStatus.SUCCEEDED,
                    inputs=response.request_info,
                    outputs={
                        "status_code": response.status_code,
                        "response_body": response.response_body,
                        "response_headers": response.response_headers,
                        "files": files,
                    },
                )
            # failed
            else:
                return NodeRunResult(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs=response.request_info,
                    error=response.error,
                )
        except Exception as e:
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                inputs={},
                error=str(e),
            )

HttpRequestNode 的实现主要完成以下工作:

  1. 获取默认配置
  2. 初始化 HttpRequestExecutor
  3. 执行 HTTP 请求
  4. 处理响应(包括成功和失败情况)
  5. 提取文件
  6. 生成节点执行结果

六、工作流数据流动

6.1 工作流创建和发布

  1. 用户在界面上设计工作流,定义节点和连接
  2. 系统将设计转换为工作流配置
  3. 创建工作流模型和草稿变量
  4. 发布工作流,使其可被调用
6.2 工作流调试和执行

  1. 用户触发工作流执行
  2. 系统创建工作流运行记录
  3. 图引擎解析工作流配置,构建执行图
  4. 按照图的定义执行节点
  5. 记录每个节点的执行状态和结果
  6. 完成工作流执行,更新运行记录

七、图引擎机制

图引擎是工作流执行的核心,负责解析工作流图结构并执行节点。

7.1 图引擎实现

以下是 GraphEngine 类的部分实现:

class GraphEngine:
    """
    Graph Engine
    """

    def __init__(
        self,
        tenant_id: str,
        app_id: str,
        workflow_type: WorkflowType,
        workflow_id: str,
        user_id: str,
        invoke_from: InvokeFrom,
    ) -> None:
        """
        Initialize graph engine
        """
        self.tenant_id = tenant_id
        self.app_id = app_id
        self.workflow_type = workflow_type
        self.workflow_id = workflow_id
        self.user_id = user_id
        self.invoke_from = invoke_from

    def execute(
        self,
        workflow_run_id: str,
        workflow_config: dict[str, Any],
        user_inputs: dict[str, Any],
        system_variables: dict[str, Any],
        environment_variables: dict[str, Any],
        session_variables: dict[str, Any],
        *,
        event_handler: Optional[Callable[[WorkflowEvent], None]] = None,
    ) -> Generator[WorkflowEvent, None, None]:
        """
        Execute workflow
        """
        # Create graph init params
        graph_init_params = GraphInitParams(
            tenant_id=self.tenant_id,
            app_id=self.app_id,
            workflow_id=self.workflow_id,
            workflow_run_id=workflow_run_id,
            user_id=self.user_id,
            invoke_from=self.invoke_from,
        )

        # Create variable pool
        variable_pool = VariablePool(
            user_inputs=user_inputs,
            system_variables=system_variables,
            environment_variables=environment_variables,
            session_variables=session_variables,
        )

        # Create graph runtime state
        graph_runtime_state = GraphRuntimeState(
            variable_pool=variable_pool,
        )

        # Create graph
        graph = Graph(
            workflow_config=workflow_config,
            graph_init_params=graph_init_params,
            graph_runtime_state=graph_runtime_state,
        )

        # Create thread pool
        thread_pool = GraphEngineThreadPool(max_workers=10)

        # Execute graph
        try:
            # Yield workflow started event
            yield WorkflowStartedEvent(
                workflow_run_id=workflow_run_id,
            )

            # Execute graph
            for event in graph.execute(thread_pool=thread_pool):
                # Handle event
                if event_handler:
                    event_handler(event)

                # Yield event
                yield event

                # Check if workflow is completed
                if isinstance(event, WorkflowCompletedEvent):
                    break

        except Exception as e:
            # Yield workflow failed event
            yield WorkflowFailedEvent(
                workflow_run_id=workflow_run_id,
                error=str(e),
            )

        finally:
            # Shutdown thread pool
            thread_pool.shutdown(wait=True)

GraphEngineThreadPool 是一个继承自 ThreadPoolExecutor 的线程池,用于管理工作流的并行执行:

class GraphEngineThreadPool(ThreadPoolExecutor):
    """
    Graph Engine Thread Pool
    """

    def __init__(self, max_workers: Optional[int] = None) -> None:
        """
        Initialize graph engine thread pool
        """
        super().__init__(max_workers=max_workers)
        self._futures: dict[str, Future] = {}

    def submit_task(
        self, thread_pool_id: str, fn: Callable, *args: Any, **kwargs: Any
    ) -> Future:
        """
        Submit task to thread pool
        """
        future = self.submit(fn, *args, **kwargs)
        self._futures[thread_pool_id] = future
        future.add_done_callback(lambda _: self._futures.pop(thread_pool_id, None))
        returnfuture

    def is_full(self) -> bool:
        """
        Check if thread pool is full
        """
        return len(self._futures) >= self._max_workers

    def get_future(self, thread_pool_id: str) -> Optional[Future]:
        """
        Get future by thread pool id
        """
        return self._futures.get(thread_pool_id)
7.2 图结构解析

图引擎首先解析工作流的图结构,包括:

  1. 解析节点:解析工作流中的所有节点,包括节点类型、配置等
  2. 解析边:解析节点之间的连接关系,包括源节点、目标节点、源端口、目标端口等
  3. 构建节点映射:构建节点ID到节点对象的映射
  4. 构建边映射:构建边ID到边对象的映射
7.3 图结构实现

以下是 Graph 类的部分实现,它负责解析工作流配置并执行节点:

class Graph:
    """
    Graph
    """

    def __init__(
        self,
        workflow_config: dict[str, Any],
        graph_init_params: GraphInitParams,
        graph_runtime_state: GraphRuntimeState,
    ) -> None:
        """
        Initialize graph
        """
        self.workflow_config = workflow_config
        self.graph_init_params = graph_init_params
        self.graph_runtime_state = graph_runtime_state

        # Parse workflow config
        self.nodes = self._parse_nodes(workflow_config.get("nodes", {}))
        self.edges = self._parse_edges(workflow_config.get("edges", {}))

        # Build node and edge mappings
        self.node_mapping = self._build_node_mapping(self.nodes)
        self.edge_mapping = self._build_edge_mapping(self.edges)

        # Build source and target node mappings
        self.source_node_mapping = self._build_source_node_mapping(self.edges)
        self.target_node_mapping = self._build_target_node_mapping(self.edges)

    def execute(self, thread_pool: GraphEngineThreadPool) -> Generator[WorkflowEvent, None, None]:
        """
        Execute graph
        """
        # Find start node
        start_node = self._find_start_node()
        ifnot start_node:
            yield WorkflowFailedEvent(
                workflow_run_id=self.graph_init_params.workflow_run_id,
                error="Start node not found",
            )
            return

        # Execute start node
        for event in self._execute_node(start_node, thread_pool=thread_pool):
            yield event

        # Yield workflow completed event
        yield WorkflowCompletedEvent(
            workflow_run_id=self.graph_init_params.workflow_run_id,
        )

    def _execute_node(
        self, node: BaseNode, thread_pool: GraphEngineThreadPool, previous_node_id: Optional[str] = None
    ) -> Generator[WorkflowEvent, None, None]:
        """
        Execute node
        """
        # Yield node started event
        yield NodeStartedEvent(
            workflow_run_id=self.graph_init_params.workflow_run_id,
            node_id=node.node_id,
            node_type=node.node_type,
        )

        # Run node
        try:
            for event in node.run():
                # Handle node event
                if isinstance(event, RunCompletedEvent):
                    # Get node run result
                    node_run_result = event.run_result

                    # Update variable pool
                    if node_run_result.outputs:
                        for variable_name, variable_value in node_run_result.outputs.items():
                            self.graph_runtime_state.variable_pool.add(
                                node_id=node.node_id,
                                variable_name=variable_name,
                                variable_value=variable_value,
                            )

                    # Yield node completed event
                    yield NodeCompletedEvent(
                        workflow_run_id=self.graph_init_params.workflow_run_id,
                        node_id=node.node_id,
                        node_type=node.node_type,
                        status=node_run_result.status,
                        inputs=node_run_result.inputs,
                        outputs=node_run_result.outputs,
                        process_data=node_run_result.process_data,
                        error=node_run_result.error,
                    )

                    # Find next nodes
                    next_nodes = self._find_next_nodes(
                        node_id=node.node_id,
                        edge_source_handle=node_run_result.edge_source_handle,
                    )

                    # Execute next nodes
                    for next_node in next_nodes:
                        # Check if thread pool is full
                        if thread_pool.is_full():
                            # Execute next node in current thread
                            for event in self._execute_node(
                                node=next_node,
                                thread_pool=thread_pool,
                                previous_node_id=node.node_id,
                            ):
                                yield event
                        else:
                            # Execute next node in new thread
                            thread_pool_id = str(uuid.uuid4())
                            thread_pool.submit_task(
                                thread_pool_id=thread_pool_id,
                                fn=self._execute_node_in_thread,
                                node=next_node,
                                thread_pool=thread_pool,
                                previous_node_id=node.node_id,
                                thread_pool_id=thread_pool_id,
                            )
                else:
                    # Yield other events
                    yield event

        except Exception as e:
            # Yield node failed event
            yield NodeFailedEvent(
                workflow_run_id=self.graph_init_params.workflow_run_id,
                node_id=node.node_id,
                node_type=node.node_type,
                error=str(e),
            )
7.4 节点执行

图引擎根据图结构执行节点:

  1. 确定起始节点:通常是START节点
  2. 执行节点:调用节点的run方法
  3. 处理节点结果:根据节点执行结果确定下一个要执行的节点
  4. 处理并行执行:如果有多个分支,可以并行执行
7.4.1 节点执行的主要流程

节点执行的主要流程如下:

  1. 发出节点开始事件:触发 NodeRunStartedEvent,通知系统节点开始执行
  2. 调用节点的 run 方法:执行节点的具体逻辑
  3. 处理节点事件
    • 处理 RunCompletedEvent:获取节点执行结果
    • 处理 RunStreamChunkEvent:处理流式输出
    • 处理 RunRetrieverResourceEvent:处理检索资源
  4. 处理重试逻辑:如果节点执行失败且配置了重试,则进行重试
  5. 更新变量池:将节点输出变量添加到变量池中
  6. 发出节点完成事件:根据执行结果触发相应事件
    • 成功:触发 NodeRunSucceededEvent
    • 失败:触发 NodeRunFailedEvent
    • 异常但继续:触发 NodeRunExceptionEvent
  7. 查找下一个要执行的节点:根据边映射和条件确定下一个节点
  8. 执行下一个节点:可能是串行执行或并行执行

7.4.2 查找下一个节点的机制

在工作流执行过程中,确定下一个要执行的节点是关键步骤。GraphEngine 类的 _run 方法实现了这一机制:

  1. 获取边映射:通过 self.graph.edge_mapping.get(next_node_id) 获取当前节点的所有出边

  2. 单边处理:如果只有一条出边,直接获取目标节点ID

    if len(edge_mappings) == 1:
        edge = edge_mappings[0]
        # 检查是否有运行条件
        if edge.run_condition:
            result = ConditionManager.get_condition_handler(...).check(...)
            if not result:
                break  # 条件不满足,停止执行
        next_node_id = edge.target_node_id
    
  3. 多边处理:如果有多条出边,需要根据条件或并行策略确定下一个节点

    • 条件分支:如果边有运行条件,根据条件结果确定要执行的分支

      if any(edge.run_condition for edge in edge_mappings):
          # 按条件分组
          condition_edge_mappings: dict[str, list[GraphEdge]] = {}
          # 检查每个条件组
          for _, sub_edge_mappings in condition_edge_mappings.items():
              # 检查条件是否满足
              result = ConditionManager.get_condition_handler(...).check(...)
              if result:
                  # 条件满足,确定下一个节点
                  if len(sub_edge_mappings) == 1:
                      final_node_id = edge.target_node_id
                  else:
                      # 并行执行多个分支
                      parallel_generator = self._run_parallel_branches(...)
      
    • 并行分支:如果没有条件或条件满足,可能需要并行执行多个分支

      else:
          parallel_generator = self._run_parallel_branches(
              edge_mappings=edge_mappings,
              in_parallel_id=in_parallel_id,
              parallel_start_node_id=parallel_start_node_id,
              handle_exceptions=handle_exceptions,
          )
      
  4. 并行分支执行:通过 _run_parallel_branches 方法处理并行分支

    • 创建线程池和队列管理并行执行
    • 为每个分支创建一个线程执行
    • 收集并处理所有分支的执行结果
  5. 检查节点是否在当前并行分支内:确保节点执行不会跨越并行分支边界

    if in_parallel_id and self.graph.node_parallel_mapping.get(next_node_id, "") != in_parallel_id:
        break
    

通过这种机制,工作流系统能够灵活地处理各种复杂的执行路径,包括条件分支和并行执行,确保工作流按照设计的逻辑正确执行。

八、图引擎与节点执行的通信机制

图引擎与节点执行之间的通信是通过事件驱动机制实现的,这种机制使得工作流执行过程中的各个组件能够松耦合地交互,提高了系统的可扩展性和可维护性。

8.1 事件驱动架构

工作流系统采用事件驱动架构,通过定义和传递各种事件来实现图引擎与节点之间的通信。这种架构具有以下特点:

  1. 松耦合:图引擎和节点之间通过事件进行通信,而不是直接调用,降低了组件间的依赖
  2. 可扩展:新的节点类型和事件类型可以轻松添加到系统中,而不需要修改现有代码
  3. 异步处理:事件可以异步处理,提高系统的响应性和吞吐量
  4. 状态追踪:通过事件可以追踪工作流的执行状态和历史

8.2 核心事件类型

工作流系统定义了多种事件类型,用于表示工作流执行过程中的不同状态和操作:

8.2.1 图级事件
  • GraphRunStartedEvent:工作流开始执行
  • GraphRunSucceededEvent:工作流成功完成
  • GraphRunFailedEvent:工作流执行失败
  • GraphRunPartialSucceededEvent:工作流部分成功(有些节点失败但不影响整体结果)
8.2.2 节点级事件
  • NodeRunStartedEvent:节点开始执行
  • NodeRunSucceededEvent:节点执行成功
  • NodeRunFailedEvent:节点执行失败
  • NodeRunExceptionEvent:节点执行异常但继续执行
  • NodeRunRetryEvent:节点重试执行
  • NodeRunStreamChunkEvent:节点产生流式输出
  • NodeRunRetrieverResourceEvent:节点检索资源
8.2.3 并行分支事件
  • ParallelBranchRunStartedEvent:并行分支开始执行
  • ParallelBranchRunSucceededEvent:并行分支执行成功
  • ParallelBranchRunFailedEvent:并行分支执行失败
8.2.4 迭代和循环事件
  • IterationRunStartedEvent:迭代开始
  • IterationRunNextEvent:迭代下一步
  • IterationRunSucceededEvent:迭代成功完成
  • IterationRunFailedEvent:迭代失败
  • LoopRunStartedEvent:循环开始
  • LoopRunNextEvent:循环下一步
  • LoopRunSucceededEvent:循环成功完成
  • LoopRunFailedEvent:循环失败
8.3 事件传递流程

事件在工作流系统中的传递流程如下:

  1. 事件生成:图引擎或节点执行器生成事件

    yield NodeRunStartedEvent(
        id=node_instance.id,
        node_id=node_instance.node_id,
        node_type=node_instance.node_type,
        node_data=node_instance.node_data,
        route_node_state=route_node_state,
        predecessor_node_id=node_instance.previous_node_id,
        # 其他参数...
    )
    
  2. 事件传递:通过 Python 生成器(Generator)机制传递事件

    def run(self) -> Generator[GraphEngineEvent, None, None]:
        # ...
        generator = graph_engine.run()
        for event in generator:
            # 处理事件
            yield event
    
  3. 事件处理:工作流入口点(WorkflowEntry)接收事件并分发给回调处理器

    for event in generator:
        if callbacks:
            for callback in callbacks:
                callback.on_event(event=event)
        yield event
    
  4. 回调处理:回调处理器根据事件类型执行相应的操作

    def on_event(self, event: GraphEngineEvent) -> None:
        if isinstance(event, NodeRunStartedEvent):
            self.on_workflow_node_execute_started(event=event)
        elif isinstance(event, NodeRunSucceededEvent):
            self.on_workflow_node_execute_succeeded(event=event)
        # 处理其他事件类型...
    
8.4 事件处理回调

工作流系统定义了回调接口,允许外部系统注册回调函数来处理工作流事件:

class WorkflowCallback(ABC):
    @abstractmethod
    def on_event(self, event: GraphEngineEvent) -> None:
        """处理工作流事件"""
        raise NotImplementedError

系统提供了多种内置回调实现,如:

  1. WorkflowLoggingCallback:记录工作流执行日志
  2. WorkflowAppRunnerCallback:处理应用级别的工作流事件
8.5 事件与状态管理

事件不仅用于通信,还用于管理工作流的状态:

  1. 节点状态追踪:通过事件记录节点的执行状态和结果

    # 节点开始执行
    yield NodeRunStartedEvent(...)
    
    # 节点执行成功
    yield NodeRunSucceededEvent(...)
    
  2. 变量传递:事件携带节点的输入和输出变量

    # 节点执行成功事件包含输出变量
    yield NodeRunSucceededEvent(
        # ...
        outputs=run_result.outputs,
        # ...
    )
    
  3. 错误处理:事件携带错误信息,用于错误处理和重试

    # 节点执行失败事件包含错误信息
    yield NodeRunFailedEvent(
        error=route_node_state.failed_reason or "Unknown error.",
        # ...
    )
    
8.6 事件转换与应用集成

工作流应用运行器(WorkflowAppRunner)将工作流事件转换为应用级别的队列事件,实现与应用系统的集成:

def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) -> None:
    if isinstance(event, NodeRunSucceededEvent):
        self._publish_event(
            QueueNodeSucceededEvent(
                node_execution_id=event.id,
                node_id=event.node_id,
                node_type=event.node_type,
                node_data=event.node_data,
                inputs=inputs,
                process_data=process_data,
                outputs=outputs,
                execution_metadata=execution_metadata,
                # 其他参数...
            )
        )
    # 处理其他事件类型...

这种转换机制使得工作流系统能够与外部应用系统无缝集成,同时保持内部实现的独立性。

8.7 事件通信示例

以下是一个完整的事件通信流程示例,展示了从节点执行到事件处理的整个过程:

8.7.1 节点执行与事件生成

当图引擎执行一个节点时,会生成一系列事件:

# 1. 节点开始执行事件
yield NodeRunStartedEvent(
    id=node_instance.id,
    node_id=node_instance.node_id,
    node_type=node_instance.node_type,
    node_data=node_instance.node_data,
    route_node_state=route_node_state,
    # 其他参数...
)

# 2. 执行节点的run方法
generator = node_instance.run()
for item in generator:
    # 传递节点产生的事件
    yield item

# 3. 节点执行成功事件
yield NodeRunSucceededEvent(
    id=node_instance.id,
    node_id=node_instance.node_id,
    node_type=node_instance.node_type,
    node_data=node_instance.node_data,
    route_node_state=route_node_state,
    # 其他参数...
)
8.7.2 事件传递与处理

事件通过工作流入口点传递给回调处理器:

# WorkflowEntry.run方法
def run(self, *, callbacks: Sequence[WorkflowCallback]) -> Generator[GraphEngineEvent, None, None]:
    generator = graph_engine.run()
    for event in generator:
        # 分发事件给回调处理器
        for callback in callbacks:
            callback.on_event(event=event)
        # 继续传递事件
        yield event
8.7.3 回调处理器处理事件

回调处理器根据事件类型执行相应的操作:

# WorkflowLoggingCallback.on_event方法
def on_event(self, event: GraphEngineEvent) -> None:
    if isinstance(event, NodeRunStartedEvent):
        self.print_text("\n[NodeRunStartedEvent]", color="yellow")
        self.print_text(f"Node ID: {event.node_id}", color="yellow")
        self.print_text(f"Node Title: {event.node_data.title}", color="yellow")
        self.print_text(f"Type: {event.node_type.value}", color="yellow")
    elif isinstance(event, NodeRunSucceededEvent):
        self.print_text("\n[NodeRunSucceededEvent]", color="green")
        # 打印节点执行结果
        if event.route_node_state.node_run_result:
            node_run_result = event.route_node_state.node_run_result
            self.print_text(f"Outputs: {jsonable_encoder(node_run_result.outputs)}", color="green")
    # 处理其他事件类型...
8.7.4 应用运行器处理事件

应用运行器将工作流事件转换为应用级别的队列事件:

# WorkflowAppRunner._handle_event方法
def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) -> None:
    if isinstance(event, GraphRunStartedEvent):
        self._publish_event(QueueWorkflowStartedEvent(...))
    elif isinstance(event, NodeRunSucceededEvent):
        self._publish_event(QueueNodeSucceededEvent(...))
    elif isinstance(event, NodeRunFailedEvent):
        self._publish_event(QueueNodeFailedEvent(...))
    # 处理其他事件类型...
8.8 事件通信的优势

图引擎与节点执行之间基于事件的通信机制具有以下优势:

  1. 解耦组件:图引擎和节点执行器通过事件进行通信,而不是直接调用,降低了组件间的耦合度
  2. 简化调试:事件包含完整的上下文信息,便于调试和问题排查
  3. 支持异步执行:事件可以异步处理,支持并行执行和分布式部署
  4. 可扩展性:新的节点类型和事件类型可以轻松添加到系统中,而不需要修改现有代码
  5. 状态追踪:通过事件可以完整记录工作流的执行状态和历史,便于监控和审计
  6. 错误处理:事件携带错误信息,支持灵活的错误处理策略和重试机制

九、错误处理机制

工作流系统提供了完善的错误处理机制,包括错误策略、重试机制和异常处理,确保工作流在面对各种异常情况时能够灵活应对。

9.1 错误处理策略

工作流系统提供了两种主要的错误处理策略:

  1. FAIL_BRANCH:当节点执行失败时,沿着失败分支继续执行
    • 将错误信息和类型添加到变量池
    • 设置 edge_source_handleFAILED,使工作流可以沿着专门处理失败情况的分支继续执行
    • 适用于需要针对失败情况执行特定逻辑的场景
  2. DEFAULT_VALUE:当节点执行失败时,使用预定义的默认值继续执行
    • 将错误信息和类型添加到变量池
    • 使用节点配置中预定义的默认值作为节点输出
    • 适用于即使失败也需要提供某种结果的场景

9.2 节点重试机制

对于某些类型的节点,系统支持在执行失败时进行重试:

  1. 重试配置
    • max_retries:最大重试次数
    • retry_interval_seconds:重试间隔时间(秒)
  2. 重试流程
    • 节点执行失败后,检查是否配置了重试
    • 如果当前重试次数小于最大重试次数,触发 NodeRunRetryEvent 事件
    • 等待指定的重试间隔时间
    • 重新执行节点
  3. 重试事件
    • 系统触发 NodeRunRetryEvent 事件,包含重试索引、开始时间等信息
    • 事件可用于监控和记录重试情况
if node_instance.should_retry and retries < max_retries:
    retries += 1
    route_node_state.node_run_result = run_result
    yield NodeRunRetryEvent(
        id=str(uuid.uuid4()),
        node_id=node_instance.node_id,
        node_type=node_instance.node_type,
        node_data=node_instance.node_data,
        route_node_state=route_node_state,
        error=run_result.error or "Unknown error",
        retry_index=retries,
        start_at=retry_start_at,
    )
    time.sleep(retry_interval)
8.3 可继续执行和可重试的节点类型

系统定义了特定类型的节点,它们在错误处理方面有特殊行为:

  1. 可继续执行的节点类型CONTINUE_ON_ERROR_NODE_TYPE):
    • 即使执行失败,工作流也可以继续执行
    • 例如:HTTP请求节点、LLM节点等
    • 这些节点可以配置错误策略(FAIL_BRANCH或DEFAULT_VALUE)
  2. 可重试的节点类型RETRY_ON_ERROR_NODE_TYPE):
    • 执行失败时可以自动重试
    • 例如:HTTP请求节点、数据库操作节点等
    • 这些节点可以配置最大重试次数和重试间隔

通过这些机制,工作流系统能够灵活处理各种错误情况,提高工作流的健壮性和可靠性。

9. 变量管理机制

变量管理是工作流执行的重要组成部分,负责管理工作流中的变量。

8.1 变量池

变量池是工作流中所有变量的集合,包括:

  1. 用户输入变量:用户提供的输入
  2. 系统变量:系统提供的变量,如时间戳、会话ID等
  3. 环境变量:环境相关的变量
  4. 会话变量:会话相关的变量
  5. 节点输出变量:节点执行后的输出变量

以下是 VariablePool 类的部分实现:

class VariablePool:
    """
    Variable Pool
    """

    def __init__(
        self,
        user_inputs: dict[str, Any],
        system_variables: dict[str, Any],
        environment_variables: dict[str, Any],
        session_variables: dict[str, Any],
    ) -> None:
        """
        Initialize variable pool
        """
        self.user_inputs = user_inputs
        self.system_variables = system_variables
        self.environment_variables = environment_variables
        self.session_variables = session_variables

        # Initialize variable dictionary
        self.variable_dictionary: dict[str, Any] = {}

    def add(self, node_id: str, variable_name: str, variable_value: Any) -> None:
        """
        Add variable to variable pool
        """
        # Check if variable value is File
        if isinstance(variable_value, File):
            # Convert File to dict
            variable_value = variable_value.to_dict()

        # Add variable to variable dictionary
        self.variable_dictionary[f"{node_id}.{variable_name}"] = variable_value

    def get_variable(self, variable_selector: str) -> Any:
        """
        Get variable from variable pool
        """
        # Check if variable selector is empty
        ifnot variable_selector:
            returnNone

        # Check if variable selector is system variable
        if variable_selector.startswith(SYSTEM_VARIABLE_NODE_ID):
            # Get system variable
            variable_name = variable_selector.split(".", 1)[1]
            return self.get_system_variable(variable_name)

        # Check if variable selector is user input
        if variable_selector.startswith(USER_INPUT_NODE_ID):
            # Get user input
            variable_name = variable_selector.split(".", 1)[1]
            return self.get_user_input(variable_name)

        # Check if variable selector is environment variable
        if variable_selector.startswith(ENVIRONMENT_VARIABLE_NODE_ID):
            # Get environment variable
            variable_name = variable_selector.split(".", 1)[1]
            return self.get_environment_variable(variable_name)

        # Check if variable selector is session variable
        if variable_selector.startswith(SESSION_VARIABLE_NODE_ID):
            # Get session variable
            variable_name = variable_selector.split(".", 1)[1]
            return self.get_session_variable(variable_name)

        # Get variable from variable dictionary
        return self.variable_dictionary.get(variable_selector)
8.2 变量传递

变量在节点之间的传递遵循以下规则:

  1. 变量选择器:通过变量选择器指定要使用的变量
  2. 变量作用域:变量的作用域为整个工作流
  3. 变量覆盖:后执行的节点可以覆盖先执行的节点的变量

变量选择器的格式为 node_id.variable_name,例如:

  • system.conversation_id:系统变量中的会话ID
  • user_input.query:用户输入中的查询
  • node_1.result:节点1的输出变量 result

9. 并行执行机制

工作流支持并行执行多个分支,通过 GraphEngineThreadPool 实现:

Dify 工作流支持并行执行多个分支:

  1. 通过 GraphParallel 模型定义并行分支
  2. 使用 parallel_mappingnode_parallel_mapping 管理并行关系
  3. 支持条件分支,根据条件选择执行路径
  4. 限制并行层级,避免过度复杂的执行图

十、总结

Dify 工作流系统是一个功能强大的可视化 AI 工作流引擎,通过图结构组织节点执行,使用变量池管理数据流动,支持多种节点类型、错误处理和并行执行。系统的核心组件包括:

  1. 工作流服务:管理工作流的生命周期
  2. 工作流入口:工作流执行的入口点
  3. 图引擎:负责节点的调度和执行
  4. 变量池:管理工作流中的变量
  5. 节点实现:各类节点的具体实现

通过这些组件的协同工作,Dify 工作流系统能够支持从简单到复杂的 AI 应用场景,为用户提供灵活且强大的工作流设计和执行能力。

普通人如何抓住AI大模型的风口?

领取方式在文末

为什么要学习大模型?

目前AI大模型的技术岗位与能力培养随着人工智能技术的迅速发展和应用 , 大模型作为其中的重要组成部分 , 正逐渐成为推动人工智能发展的重要引擎 。大模型以其强大的数据处理和模式识别能力, 广泛应用于自然语言处理 、计算机视觉 、 智能推荐等领域 ,为各行各业带来了革命性的改变和机遇 。

目前,开源人工智能大模型已应用于医疗、政务、法律、汽车、娱乐、金融、互联网、教育、制造业、企业服务等多个场景,其中,应用于金融、企业服务、制造业和法律领域的大模型在本次调研中占比超过 30%。
在这里插入图片描述

随着AI大模型技术的迅速发展,相关岗位的需求也日益增加。大模型产业链催生了一批高薪新职业:
在这里插入图片描述

人工智能大潮已来,不加入就可能被淘汰。如果你是技术人,尤其是互联网从业者,现在就开始学习AI大模型技术,真的是给你的人生一个重要建议!

最后

只要你真心想学习AI大模型技术,这份精心整理的学习资料我愿意无偿分享给你,但是想学技术去乱搞的人别来找我!

在当前这个人工智能高速发展的时代,AI大模型正在深刻改变各行各业。我国对高水平AI人才的需求也日益增长,真正懂技术、能落地的人才依旧紧缺。我也希望通过这份资料,能够帮助更多有志于AI领域的朋友入门并深入学习。

真诚无偿分享!!!
vx扫描下方二维码即可
加上后会一个个给大家发

在这里插入图片描述

大模型全套学习资料展示

自我们与MoPaaS魔泊云合作以来,我们不断打磨课程体系与技术内容,在细节上精益求精,同时在技术层面也新增了许多前沿且实用的内容,力求为大家带来更系统、更实战、更落地的大模型学习体验。

图片

希望这份系统、实用的大模型学习路径,能够帮助你从零入门,进阶到实战,真正掌握AI时代的核心技能!

01 教学内容

图片

  • 从零到精通完整闭环:【基础理论 →RAG开发 → Agent设计 → 模型微调与私有化部署调→热门技术】5大模块,内容比传统教材更贴近企业实战!

  • 大量真实项目案例: 带你亲自上手搞数据清洗、模型调优这些硬核操作,把课本知识变成真本事‌!

02适学人群

应届毕业生‌: 无工作经验但想要系统学习AI大模型技术,期待通过实战项目掌握核心技术。

零基础转型‌: 非技术背景但关注AI应用场景,计划通过低代码工具实现“AI+行业”跨界‌。

业务赋能突破瓶颈: 传统开发者(Java/前端等)学习Transformer架构与LangChain框架,向AI全栈工程师转型‌。

image.png

vx扫描下方二维码即可
在这里插入图片描述

本教程比较珍贵,仅限大家自行学习,不要传播!更严禁商用!

03 入门到进阶学习路线图

大模型学习路线图,整体分为5个大的阶段:
图片

04 视频和书籍PDF合集

图片

从0到掌握主流大模型技术视频教程(涵盖模型训练、微调、RAG、LangChain、Agent开发等实战方向)

图片

新手必备的大模型学习PDF书单来了!全是硬核知识,帮你少走弯路(不吹牛,真有用)
图片

05 行业报告+白皮书合集

收集70+报告与白皮书,了解行业最新动态!
图片

06 90+份面试题/经验

AI大模型岗位面试经验总结(谁学技术不是为了赚$呢,找个好的岗位很重要)图片
在这里插入图片描述

07 deepseek部署包+技巧大全

在这里插入图片描述

由于篇幅有限

只展示部分资料

并且还在持续更新中…

真诚无偿分享!!!
vx扫描下方二维码即可
加上后会一个个给大家发

在这里插入图片描述

Logo

更多推荐