🔰 刚接触 AWS CDK 的初学者,请务必将我在本系列中的前几篇文章一一看一下。

如果万一错过了我之前的文章,请通过以下链接找到它。

🔁 🔗Dev Post上的原帖

https://devpost.hashnode.dev/aws-cdk-101-fetching-json-from-s3-through-stepfunction

🔁 将🔗dev 上的帖子转发到@aravindvcyber

https://dev.to/aravindvcyber/aws-cdk-101-fetching-json-from-s3-through-stepfunction-15ob

在本文中,让我们重构我们之前使用基于 S3 的存储来检索 JSON 数据以处理成更高性能的基于 dynamodb 的步骤函数。与另一个相比,两者都有其优势,因此我们必须根据预期的周转时间明智地选择它们。

用 S3 可视化这个场景⛵️

s3铲斗

用 dynamodb 可视化这个场景🚀

dynamodb

从 dynamodb 与 S3 获取 JSON 的好处❄️

  • 简而言之,使用 dynamodb 代替 S3 的第一个好处是减少了 I/O 操作所需的时间。

  • S3 比类似的 dynamodb 操作需要更多的时间。

  • 虽然 dynamodb 与 s3 存储相比成本更高,但当我们使用良好的存档设置时,这可以成为我们的优势。

  • 进一步的 S3 操作是一个外部请求,它具有 DNS 查找和 httpAgent 创建等开销。

  • 虽然 dynamodb 被视为数据库事务并且速度更快。

  • 此外,您可以将 S3 元数据移动到 dynamodb 中的字段中,以便于操作。

  • 这也将有助于有机会在每个处理步骤中仅查询和提取所需字段,而无需像 S3 中那样下载完整的对象内容。

  • 当我们有更多的前缀时,S3 也会有更好的吞吐量,这可以改变密钥路径。而在 dynamodb 中,我们通过使用预配置和按请求付费模型来很好地利用吞吐量。

  • 我们还可以根据请求微调我们的读取和写入容量,这些请求可能会受到限制,以实现高效的成本管理和可靠性,并完全控制预留。

  • 顺便说一下,我们为什么要减少 I/O 时间,这是因为我们总是从计算资源触发这个,该计算资源是按容量和运行时间计量的,它将等待更长的空闲时间。

  • 数据是 dynamodb,我们可以像 S3 一样高效地归档各种服务集成,除了我们与 dynamodb 具有的特殊 TTL 属性,可以用来删除像我们案例中的临时对象。

分析的最终结果🏀

我尝试使用 X 射线和新遗迹追踪这一点,并找到以下观察结果。

并且请理解可能存在一些差异,因为来自 X 射线和新遗物的痕迹都以不同的方式跟踪时间。

然而,这里要注意的一点是,当我们有高吞吐量事务而没有对资源规范进行任何微调时,两者都表明 S3 可能比 dynamodb 慢一些。

putObject 的 xray 的一些跟踪结果🏭

dynamodb 放

来自 newrelic 的 putItem 的深入追踪🎠

新遗物放

getObject 的 xray 的一些跟踪结果⛲️

dynamodb 获取

从 newrelic 中深入追踪 getItem ⛵️

newrelic 获取

施工⛺️

在这里,让我们首先在主堆栈中创建一个新表,如下所示。

const stgMessages = new dynamodb.Table(this, "stgMessagesTable", {
      tableName: process.env.stgMessagesTable,
      sortKey: { name: "createdAt", type: dynamodb.AttributeType.NUMBER },
      partitionKey: { name: "messageId", type: dynamodb.AttributeType.STRING },
      encryption: dynamodb.TableEncryption.AWS_MANAGED,
      readCapacity: 5,
      writeCapacity: 5,
});

并授予对入口处理程序和后端 step-function 间接调用处理程序的访问权限。


stgMessages.grantWriteData(eventCounterBus.handler);
stgMessages.grantReadData(messageRecorder);

表

重构消息入口处理程序🎪

以前用于保存到 S3 的现有消息条目处理程序现在将得到增强,如下所示。

辅助函数 putItem

const dbPut: any = async (msg: any) => {
  const dynamo = new DynamoDB();
  const crt_time: number = new Date(msg.Metadata?.timestamp).getTime();
  const putData: PutItemInput = {
    TableName: process.env.STAGING_MESSAGES_TABLE_NAME || "",
    Item: {
      messageId: { S: msg.Metadata?.UUID },
      createdAt: { N: `${crt_time}` },
      content: { S: JSON.stringify(msg) },
    },
    ReturnConsumedCapacity: "TOTAL",
  };
  await dynamo.putItem(putData).promise();
};

在 putObject 后追加一次 putItem 🔱

将此作为附加集添加到保存辅助函数中,以便我们可以将其用作后续步骤。这将帮助我们理解上面演示的 I/O 时序信息。


const save = async (uploadParams: PutObjectRequest) => {
  let putResult: PromiseResult<PutObjectOutput, AWSError> | undefined =
    undefined;
  let putDBResult: PromiseResult<PutObjectOutput, AWSError> | undefined =
    undefined;
  try {
    putResult = await s3.putObject(uploadParams).promise();
    putDBResult = await dbPut(uploadParams);
  } catch (e) {
    console.log(e);
  } finally {
    console.log(putResult);
    console.log(putDBResult);
  }
  return putResult;
};

在 JSON 中创建附加属性 🚥

在这一步中,我们将在 JSON 中添加额外的数据,这将有助于我们从 dynamodb 中检索。

message.uuid = getUuid();
message.handler = context.awsRequestId;
message.key = `uploads/${message.uuid}.json`;
message.bucket = process.env.BucketName || "";
message.table = process.env.STAGING_MESSAGES_TABLE_NAME || "";
message.timestamp = new Date().toUTCString();

对 step-function 有效负载的更改🚀

step 函数的现有有效负载现在将使用更多属性数据进行增强,如下所示,以提取来自事件桥详细信息后 JSON 解析的附加消息。

const sfnTaskPayload = sfn.TaskInput.fromObject({
   MyTaskToken: sfn.JsonPath.taskToken,
   Record: {
     "messageId.$": "$.detail.message.uuid",
     "createdAt.$": "$.detail.message.timestamp",
     "bucket.$": "$.detail.message.bucket",
     "key.$": "$.detail.message.key",
     "table.$": "$.detail.message.table",
     "uuid.$": "$.detail.message.uuid",
     "timestamp.$": "$.detail.message.timestamp"
   },
});

sqs 有效载荷

重构消息记录器处理程序🎁

从消息记录器函数内部从上一篇文章中检索到 S3 的数据后,现在让我们添加新步骤以直接从 dynamo 获取,忽略 getObject,如下所示。

一旦s3Get调用完成,我们就使用了上面显示的跟踪中提到的dbGet

try {
  const crt_time: number = new Date(msg.timestamp).getTime();
  const getData: GetItemInput = {
    TableName: msg.table,
    Key: {
      messageId: { S: msg.uuid },
      createdAt: { N: `${crt_time}` },
    },
    ProjectionExpression: "messageId, createdAt, content",
    ConsistentRead: true,
    ReturnConsumedCapacity: "TOTAL",
  };
  const dbGet = await dynamo.getItem(getData).promise();
  data = JSON.parse(dbGet.Item.content.S).Body;

  //OLD code with S3
  if (data) {
    msg.event = data;
    const token = JSON.parse(Record.body).MyTaskToken;
    await dbPut(msg)
      .then(async (data: any) => {
        await funcSuccess(data, msg.messageId, token);
      })
      .catch(async (err: any) => {
        await funcFailure(err, msg.messageId, token);
      });
  }
} catch (err) {
  console.log(err);
}

parti-ql

因此,我们可以添加一个新步骤来记录到 dynamodb 并从 dynamo 中获取并执行我们的时序检查。

在下一篇文章中,我们将完全忽略此处未使用的直接 S3 操作并间接执行此操作。

我们将向我们的堆栈添加更多连接,并通过创建新结构使其在即将发布的文章中更有用,因此请考虑关注并订阅我的时事通讯。

⏭ 我们有下一篇关于无服务器的文章,请查看

https://devpost.hashnode.dev/aws-cdk-101-using-batched-dynamodb-stream-to-delete-item-on-another-dynamodb-table

🎉 感谢支持! 🙏

如果你愿意☕ 给我买杯咖啡,那会很棒,以帮助我努力。

! zoz100076](https://img.buymeacoffee.com/button-api/?text=Buy%20me%20a%20coffee&emoji=&slug=AravindVCyber&button_colour=BD5FFF&font_colour=ffffff&font_family=Cookie&outline_colour=000000&coffee_colour=FFDD00)

在 ko-fi.com 上给我买杯咖啡

🔁 原帖在🔗Dev Post

🔁 转发于🔗dev to @aravindvcyber

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐