DynamoDB - Incremental Export#

这个 POC 是用来验证 DynamoDB 在 2023-09 推出的 Incremental Export 功能的.

下面这个 gen_data.py 脚本能够每一秒生成一条数据, 并且写入到 DynamoDB 中.

gen_data.py
 1# -*- coding: utf-8 -*-
 2
 3import uuid
 4import time
 5import random
 6from datetime import datetime, timezone
 7
 8import pynamodb_mate.api as pm
 9from boto_session_manager import BotoSesManager
10
11
12def get_utc_now() -> datetime:
13    return datetime.utcnow().replace(tzinfo=timezone.utc)
14
15
16tb_name = "incremental_export_poc-measurement"
17
18
19class Measurement(pm.Model):
20    class Meta:
21        table_name = tb_name
22        region = "us-east-1"
23        billing_mode = pm.constants.PAY_PER_REQUEST_BILLING_MODE
24
25    id = pm.UnicodeAttribute(hash_key=True, default=lambda: str(uuid.uuid4()))
26    time = pm.UTCDateTimeAttribute(null=False, default=get_utc_now)
27    value = pm.NumberAttribute(null=False)
28
29
30Measurement.create_table(wait=True)
31
32aws_profile = "bmt_app_dev_us_east_1"
33bsm = BotoSesManager(profile_name=aws_profile)
34
35
36def turn_on_pitr():
37    bsm.dynamodb_client.update_continuous_backups(
38        TableName=tb_name,
39        PointInTimeRecoverySpecification={
40            "PointInTimeRecoveryEnabled": True,
41        },
42    )
43
44
45def gen_data():
46    while 1:
47        time.sleep(random.randint(500, 1500) / 1000)
48        Measurement(
49            value=random.randint(1, 100),
50        ).save()
51
52
53if __name__ == "__main__":
54    turn_on_pitr()
55    gen_data()

这个 export_data.py 脚本则分别导出了 initial load 和 incremental load, 并且读取了里面的数据.

export_data.py
  1# -*- coding: utf-8 -*-
  2
  3import gzip
  4import polars as pl
  5from datetime import datetime, timezone, timedelta
  6
  7from s3pathlib import S3Path
  8from aws_dynamodb_io.api import ExportJob, ExportFormatEnum, ExportTypeEnum
  9from simpletype.api import String, Integer
 10from dynamodb_json_seder.api import deserialize_df
 11
 12from gen_data import (
 13    Measurement,
 14    tb_name,
 15    bsm,
 16)
 17
 18
 19bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
 20s3dir_root = S3Path(
 21    f"s3://{bucket}/projects/learn_aws/dynamodb-solutions/incremental-export/"
 22).to_dir()
 23table_arn = f"arn:aws:dynamodb:{bsm.aws_region}:{bsm.aws_account_id}:table/{tb_name}"
 24simple_schema = {
 25    "id": String(),
 26    "time": String(),
 27    "value": Integer(),
 28}
 29polars_schema = {k: v.to_dynamodb_json_polars() for k, v in simple_schema.items()}
 30
 31
 32def export_initial_data():
 33    export_job = ExportJob.export_table_to_point_in_time(
 34        dynamodb_client=bsm.dynamodb_client,
 35        table_arn=table_arn,
 36        s3_bucket=s3dir_root.bucket,
 37        s3_prefix=s3dir_root.key,
 38        export_time=datetime(2024, 9, 14, 23, 5, 0).astimezone(timezone.utc),
 39        export_format=ExportFormatEnum.DYNAMODB_JSON.value,
 40    )
 41    print(f"export_arn = {export_job.arn}")
 42
 43
 44def export_incremental_data():
 45    export_job = ExportJob.export_table_to_point_in_time(
 46        dynamodb_client=bsm.dynamodb_client,
 47        table_arn=table_arn,
 48        s3_bucket=s3dir_root.bucket,
 49        s3_prefix=s3dir_root.key,
 50        export_format=ExportFormatEnum.DYNAMODB_JSON.value,
 51        export_type=ExportTypeEnum.INCREMENTAL_EXPORT.value,
 52        incremental_export_specification=dict(
 53            ExportFromTime=datetime(2024, 9, 14, 23, 5, 0).astimezone(timezone.utc),
 54            ExportToTime=datetime(2024, 9, 14, 23, 20, 0).astimezone(timezone.utc),
 55        ),
 56    )
 57    print(f"export_arn = {export_job.arn}")
 58
 59
 60def read_df_from_init_export(export_job: ExportJob):
 61    data_file_list = export_job.get_data_files(bsm.dynamodb_client, bsm.s3_client)
 62    sub_df_list = list()
 63    for data_file in data_file_list:
 64        s3path = S3Path(f"s3://{bucket}/{data_file.s3_key}")
 65        b = s3path.read_bytes(bsm=bsm)
 66        sub_df = pl.read_ndjson(
 67            gzip.decompress(b),
 68            schema={"Item": pl.Struct(polars_schema)},
 69        )
 70        sub_df_list.append(sub_df)
 71    df = pl.concat(sub_df_list)
 72    df = deserialize_df(df, simple_schema, dynamodb_json_col="Item")
 73    return df
 74
 75
 76def read_df_from_incr_export(export_job: ExportJob):
 77    data_file_list = export_job.get_data_files(bsm.dynamodb_client, bsm.s3_client)
 78    sub_df_list = list()
 79    for data_file in data_file_list:
 80        s3path = S3Path(f"s3://{bucket}/{data_file.s3_key}")
 81        b = s3path.read_bytes(bsm=bsm)
 82        sub_df = pl.read_ndjson(
 83            gzip.decompress(b),
 84            schema={"NewImage": pl.Struct(polars_schema)},
 85        )
 86        sub_df_list.append(sub_df)
 87    df = pl.concat(sub_df_list)
 88    df = deserialize_df(df, simple_schema, dynamodb_json_col="NewImage")
 89    return df
 90
 91
 92def exam_overlap(init_arn: str, incr_arn: str):
 93    init_export = ExportJob.describe_export(bsm.dynamodb_client, init_arn)
 94    incr_export = ExportJob.describe_export(bsm.dynamodb_client, incr_arn)
 95    df_init= read_df_from_init_export(init_export)
 96    df_incr = read_df_from_incr_export(incr_export)
 97    df_init = df_init.sort("time")
 98    df_incr = df_incr.sort("time")
 99    print(df_init.tail(1).to_dicts())
100    print(df_incr.head(1).to_dicts())
101
102
103
104if __name__ == "__main__":
105    # export_initial_data()
106    init_arn = "arn:aws:dynamodb:us-east-1:878625312159:table/incremental_export_poc-measurement/export/01726369892818-7b359b68"
107    # export_incremental_data()
108    incr_arn = "arn:aws:dynamodb:us-east-1:878625312159:table/incremental_export_poc-measurement/export/01726370424000-3dcd3992"
109    exam_overlap(init_arn, incr_arn)

重要结论

  • full export 的时候, export time 是结尾时间, 是 exclusive 的 (不包含 export time 本身)

  • incremental export 的时候, start time 是包括本身的, 而 end time 不包括.

  • full export 的数据是在 Item field 下的.

  • incremental export 的数据是在 NewImage field 下的.

  • incremental export 的 window 必须在 15 分钟以上.

  • incremental export 会在 S3 prefix 下直接创建一个 data 的目录来保存数据 (而 full export 会根据时间戳自动创建一个子文件夹). 所以建议 incremental export 的 prefix 的 folder name 包含 export period 的时间戳, 这样能比较确保不同的 incremental export 不会互相影响.

  • 哪怕是很小的数据量, 一般 export 的时间也在 5 分钟左右.

由以上结论可以得出, 现在要想将 DynamoDB 同步到数据仓库中, 不用 fancy 的流数据处理, 就能实现不超过 20 - 30 分钟数据延迟的同步. 对于大多数应用来说这已经够了. 这个 20 分钟是建立在假设你有一个 8:00 的数据, 你只有在 8:15 的时候才能开始运行一个 15 分钟窗口的 incremental export, 而 export 本身需要 5 分钟, 数据处理需要大约 1 分钟, 所以总共的延迟是 20 分钟. 当然你也可以在 8:05 的时候就做一个 7:50 - 8:05 的 export, 然后数据处理时 filter 掉不要的数据, 这样的延迟可以做到 10 分钟左右. 但无论怎么样, 数据延迟都不会低于 export 本身需要的时间 (大约 5 分钟).

DynamoDB to Data Lake Solution

我的这套方案不需要任何 Orchestration, 只需要 Lambda Function 既可.

  1. Initial Export Lambda:
    • Description: 这个 Lambda 的任务是负责打开 PITR, 然后适时启动 Full Export Job.

    • Detail: 它会不断检测目标 Table 是否打开了 PITR, 如果打开了, 就会在 15 分钟整点后启动一个 Export Job, 并且在 S3 中写一个 Tracker 文件, 表示时间已经推进到了这个 Initial Load Export Time. 这样后续的 Incremental Export Lambda 看到这个 S3 文件就知道可以开始进行 Incremental Export 了.

    • Schedule: 15 分钟运行一次.

  2. Initial Export Data Processing Lambda:
    • Description: 这个 Lambda 的任务是负责处理 Full Export 的数据, 并且写入到 Data Lake 中.

    • Detail: 这个 Lambda 会 5 分钟运行一次, 检查 Full Export 完成没有, 如果完成了就会读取 Full Export 的数据, 然后写入到 Data Lake 中. 这个 Lambda 会根据 Full Export 的时间戳来决定写入到 Data Lake 的目录结构.

    • Schedule: 15 分钟运行一次.

  3. Incremental Export Lambda: 这个
    • Schedule: 5 分钟运行一次.

  4. Incremental Export Data Processing Lambda.

  1. 你先开启 PITR. 然后等个 15 分钟, 然后找之前最近的一个 15 分钟的节点, 进行一次 Full Export. 例如你 7:55 打开的 PITR, 然后你 8:00 的时候进行一次把 8:00 之前的全部数据导出的 Full Export, 然后用本地运行的程序运行个一次

你需要两个定时运行的 Lambda