Store Large Items in DynamoDB#
Keywords: AWS, Amazon, DynamoDB
We often need to manage large binary data with DynamoDB, for example in a document management system or an artifacts management system, yet DynamoDB limits a single item to 400KB. In that case, according to the official AWS document Best practices for storing large items and attributes, you should store the data in S3 and keep the S3 URI in DynamoDB. This approach sounds simple, but in production practice many details deserve careful consideration. For example:

- If one of the two writes (to DynamoDB and to S3) fails, how do you handle it?
- If a DynamoDB item has several large binary attributes, writing them all to S3 inevitably takes a while; how do you guarantee that they all succeed or all fail together?
- How should the S3 URI be constructed when writing to S3?
- On Update, should the old S3 object be deleted? How do you clean up the many useless S3 objects that accumulate over time?

This post discusses in detail the correct way to implement this pattern of writing the data to S3 and storing the S3 URI in DynamoDB.
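To make the basic pattern concrete, here is a minimal sketch of the naive dual write using plain boto3. The bucket, table, and key names are hypothetical, and it shows only the happy path; the rest of this post deals with what happens when one of the two writes fails:

import boto3

s3 = boto3.client("s3")
dynamodb = boto3.client("dynamodb")

# hypothetical names, for illustration only
bucket = "my-bucket"
table = "my-table"
pk = "doc-1"
data = b"<html>a payload that may exceed the 400KB item limit</html>"

# 1. write the large payload to S3 first
s3_key = f"documents/{pk}/body"
s3.put_object(Bucket=bucket, Key=s3_key, Body=data)

# 2. store only the S3 URI in DynamoDB, keeping the item small
dynamodb.put_item(
    TableName=table,
    Item={
        "pk": {"S": pk},
        "body": {"S": f"s3://{bucket}/{s3_key}"},
    },
)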
Conclusion#
- Dual-write consistency: on Create / Update, write S3 first, then DynamoDB; on Delete, delete the DynamoDB item first, then the S3 object.
- S3 location: use the pattern s3://bucket/prefix/${partition_key}/${optional_sort_key}/${attribute_name}/${hash_of_the_data}.
- Clean up: every S3 object carries an update_at metadata entry that matches the timestamp on the DynamoDB item. We can therefore export the table to S3 (this operation is cheap and consumes no RCU; it is implemented on top of the bin log, see the official document DynamoDB data export to Amazon S3: how it works), then run a batch job to diff DynamoDB against S3. Because an S3 object's update time can be slightly earlier than the real DynamoDB update time (it depends on how long the S3 write took), roll the cutoff back by an hour and only compare data older than that (a sketch of this comparison follows this list).
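A minimal sketch of that comparison, assuming two hypothetical helpers: load_referenced_uris() parses the exported DynamoDB data on S3 and returns every S3 URI referenced by an item, and list_data_objects() yields each data object's URI together with its update_at metadata (parsed as a timezone-aware datetime):

from datetime import datetime, timedelta, timezone

def find_orphan_uris(load_referenced_uris, list_data_objects) -> list:
    # only judge objects older than the cutoff; an object written moments
    # before its DynamoDB item (or by an in-flight write) is not garbage yet
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
    referenced = load_referenced_uris()
    orphans = []
    for uri, update_at in list_data_objects():
        if update_at < cutoff and uri not in referenced:
            orphans.append(uri)
    return orphans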
Sample Script#
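The script below walks through the full Create / Get / Update / Delete / Clean-up lifecycle. It runs entirely against moto-mocked AWS services, so you can execute it locally; it depends on the pynamodb_mate, s3pathlib, boto_session_manager, and moto libraries visible in the imports.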
store_large_item_in_dynamodb.py
# -*- coding: utf-8 -*-

import hashlib
from typing import Optional
from datetime import datetime, timezone

import moto
import pynamodb_mate as pm
from s3pathlib import S3Path, context
from boto_session_manager import BotoSesManager


def get_md5(b: bytes) -> str:
    """Fingerprint the payload; the hash becomes part of the S3 key."""
    return hashlib.md5(b).hexdigest()


def get_utc_now() -> datetime:
    return datetime.now(timezone.utc)


def get_s3_key(
    pk: str,
    sk: Optional[str],
    attr: str,
    value: bytes,
    prefix: str = "",
) -> str:
    """
    Build an S3 key following the
    ``prefix/pk={pk}/sk={sk}/attr={attr}/hash={md5}`` pattern.
    The bucket used here is a placeholder; only the key part is returned.
    """
    s3path = S3Path("s3://my-bucket").joinpath(prefix).joinpath(f"pk={pk}")
    if sk is not None:
        s3path = s3path.joinpath(f"sk={sk}")
    fingerprint = get_md5(value)
    s3path = s3path.joinpath(f"attr={attr}", f"hash={fingerprint}")
    return s3path.key


aws_region = "us-east-1"


class Model(pm.Model):
    class Meta:
        table_name = "test"
        region = aws_region
        billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE

    pk = pm.UnicodeAttribute(hash_key=True)
    update_at = pm.UTCDateTimeAttribute()
    # large attributes store the S3 URI, not the data itself
    html = pm.UnicodeAttribute()
    image = pm.UnicodeAttribute()


# mock AWS so this script can run locally without touching real resources
mock_dynamodb = moto.mock_dynamodb()
mock_s3 = moto.mock_s3()
mock_dynamodb.start()
mock_s3.start()

bsm = BotoSesManager(region_name=aws_region)
context.attach_boto_session(boto_ses=bsm.boto_ses)

bucket = "my-bucket"
bsm.s3_client.create_bucket(Bucket=bucket)

Model.create_table(wait=True)

# ------------------------------------------------------------------------------
# Create item
#
# Write S3 first, then DynamoDB. If the S3 write fails, nothing is created.
# If the S3 write succeeds but the DynamoDB write fails, we only leak orphan
# S3 objects, which the clean-up job at the bottom removes eventually.
# ------------------------------------------------------------------------------
print("--- Create Item ---")
utc_now = get_utc_now()

pk = "id-1"
html = "<b>Hello Alice</b>"
html_data = html.encode("utf-8")
image = "this is image one".encode("utf-8")
image_data = image

html_s3_key = get_s3_key(pk=pk, sk=None, attr=Model.html.attr_name, value=html_data)
image_s3_key = get_s3_key(pk=pk, sk=None, attr=Model.image.attr_name, value=image_data)

print("Create S3 ...")
s3path_html = S3Path(f"s3://{bucket}/{html_s3_key}")
s3path_html.write_bytes(
    html_data,
    metadata={
        "pk": pk,
        "attr": Model.html.attr_name,
        "update_at": utc_now.isoformat(),
    },
)

s3path_image = S3Path(f"s3://{bucket}/{image_s3_key}")
s3path_image.write_bytes(
    image_data,
    metadata={
        "pk": pk,
        "attr": Model.image.attr_name,
        "update_at": utc_now.isoformat(),
    },
)

print("Create DynamoDB ...")
model = Model(pk=pk, update_at=utc_now, html=s3path_html.uri, image=s3path_image.uri)
model.save()

# ------------------------------------------------------------------------------
# Get item
# ------------------------------------------------------------------------------
print("--- Get Item ---")
model = Model.get(pk)
html = S3Path(model.html).read_bytes().decode("utf-8")
image = S3Path(model.image).read_bytes()
print(f"{html = }")
print(f"{image = }")

# ------------------------------------------------------------------------------
# Inspect S3 Bucket
# ------------------------------------------------------------------------------
print("--- Inspect S3 Bucket ---")
for s3path in S3Path(bucket).iter_objects():
    print(s3path.uri)

# ------------------------------------------------------------------------------
# Update item
# ------------------------------------------------------------------------------
print("--- Update Item ---")
model = Model.get(pk)

utc_now = get_utc_now()

html = "<b>Hello Bob</b>"
html_data = html.encode("utf-8")
image = "this is image two".encode("utf-8")
image_data = image

# remember the existing S3 locations; we may delete them if the update succeeds
s3path_existing_html = S3Path(model.html)
s3path_existing_image = S3Path(model.image)

html_s3_key = get_s3_key(pk=pk, sk=None, attr=Model.html.attr_name, value=html_data)
image_s3_key = get_s3_key(pk=pk, sk=None, attr=Model.image.attr_name, value=image_data)
s3path_html = S3Path(f"s3://{bucket}/{html_s3_key}")
s3path_image = S3Path(f"s3://{bucket}/{image_s3_key}")

print("Put S3 ...")
s3path_html.write_bytes(
    html_data,
    metadata={
        "pk": pk,
        "attr": Model.html.attr_name,
        "update_at": utc_now.isoformat(),
    },
)
s3path_image.write_bytes(
    image_data,
    metadata={
        "pk": pk,
        "attr": Model.image.attr_name,
        "update_at": utc_now.isoformat(),
    },
)

try:
    print("Update DynamoDB ...")
    model.update(
        actions=[
            Model.html.set(s3path_html.uri),
            Model.image.set(s3path_image.uri),
        ]
    )
    # Deleting the old objects here is optional and must be guarded: because
    # the key embeds the content hash, unchanged content means the "old" and
    # "new" keys point to the same object, which must NOT be deleted.
    # print("(optional) Remove old S3 object ...")
    # if s3path_existing_html.uri != s3path_html.uri:
    #     s3path_existing_html.delete()
    # if s3path_existing_image.uri != s3path_image.uri:
    #     s3path_existing_image.delete()
except Exception:
    # The same guard applies when rolling back the newly created objects:
    # only delete them if they differ from the old ones still referenced by
    # the (unchanged) DynamoDB item.
    # print("(optional) Remove newly created S3 object, DynamoDB update failed ...")
    # if s3path_html.uri != s3path_existing_html.uri:
    #     s3path_html.delete()
    # if s3path_image.uri != s3path_existing_image.uri:
    #     s3path_image.delete()
    raise

# ------------------------------------------------------------------------------
# Get item
# ------------------------------------------------------------------------------
print("--- Get Item ---")
model = Model.get(pk)
html = S3Path(model.html).read_bytes().decode("utf-8")
image = S3Path(model.image).read_bytes()
print(f"{html = }")
print(f"{image = }")

# ------------------------------------------------------------------------------
# Inspect S3 Bucket
# ------------------------------------------------------------------------------
print("--- Inspect S3 Bucket ---")
for s3path in S3Path(bucket).iter_objects():
    print(s3path.uri)


# ------------------------------------------------------------------------------
# Delete item
#
# Delete the DynamoDB item first, then the S3 objects. If the DynamoDB
# operation fails, nothing has changed. If it succeeds but the S3 operation
# fails, you end up with ghost S3 objects, which is acceptable: the clean-up
# job can remove them at any time.
#
# Don't delete S3 first: if the S3 operation succeeds but the DynamoDB
# operation fails, a reader can still get the DynamoDB item but cannot find
# the S3 object it points to, which is far worse.
# ------------------------------------------------------------------------------
print("--- Delete Item ---")
model = Model.get(pk)
s3path_existing_html = S3Path(model.html)
s3path_existing_image = S3Path(model.image)

print("delete DynamoDB ...")
model.delete()

print("delete S3 ...")
s3path_existing_html.delete()
s3path_existing_image.delete()

# ------------------------------------------------------------------------------
# Get item
# ------------------------------------------------------------------------------
print("--- Get Item ---")
model = Model.get_one_or_none(pk)
print(f"{model = }")

# ------------------------------------------------------------------------------
# Inspect S3 Bucket
# ------------------------------------------------------------------------------
print("--- Inspect S3 Bucket ---")
for s3path in S3Path(bucket).iter_objects():
    print(s3path.uri)


# ------------------------------------------------------------------------------
# Clean Up Unused S3 Object
#
# This demo scans the table directly; in production, prefer a DynamoDB export
# to S3 so the comparison consumes no RCU. Note: an S3 object's update_at can
# be slightly earlier than the real DynamoDB update time (it depends on how
# long the S3 write took), so roll the cutoff back by an hour and only compare
# objects older than that.
# ------------------------------------------------------------------------------
print("--- Clean Up Unused S3 Object ---")
s3uri_set = set()
for model in Model.scan(attributes_to_get=["html", "image"]):
    s3uri_set.add(model.html)
    s3uri_set.add(model.image)

for s3path in S3Path(f"s3://{bucket}/").iter_objects():
    if s3path.uri not in s3uri_set:
        s3path.delete()