DynamoDB - Incremental Export#
这个 POC 是用来验证 DynamoDB 在 2023-09 推出的 Incremental Export 功能的.
下面这个 gen_data.py
脚本能够每一秒生成一条数据, 并且写入到 DynamoDB 中.
gen_data.py
1# -*- coding: utf-8 -*-
2
3import uuid
4import time
5import random
6from datetime import datetime, timezone
7
8import pynamodb_mate.api as pm
9from boto_session_manager import BotoSesManager
10
11
12def get_utc_now() -> datetime:
13 return datetime.utcnow().replace(tzinfo=timezone.utc)
14
15
16tb_name = "incremental_export_poc-measurement"
17
18
19class Measurement(pm.Model):
20 class Meta:
21 table_name = tb_name
22 region = "us-east-1"
23 billing_mode = pm.constants.PAY_PER_REQUEST_BILLING_MODE
24
25 id = pm.UnicodeAttribute(hash_key=True, default=lambda: str(uuid.uuid4()))
26 time = pm.UTCDateTimeAttribute(null=False, default=get_utc_now)
27 value = pm.NumberAttribute(null=False)
28
29
30Measurement.create_table(wait=True)
31
32aws_profile = "bmt_app_dev_us_east_1"
33bsm = BotoSesManager(profile_name=aws_profile)
34
35
36def turn_on_pitr():
37 bsm.dynamodb_client.update_continuous_backups(
38 TableName=tb_name,
39 PointInTimeRecoverySpecification={
40 "PointInTimeRecoveryEnabled": True,
41 },
42 )
43
44
45def gen_data():
46 while 1:
47 time.sleep(random.randint(500, 1500) / 1000)
48 Measurement(
49 value=random.randint(1, 100),
50 ).save()
51
52
53if __name__ == "__main__":
54 turn_on_pitr()
55 gen_data()
这个 export_data.py
脚本则分别导出了 initial load 和 incremental load, 并且读取了里面的数据.
export_data.py
1# -*- coding: utf-8 -*-
2
3import gzip
4import polars as pl
5from datetime import datetime, timezone, timedelta
6
7from s3pathlib import S3Path
8from aws_dynamodb_io.api import ExportJob, ExportFormatEnum, ExportTypeEnum
9from simpletype.api import String, Integer
10from dynamodb_json_seder.api import deserialize_df
11
12from gen_data import (
13 Measurement,
14 tb_name,
15 bsm,
16)
17
18
19bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
20s3dir_root = S3Path(
21 f"s3://{bucket}/projects/learn_aws/dynamodb-solutions/incremental-export/"
22).to_dir()
23table_arn = f"arn:aws:dynamodb:{bsm.aws_region}:{bsm.aws_account_id}:table/{tb_name}"
24simple_schema = {
25 "id": String(),
26 "time": String(),
27 "value": Integer(),
28}
29polars_schema = {k: v.to_dynamodb_json_polars() for k, v in simple_schema.items()}
30
31
32def export_initial_data():
33 export_job = ExportJob.export_table_to_point_in_time(
34 dynamodb_client=bsm.dynamodb_client,
35 table_arn=table_arn,
36 s3_bucket=s3dir_root.bucket,
37 s3_prefix=s3dir_root.key,
38 export_time=datetime(2024, 9, 14, 23, 5, 0).astimezone(timezone.utc),
39 export_format=ExportFormatEnum.DYNAMODB_JSON.value,
40 )
41 print(f"export_arn = {export_job.arn}")
42
43
44def export_incremental_data():
45 export_job = ExportJob.export_table_to_point_in_time(
46 dynamodb_client=bsm.dynamodb_client,
47 table_arn=table_arn,
48 s3_bucket=s3dir_root.bucket,
49 s3_prefix=s3dir_root.key,
50 export_format=ExportFormatEnum.DYNAMODB_JSON.value,
51 export_type=ExportTypeEnum.INCREMENTAL_EXPORT.value,
52 incremental_export_specification=dict(
53 ExportFromTime=datetime(2024, 9, 14, 23, 5, 0).astimezone(timezone.utc),
54 ExportToTime=datetime(2024, 9, 14, 23, 20, 0).astimezone(timezone.utc),
55 ),
56 )
57 print(f"export_arn = {export_job.arn}")
58
59
60def read_df_from_init_export(export_job: ExportJob):
61 data_file_list = export_job.get_data_files(bsm.dynamodb_client, bsm.s3_client)
62 sub_df_list = list()
63 for data_file in data_file_list:
64 s3path = S3Path(f"s3://{bucket}/{data_file.s3_key}")
65 b = s3path.read_bytes(bsm=bsm)
66 sub_df = pl.read_ndjson(
67 gzip.decompress(b),
68 schema={"Item": pl.Struct(polars_schema)},
69 )
70 sub_df_list.append(sub_df)
71 df = pl.concat(sub_df_list)
72 df = deserialize_df(df, simple_schema, dynamodb_json_col="Item")
73 return df
74
75
76def read_df_from_incr_export(export_job: ExportJob):
77 data_file_list = export_job.get_data_files(bsm.dynamodb_client, bsm.s3_client)
78 sub_df_list = list()
79 for data_file in data_file_list:
80 s3path = S3Path(f"s3://{bucket}/{data_file.s3_key}")
81 b = s3path.read_bytes(bsm=bsm)
82 sub_df = pl.read_ndjson(
83 gzip.decompress(b),
84 schema={"NewImage": pl.Struct(polars_schema)},
85 )
86 sub_df_list.append(sub_df)
87 df = pl.concat(sub_df_list)
88 df = deserialize_df(df, simple_schema, dynamodb_json_col="NewImage")
89 return df
90
91
92def exam_overlap(init_arn: str, incr_arn: str):
93 init_export = ExportJob.describe_export(bsm.dynamodb_client, init_arn)
94 incr_export = ExportJob.describe_export(bsm.dynamodb_client, incr_arn)
95 df_init= read_df_from_init_export(init_export)
96 df_incr = read_df_from_incr_export(incr_export)
97 df_init = df_init.sort("time")
98 df_incr = df_incr.sort("time")
99 print(df_init.tail(1).to_dicts())
100 print(df_incr.head(1).to_dicts())
101
102
103
104if __name__ == "__main__":
105 # export_initial_data()
106 init_arn = "arn:aws:dynamodb:us-east-1:878625312159:table/incremental_export_poc-measurement/export/01726369892818-7b359b68"
107 # export_incremental_data()
108 incr_arn = "arn:aws:dynamodb:us-east-1:878625312159:table/incremental_export_poc-measurement/export/01726370424000-3dcd3992"
109 exam_overlap(init_arn, incr_arn)
重要结论
full export 的时候, export time 是结尾时间, 是 exclusive 的 (不包含 export time 本身)
incremental export 的时候, start time 是包括本身的, 而 end time 不包括.
full export 的数据是在 Item field 下的.
incremental export 的数据是在 NewImage field 下的.
incremental export 的 window 必须在 15 分钟以上.
incremental export 会在 S3 prefix 下直接创建一个 data 的目录来保存数据 (而 full export 会根据时间戳自动创建一个子文件夹). 所以建议 incremental export 的 prefix 的 folder name 包含 export period 的时间戳, 这样能比较确保不同的 incremental export 不会互相影响.
哪怕是很小的数据量, 一般 export 的时间也在 5 分钟左右.
由以上结论可以得出, 现在要想将 DynamoDB 同步到数据仓库中, 不用 fancy 的流数据处理, 就能实现不超过 20 - 30 分钟数据延迟的同步. 对于大多数应用来说这已经够了. 这个 20 分钟是建立在假设你有一个 8:00 的数据, 你只有在 8:15 的时候才能开始运行一个 15 分钟窗口的 incremental export, 而 export 本身需要 5 分钟, 数据处理需要大约 1 分钟, 所以总共的延迟是 20 分钟. 当然你也可以在 8:05 的时候就做一个 7:50 - 8:05 的 export, 然后数据处理时 filter 掉不要的数据, 这样的延迟可以做到 10 分钟左右. 但无论怎么样, 数据延迟都不会低于 export 本身需要的时间 (大约 5 分钟).
DynamoDB to Data Lake Solution
我的这套方案不需要任何 Orchestration, 只需要 Lambda Function 既可.
- Initial Export Lambda:
Description: 这个 Lambda 的任务是负责打开 PITR, 然后适时启动 Full Export Job.
Detail: 它会不断检测目标 Table 是否打开了 PITR, 如果打开了, 就会在 15 分钟整点后启动一个 Export Job, 并且在 S3 中写一个 Tracker 文件, 表示时间已经推进到了这个 Initial Load Export Time. 这样后续的 Incremental Export Lambda 看到这个 S3 文件就知道可以开始进行 Incremental Export 了.
Schedule: 15 分钟运行一次.
- Initial Export Data Processing Lambda:
Description: 这个 Lambda 的任务是负责处理 Full Export 的数据, 并且写入到 Data Lake 中.
Detail: 这个 Lambda 会 5 分钟运行一次, 检查 Full Export 完成没有, 如果完成了就会读取 Full Export 的数据, 然后写入到 Data Lake 中. 这个 Lambda 会根据 Full Export 的时间戳来决定写入到 Data Lake 的目录结构.
Schedule: 15 分钟运行一次.
- Incremental Export Lambda: 这个
Schedule: 5 分钟运行一次.
Incremental Export Data Processing Lambda.
你先开启 PITR. 然后等个 15 分钟, 然后找之前最近的一个 15 分钟的节点, 进行一次 Full Export. 例如你 7:55 打开的 PITR, 然后你 8:00 的时候进行一次把 8:00 之前的全部数据导出的 Full Export, 然后用本地运行的程序运行个一次
你需要两个定时运行的 Lambda