🚁 AWS CDK 101 - 🐬 通过 stepfunction 从 Dynamodb 与 S3 获取 JSON
🔰 刚接触 AWS CDK 的初学者,请务必将我在本系列中的前几篇文章一一看一下。
如果万一错过了我之前的文章,请通过以下链接找到它。
🔁 🔗Dev Post上的原帖
https://devpost.hashnode.dev/aws-cdk-101-fetching-json-from-s3-through-stepfunction
🔁 将🔗dev 上的帖子转发到@aravindvcyber
https://dev.to/aravindvcyber/aws-cdk-101-fetching-json-from-s3-through-stepfunction-15ob
在本文中,让我们重构我们之前使用基于 S3 的存储来检索 JSON 数据以处理成更高性能的基于 dynamodb 的步骤函数。与另一个相比,两者都有其优势,因此我们必须根据预期的周转时间明智地选择它们。
用 S3 可视化这个场景⛵️
用 dynamodb 可视化这个场景🚀
从 dynamodb 与 S3 获取 JSON 的好处❄️
-
简而言之,使用 dynamodb 代替 S3 的第一个好处是减少了 I/O 操作所需的时间。
-
S3 比类似的 dynamodb 操作需要更多的时间。
-
虽然 dynamodb 与 s3 存储相比成本更高,但当我们使用良好的存档设置时,这可以成为我们的优势。
-
进一步的 S3 操作是一个外部请求,它具有 DNS 查找和 httpAgent 创建等开销。
-
虽然 dynamodb 被视为数据库事务并且速度更快。
-
此外,您可以将 S3 元数据移动到 dynamodb 中的字段中,以便于操作。
-
这也将有助于有机会在每个处理步骤中仅查询和提取所需字段,而无需像 S3 中那样下载完整的对象内容。
-
当我们有更多的前缀时,S3 也会有更好的吞吐量,这可以改变密钥路径。而在 dynamodb 中,我们通过使用预配置和按请求付费模型来很好地利用吞吐量。
-
我们还可以根据请求微调我们的读取和写入容量,这些请求可能会受到限制,以实现高效的成本管理和可靠性,并完全控制预留。
-
顺便说一下,我们为什么要减少 I/O 时间,这是因为我们总是从计算资源触发这个,该计算资源是按容量和运行时间计量的,它将等待更长的空闲时间。
-
数据是 dynamodb,我们可以像 S3 一样高效地归档各种服务集成,除了我们与 dynamodb 具有的特殊 TTL 属性,可以用来删除像我们案例中的临时对象。
分析的最终结果🏀
我尝试使用 X 射线和新遗迹追踪这一点,并找到以下观察结果。
并且请理解可能存在一些差异,因为来自 X 射线和新遗物的痕迹都以不同的方式跟踪时间。
然而,这里要注意的一点是,当我们有高吞吐量事务而没有对资源规范进行任何微调时,两者都表明 S3 可能比 dynamodb 慢一些。
putObject 的 xray 的一些跟踪结果🏭
来自 newrelic 的 putItem 的深入追踪🎠
getObject 的 xray 的一些跟踪结果⛲️
从 newrelic 中深入追踪 getItem ⛵️
施工⛺️
在这里,让我们首先在主堆栈中创建一个新表,如下所示。
const stgMessages = new dynamodb.Table(this, "stgMessagesTable", {
tableName: process.env.stgMessagesTable,
sortKey: { name: "createdAt", type: dynamodb.AttributeType.NUMBER },
partitionKey: { name: "messageId", type: dynamodb.AttributeType.STRING },
encryption: dynamodb.TableEncryption.AWS_MANAGED,
readCapacity: 5,
writeCapacity: 5,
});
并授予对入口处理程序和后端 step-function 间接调用处理程序的访问权限。
stgMessages.grantWriteData(eventCounterBus.handler);
stgMessages.grantReadData(messageRecorder);
重构消息入口处理程序🎪
以前用于保存到 S3 的现有消息条目处理程序现在将得到增强,如下所示。
辅助函数 putItem
const dbPut: any = async (msg: any) => {
const dynamo = new DynamoDB();
const crt_time: number = new Date(msg.Metadata?.timestamp).getTime();
const putData: PutItemInput = {
TableName: process.env.STAGING_MESSAGES_TABLE_NAME || "",
Item: {
messageId: { S: msg.Metadata?.UUID },
createdAt: { N: `${crt_time}` },
content: { S: JSON.stringify(msg) },
},
ReturnConsumedCapacity: "TOTAL",
};
await dynamo.putItem(putData).promise();
};
在 putObject 后追加一次 putItem 🔱
将此作为附加集添加到保存辅助函数中,以便我们可以将其用作后续步骤。这将帮助我们理解上面演示的 I/O 时序信息。
const save = async (uploadParams: PutObjectRequest) => {
let putResult: PromiseResult<PutObjectOutput, AWSError> | undefined =
undefined;
let putDBResult: PromiseResult<PutObjectOutput, AWSError> | undefined =
undefined;
try {
putResult = await s3.putObject(uploadParams).promise();
putDBResult = await dbPut(uploadParams);
} catch (e) {
console.log(e);
} finally {
console.log(putResult);
console.log(putDBResult);
}
return putResult;
};
在 JSON 中创建附加属性 🚥
在这一步中,我们将在 JSON 中添加额外的数据,这将有助于我们从 dynamodb 中检索。
message.uuid = getUuid();
message.handler = context.awsRequestId;
message.key = `uploads/${message.uuid}.json`;
message.bucket = process.env.BucketName || "";
message.table = process.env.STAGING_MESSAGES_TABLE_NAME || "";
message.timestamp = new Date().toUTCString();
对 step-function 有效负载的更改🚀
step 函数的现有有效负载现在将使用更多属性数据进行增强,如下所示,以提取来自事件桥详细信息后 JSON 解析的附加消息。
const sfnTaskPayload = sfn.TaskInput.fromObject({
MyTaskToken: sfn.JsonPath.taskToken,
Record: {
"messageId.$": "$.detail.message.uuid",
"createdAt.$": "$.detail.message.timestamp",
"bucket.$": "$.detail.message.bucket",
"key.$": "$.detail.message.key",
"table.$": "$.detail.message.table",
"uuid.$": "$.detail.message.uuid",
"timestamp.$": "$.detail.message.timestamp"
},
});
重构消息记录器处理程序🎁
从消息记录器函数内部从上一篇文章中检索到 S3 的数据后,现在让我们添加新步骤以直接从 dynamo 获取,忽略 getObject,如下所示。
一旦s3Get
调用完成,我们就使用了上面显示的跟踪中提到的dbGet
。
try {
const crt_time: number = new Date(msg.timestamp).getTime();
const getData: GetItemInput = {
TableName: msg.table,
Key: {
messageId: { S: msg.uuid },
createdAt: { N: `${crt_time}` },
},
ProjectionExpression: "messageId, createdAt, content",
ConsistentRead: true,
ReturnConsumedCapacity: "TOTAL",
};
const dbGet = await dynamo.getItem(getData).promise();
data = JSON.parse(dbGet.Item.content.S).Body;
//OLD code with S3
if (data) {
msg.event = data;
const token = JSON.parse(Record.body).MyTaskToken;
await dbPut(msg)
.then(async (data: any) => {
await funcSuccess(data, msg.messageId, token);
})
.catch(async (err: any) => {
await funcFailure(err, msg.messageId, token);
});
}
} catch (err) {
console.log(err);
}
因此,我们可以添加一个新步骤来记录到 dynamodb 并从 dynamo 中获取并执行我们的时序检查。
在下一篇文章中,我们将完全忽略此处未使用的直接 S3 操作并间接执行此操作。
我们将向我们的堆栈添加更多连接,并通过创建新结构使其在即将发布的文章中更有用,因此请考虑关注并订阅我的时事通讯。
⏭ 我们有下一篇关于无服务器的文章,请查看
https://devpost.hashnode.dev/aws-cdk-101-using-batched-dynamodb-stream-to-delete-item-on-another-dynamodb-table
🎉 感谢支持! 🙏
如果你愿意☕ 给我买杯咖啡,那会很棒,以帮助我努力。
! zoz100076](https://img.buymeacoffee.com/button-api/?text=Buy%20me%20a%20coffee&emoji=&slug=AravindVCyber&button_colour=BD5FFF&font_colour=ffffff&font_family=Cookie&outline_colour=000000&coffee_colour=FFDD00)
🔁 原帖在🔗Dev Post
🔁 转发于🔗dev to @aravindvcyber
更多推荐
所有评论(0)