每个人都知道当事情拖得太久时有多烦人!无论是似乎永远不会结束的细化会议,同事讲的一个没有切入主题的故事,还是您明确告诉您在某个时刻离开但不会离开您的 DynamoDB 表中的项目数据库!根据 AWS 文档,您可以在 dynamoDB 记录上设置生存时间 (TTL),然后 AWS 将在该时间后将其删除。但是,正如它在文档中所述:

因为 TTL 是一个后台进程,用于通过 TTL 过期和删除项目的容量的性质是可变的(但免费)。 TTL 通常会在过期后 48 小时内删除过期项目。

即使在实践中,项目通常会在 15 分钟内被删除,但不能保证它确实会被删除,这对你来说可能是个问题!

在本博客中,我们将讨论一个可扩展的设置,您可以使用该设置以您自己定义的精度使项目过期。

架构

为了更快地摆脱您的物品,我们可以使用下面的架构。计划的 EventBridge 规则用于每分钟触发一次 lambda。 lambda 查询 DynamoDB 哪些项目已过期,并将其删除。

image.png

在下面的部分中,我们将提供用于创建此基础结构的代码以及用于查询和删除项目的 lambda 代码。

数据模型

除了在 DynamoDB 中使用 TTL 字段外,我们还必须创建另一个字段,它不包含过期时间,而是包含过期窗口,其粒度您可以自己定义。expirationWindow定义了事件应该在什么时间窗口过期。您可以自由选择此窗口,但唯一的要求是它必须是触发 lambda 的频率的倍数。因此,如果您每 5 分钟触发一次 lambda,则时间窗口应该是 5、10、15 等......分钟长(更多内容见下文)。

为了获得一分钟的准确性,我们还使用了一分钟的expirationWindow粒度。稍后我们将查询此窗口以获取所有应过期的项目。

expirationWindow可以随心所欲地构建,但我选择了开始和结束日期时间的格式,用下划线分隔:StartISOTimestamp_EndISOTimestamp。例如,如果我们选择一分钟的粒度窗口,它可能像2022-07-19T21:27:00.000Z_2022-07-19T21:28:00.000Z

表中的示例项目现在如下所示:`

itemId(分区键)

ttl

到期窗口

其他属性

0001

1658266025

2022-07-19T21:27:00.000Z_2022-07-19T21:28:00.000Z

...

为了能够查询这个窗口,我们必须在表上创建一个所谓的全局二级索引。它基本上是您的表的副本,但具有不同的分区键和可选的排序键来支持我们需要的查询。

所以索引expirationWindowIndex的结构如下:

expireWindow(分区键)

ttl(排序键)

项目 ID

其他属性

2022-07-19T21:27:00.000Z_2022-07-19T21:28:00.000Z

1658266025

0001

...

这个索引允许我们查询某个expirationWindow内的所有项目(因为它是分区键),甚至可以过滤ttl属性(因为它是排序键)。

注意:在本博客中,我们使用 CloudWatch 事件来触发我们的 lambda 函数,该函数的最小粒度为 1 分钟,因此我们将过期窗口也保持为一分钟的粒度。

基础设施

要部署基础架构以支持此设置,您可以创建如下所示的 CDK 堆栈。它创建一个表(启用ttl并在expirationWindow上创建一个全局二级索引)、一个事件规则和一个 lambda。

import * as cdk from 'aws-cdk-lib';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambdaJs from 'aws-cdk-lib/aws-lambda-nodejs';
import { Construct } from 'constructs';

export class DeleteItemsStack extends cdk.Stack {
  public table: dynamodb.Table;

  constructor(scope: Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    /**
     * The table itself
     */

    this.table = new dynamodb.Table(this, 'expirationTable', {
      partitionKey: { name: 'itemId', type: dynamodb.AttributeType.STRING },
      tableName: 'expirationTable',
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      timeToLiveAttribute: 'ttl',
    });

    /**
     * Deleting expired items: query and delete expired items every minute
     */
    const indexName = 'expirationWindowIndex';
    this.table.addGlobalSecondaryIndex({
      indexName: indexName,
      partitionKey: { name: 'expirationWindow', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'ttl', type: dynamodb.AttributeType.NUMBER },
    });

    const deleteExpiredItemsFunction = new lambdaJs.NodejsFunction(this, 'deleteExpiredItemsFunction', {
      entry: 'PATH_TO_LAMBDA_HERE',
      timeout: cdk.Duration.minutes(5), // To support a load of api calls if needed :P. And it's still faster than dynanomdb TTL.
      memorySize: 2048,
      environment: {
        TABLE_NAME: this.table.tableName,
        TABLE_INDEX_NAME: indexName,
      },
    });

    // Create an eventrule that goes off every minute, and transform the payload
    const eventRule = new events.Rule(this, 'scheduleRule', {
      schedule: events.Schedule.cron({ minute: '*/1' }),
    });
    eventRule.addTarget(new targets.LambdaFunction(deleteExpiredItemsFunction, {
      event: events.RuleTargetInput.fromObject({
        time: events.EventField.fromPath('$.time'),
      }),
    }));

    //allow it to query and delete items
    deleteExpiredItemsFunction.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ['dynamodb:Query', 'dynamodb:BatchWriteItem'],
        resources: [
          this.table.tableArn,
          `${this.table.tableArn}/index/${indexName}`,
        ],
        effect: iam.Effect.ALLOW,
      }),
    );
  }
}

对于事件规则,请注意使用了自定义映射,因此调用 lambda 的事件有效负载仅包含一个键,该键具有应该创建事件的那一分钟的精确 0 秒时间戳。例如:

{
  time: "2022-07-19T20:55:00.000Z"
}

使用此代码,我们现在拥有删除项目所需的所有基础设施!

拉姆达

用于查询和删除项目的功能包括 3 个步骤:

1.处理EventBridge payload,检查应该查询什么expirationWindow

2.查询expirationWindow获取所有应该删除的项目

  1. 执行BatchWriteItem命令删除项目。
  • 将项目列表拆分为 25 个项目的块

  • 删除块

  • 重试以防任何项目处理失败最多 3 次

  • 重复直到处理完所有项目和块

可以在下面找到此代码:

/**
 * This lambda polls a table filled with items and checks if any of the items are expired.
 * If so, it deletes those entries from the table.
 */
import * as dynamo from '@aws-sdk/client-dynamodb';
import * as libDynamo from '@aws-sdk/lib-dynamodb';
import { Context } from 'aws-lambda';


const dynamoClient = new dynamo.DynamoDBClient({ region: 'eu-west-1' }); // You can change this to any region of course
const docClient = libDynamo.DynamoDBDocumentClient.from(dynamoClient);

interface ScheduledEvent {
  time: string;
}

/**
 * Construct a start- and end ISO timestamp, representing the partition key of the table's  Global Secondary Index (GSI)
 * @param date The date of the event to construct the expirationWindow for
 * @returns e.g. "2022-06-23T15:50:00.000Z_2022-06-23T15:51:00.000Z"
 */
export const getExpirationWindow = (date: Date, granularityInSeconds = 60) => {
  // a coefficient used to round to the nearest 5 minutes (the number of milliseconds in 5 minutes)
  const numberOfMsInWindow = 1000 * granularityInSeconds;
  // Subtract a second to make sure that floor and ceil give different results for a time falling exactly on a window's edge (12:00:00)
  const expirationDateMinus1 = new Date(date.getTime());
  expirationDateMinus1.setSeconds((expirationDateMinus1.getSeconds() - 1));

  const endWindow = new Date(Math.ceil(date.getTime() / numberOfMsInWindow) * numberOfMsInWindow).toISOString();
  const startWindow = new Date(Math.floor(expirationDateMinus1.getTime() / numberOfMsInWindow) * numberOfMsInWindow).toISOString();
  return startWindow + '_' + endWindow;
};

/**
 * Queries 1 partition of the Global Secondary Index (GSI) of the table,
 * and filters on items that have been expired by using the sort key
 * @param expirationWindow Primary key of the GSI of the table, containing all items expiring in this N minute window
 * @param expirationTime Epoch timestamp on when the items expires
 * @returns A list of item IDs to delete
 */
const queryExpiredItems = async (
  expirationWindow: string,
  expirationTime: number,
  tableName: string,
  tableIndex: string
) => {
  const command = new libDynamo.QueryCommand({
    IndexName: tableIndex,
    TableName: tableName,
    KeyConditionExpression: '#expirationWindow = :expirationWindow AND #expirationTime < :expirationTime',
    ExpressionAttributeNames: {
      '#expirationWindow': 'expirationWindow',
      '#expirationTime': 'ttl',
    },
    ExpressionAttributeValues: {
      ':expirationWindow': expirationWindow,
      ':expirationTime': expirationTime,
    },
  });
  const response = await docClient.send(command);
  return response?.Items?.map(item => (item.itemId));
};

/**
 * Create the parameters for the BatchWrite request to delete a batch of items
 * @param itemIds List of item id's to be deleted
 */
const createDeleteRequestParams = (itemIds: string[], tableName: string) => {
  const deleteRequests: { [key: string]: any }[] = itemIds.map((itemId) => (
    {
      DeleteRequest: {
        Key: {
          itemId,
        },
      },
    }));
  const paramsDelete: libDynamo.BatchWriteCommandInput = {
    RequestItems: {},
  };
  paramsDelete.RequestItems![tableName] = deleteRequests;
  return paramsDelete;
};

/**
 * Deletes a batch (max 25) of items from the table
 * @param itemIds List of item Ids to delete from the table
 * @returns unprocessed items and whether or not errors have occurrred.
 */
const deleteBatch = async (itemIds: string[], tableName: string) => {
  if (itemIds.length > 25) {
    throw new Error(`BatchWrite item supports only up to 25 items. Got ${itemIds.length}`);
  }
  let unprocessedItems: string[] = [];
  const requestParams = createDeleteRequestParams(itemIds, tableName);
  try {
    const data = await docClient.send(new libDynamo.BatchWriteCommand(requestParams));
    // Check if any unprocessed data object is present, and it is populated with at least one item for this table
    if (data.UnprocessedItems && data.UnprocessedItems[tableName]) {
      unprocessedItems = data.UnprocessedItems[tableName].map(
        (request) => (request.DeleteRequest?.Key!).itemId,
      );
    }
    // At least a partial success, maybe with unprocessed items
    return {
      unprocessed: unprocessedItems,
      hasErrors: false,
    };
  } catch (err) {
    // Error in deleting items. Retrying...=
    return {
      unprocessed: itemIds,
      hasErrors: true,
    };
  }
};

/**
 * Delete a set of items from the table.
 * @param items List of itemIds of expired items
 * @param retriesLeft when errors occur, unprocessed items are retried N times.
 */
const deleteExpiredItems = async (items: string[], tableName: string, retriesLeft = 3) => {
  let unprocessedItems: string[] = [];
  if (retriesLeft <= 0) {
    throw new Error(`unable to delete the desired ${items.length} items`);
  }

  // Delete in chunks of 25 items
  const chunkSize = 25;
  for (let i = 0; i < items.length; i += chunkSize) {
    const batchOfItems = items.slice(i, i + chunkSize);
    const response = await deleteBatch(batchOfItems, tableName);
    unprocessedItems = unprocessedItems.concat(response.unprocessed);
  }
  // Retry one more time if unprocessed items happen
  if (unprocessedItems.length > 0) {
    await deleteExpiredItems(unprocessedItems, tableName, retriesLeft - 1);
  }
};

/**
 *
 * @param event A scheduled event from EventBridge with custom payload, providing the exact time of the cron-event.
 * @param _context lambda context object
 * @returns 'done'
 */
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export const lambdaHandler = async (event: ScheduledEvent, _context: Context) => {
  if (!process.env.TABLE_NAME || !process.env.TABLE_INDEX_NAME) {
    throw new Error('TABLE_NAME and TABLE_INDEX_NAME should be defined but are not');
  }
  const tableName = process.env.TABLE_NAME!;
  const tableIndex = process.env.TABLE_INDEX_NAME!;

  // Step 1: read the incoming event payload from eventbridge and process to 
  // an expirationWindow and timestamp to query the datatore.
  const eventDatetime = new Date(event.time);
  const expirationWindow = getExpirationWindow(eventDatetime);
  const expirationTimestamp = Math.round(eventDatetime.getTime() / 1000);
  // Step 2: Query the expired items
  const items = await queryExpiredItems(expirationWindow, expirationTimestamp, tableName, tableIndex);
  if (items) {
    // Step 3: Delete the expired items
    await deleteExpiredItems(items, tableName);
  }
};

选择过期窗口

通过上述设置,到期窗口需要是触发 lambda 频率的倍数。要了解原因,让我们考虑以下示例:

每两分钟触发一次,窗口为 5 分钟(不是倍数)。这意味着在第 4 分钟从 0-5 分钟索引中查询项目,仅删除 0-4 分钟。在第 6 分钟的下一次触发时,查询 5-10 分钟索引中的事件,仅删除 5-6 分钟。第 4 分钟到第 5 分钟之间的事件将不予处理!

幸运的是,原生 TTL 功能会在某个时候启动,因此这些项目最终会被删除。您可以自己将逻辑添加到 lambda 以查询以前的索引,但我选择不这样做以使事情更简单。

此设置的成本影响

向表中添加全局二级索引会显着增加写入繁重数据库的成本,因为您不仅要为写入表的记录付费,还要为受写入影响的任何索引付费。这有效地使您的写入成本加倍(假设您没有配置其他 GSI)。

此外,TTL 是免费的,而主动删除项目也算作一次写入,因此会产生额外费用。

进一步提高精度

在所描述的设置中,计划事件用于触发删除功能。由于这些只有分钟粒度,我们也只能每分钟查询一次。为了改进这一点,请编写您自己的调度逻辑以支持例如二级粒度。

结论

使用提供的设置,您可以比 DynamoDb 的原生 TTL 功能更准确地查询和删除项目。 TTL 仍然可以使用,但另外我们可以主动查询和删除那些过期时间过长的项目。

Logo

云原生社区为您提供最前沿的新闻资讯和知识内容

更多推荐