问题

当您在步进函数中运行并发任务时,无论是作为Parallel状态还是Map状态,您可能希望等到所有任务完成,无论它们是成功还是失败。默认情况下,MapParallel状态将在结束或继续下一个状态之前等待所有的成功,但是如果单个迭代或分支失败,它们将被终止,并且所有其他正在进行的执行将被停止.

虽然这种行为可能与快速失败的理念相呼应,但在某些情况下,您可能希望 hold 在任何失败时终止这种总体状态。例如:

  • 给定我想要并行运行的 3 个工作负载分支,我希望在最后得到一个聚合响应,包含来自每个分支的响应,无论是成功还是失败。

  • 给定数组中的 5 个项目,我想同时执行一组步骤,我想知道哪些成功,哪些失败(和推理)。

解决方案

解决方案是处理可能的异常并使用Pass状态来传播结果。

Parallel状态

为了演示并行状态下的问题,让我们看看如果分支失败会发生什么。

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--0FRO0wlb--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/i/y2vhmsvmwlm3we8dgu5a.png)

在此示例中,您将看到我有两个工作负载分支,long success表示长时间运行且最终成功的工作负载,而quick fail表示失败的工作负载_before_long success完成。屏幕截图显示了并行状态(和整个状态机)是如何被取消的,因为quick fail分支失败,而long success也已停止(灰色)。这与 AWS 文档所说的一致

如果任何分支由于未处理的错误或转换到 Fail 状态而失败,则认为整个 Parallel 状态已失败并且其所有分支都将停止。如果错误未由 Parallel 状态本身处理,则 Step Functions 会停止执行并出现错误。

现在让我们看看如果我们 handlequick fail抛出的任何错误并使用Pass状态返回此分支的结果会发生什么。

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--4gzoabpR--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/i/u80whuvx6eh4qelrv0sl.png)

您可以看到quick fail步骤变为琥珀色,这意味着它处于Caught Error状态,并且它有一个handle failure的后续步骤,这是一个Pass状态,它只是返回一个输出并结束分支执行。

这里最大的区别是,虽然quick fail的分支已经完成,但由于long success分支正在进行中,整体并行状态仍处于进行中,如蓝色所示。最终,long success分支将完成,并与handle failure的输出一起,将两个执行结果聚合到Final State中。

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--6a54ShCq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/i/v6h682lf9b0plwiino2w.png)

带有错误处理的状态机的定义看起来像这样

{
  "StartAt": "Parallel",
  "States": {
    "Parallel": {
      "Type": "Parallel",
      "Next": "Final State",
      "Branches": [
        {
          "StartAt": "long success",
          "States": {
            "long success": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-southeast-2:123456789098:function:long-success",
              "End": true
            }
          }
        },
        {
          "StartAt": "quick fail",
          "States": {
            "quick fail": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-southeast-2:123456789098:function:quick-fail",
              "Catch": [
                {
                  "ErrorEquals": [
                    "Exception"
                  ],
                  "Next": "handle failure",
                  "ResultPath": "$.error"
                }
              ],
              "End": true
            },
            "handle failure": {
              "Type": "Pass",
              "End": true
            }
          }
        }
      ]
    },
    "Final State": {
      "Type": "Pass",
      "End": true
    }
  }
}

进入全屏模式 退出全屏模式

鉴于上面的代码片段,您可能会看到Final State的数组输出,其中包含两个元素,如下所示

[
  <long-success-response>,
  {
    <input-json-blob>,
    "error": {
      "Error": "Exception",
      "Cause": "{\"errorMessage\": \"quick fail function has failed\", \"errorType\": \"Exception\", \"stackTrace\": [\"  File \\\"/var/task/lambda_function.py\\\", line 6, in lambda_handler\\n    raise Exception(\\\"quick fail function has failed\\\")\\n\"]}"
    }
  }
]

进入全屏模式 退出全屏模式

从这里,您可以处理来自每个单独分支的响应,而不会导致整个状态机失败。

Map状态

同样,如果您的并行性是动态的,您可以在Map状态下获得相同的结果。

使用这种方法的成功定义看起来像这样

{
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemsPath": "$.array",
      "ResultPath": "$.array",
      "Next": "Final State",
      "Iterator": {
        "StartAt": "mapper",
        "States": {
          "mapper": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:ap-southeast-2:123456789098:function:sfn-mapper",
            "Catch": [
              {
                "ErrorEquals": [
                  "Exception"
                ],
                "Next": "handle mapper error",
                "ResultPath": "$.error"
              }
            ],
            "ResultPath": "$.data",
            "End": true
          },
          "handle mapper error": {
            "Type": "Pass",
            "End": true
          }
        }
      }
    },
    "Final State": {
      "Type": "Pass",
      "End": true
    }
  }
}

进入全屏模式 退出全屏模式

在此示例中,我使用"ResultPath": "$.data""ResultPath": "$.error"在响应中的键dataerror下记录successfail,以便于区分。mapper状态运行一个 lambda 函数,该函数要么返回成功响应,要么抛出错误,具体取决于执行输入。输入看起来像这样

{
  "array": [
    {
      "wait": 5,
      "throw": true,
      "name": "one"
    },
    {
      "wait": 10,
      "throw": false,
      "name": "two"
    }
  ]
}

进入全屏模式 退出全屏模式

lambda函数sfn-mapper长这样(以Python为例)

import json
import time

def lambda_handler(event, context):
    time.sleep(int(event["wait"]))
    if bool(event["throw"]):
        raise Exception(f"{event['name']} has thrown an error")

    return f"{event['name']} has succeeded after {event['wait']} seconds"

进入全屏模式 退出全屏模式

当您运行这样的状态机时,最终状态将类似于此

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--HI1xgaly--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/i/ch9z8yyc6ao0boiqezle.png)

由于我们在每次迭代中都处理异常,因此整个状态机都成功了。如果您查看来自Final State的响应,您将看到基于上述输入示例的类似内容。

{
  "array": [
    {
      "wait": 5,
      "throw": true,
      "name": "one",
      "error": {
        "Error": "Exception",
        "Cause": "{\"errorMessage\": \"one has thrown an error\", \"errorType\": \"Exception\", \"stackTrace\": [\"  File \\\"/var/task/lambda_function.py\\\", line 7, in lambda_handler\\n    raise Exception(f\\\"{event['name']} has thrown an error\\\")\\n\"]}"
      }
    },
    {
      "wait": 10,
      "throw": false,
      "name": "two",
      "data": "two has succeeded after 10 seconds"
    }
  ]
}

进入全屏模式 退出全屏模式

请注意对应于相应迭代的errordata键。

我希望你喜欢这个演练。ParallelMap状态下的错误处理在您希望更好地控制如何处理这些状态下的单个响应的情况下可能非常有用。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐