Store Large Item in DynamoDB#

Keywords: aws, amazon, dynamodb

We often need to manage large binary data with DynamoDB, for example in a document management system or an artifact management system, but DynamoDB limits a single item to 400KB. In this situation, the official AWS document Best practices for storing large items and attributes recommends storing the data in S3 and storing the S3 URI in DynamoDB. This sounds simple, but in production many details deserve careful thought. For example:

  1. If the write to DynamoDB or the write to S3 fails, how do we handle it?

  2. If an item has multiple large binary attributes, writing them all to S3 inevitably takes a while. How do we guarantee that they all succeed or all fail?

  3. How should we construct the S3 URI when writing to S3?

  4. Should we delete the old S3 object on Update? And how do we clean up the many unused S3 objects that accumulate over time?

This post discusses in detail the right way to implement this pattern: write the data to S3, and store the S3 URI in DynamoDB.
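Before diving into those details, here is the basic shape of the pattern as a minimal boto3 sketch. The bucket name, table name, prefix, and key layout below are hypothetical placeholders; the full sample script at the end of this post handles ordering and error cases:

import hashlib

import boto3

s3 = boto3.client("s3")
dynamodb = boto3.client("dynamodb")

bucket, table = "my-bucket", "test"  # hypothetical names
pk = "id-1"
data = b"<b>Hello Alice</b>"  # the "large" attribute value

# content-addressed key: pk + attribute name + hash of the data
key = f"my-prefix/pk={pk}/attr=html/hash={hashlib.md5(data).hexdigest()}"

# write the payload to S3 first ...
s3.put_object(Bucket=bucket, Key=key, Body=data)
# ... then store only the small S3 URI in the DynamoDB item
dynamodb.put_item(
    TableName=table,
    Item={"pk": {"S": pk}, "html": {"S": f"s3://{bucket}/{key}"}},
)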

Conclusion#

  • Dual-write consistency: on Create / Update, write to S3 first, then to DynamoDB; on Delete, delete the DynamoDB item first, then the S3 object.

  • S3 Location: use the pattern s3://bucket/prefix/${partition_key}/${optional_sort_key}/${attribute_name}/${hash_of_the_data}.

  • Clean Up: every S3 object carries an update_at metadata entry that matches the DynamoDB item's timestamp. So we can export the DynamoDB table to S3 (this operation is cheap and consumes no RCU, because it is implemented with the bin log rather than table reads; see the official document DynamoDB data export to Amazon S3: how it works), then run a batch job that compares DynamoDB against S3 and deletes unreferenced objects (see the sketch after this list). Because the S3 update time can be slightly earlier than the actual DynamoDB update time (depending on how long the S3 write takes), roll the clock back by 1 hour and only compare objects older than that cutoff.
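A minimal sketch of that comparison batch job, assuming the table was exported in DynamoDB JSON format and that html and image are the large-item attributes. The bucket names and the export prefix are hypothetical placeholders:

import gzip
import json
from datetime import datetime, timedelta, timezone

import boto3

s3 = boto3.client("s3")

export_bucket = "my-export-bucket"  # hypothetical
export_prefix = "AWSDynamoDB/.../data/"  # hypothetical: the export's data folder
data_bucket = "my-bucket"  # the bucket holding the large attributes

# collect every S3 URI still referenced by a DynamoDB item
referenced = set()
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=export_bucket, Prefix=export_prefix):
    for obj in page.get("Contents", []):
        body = s3.get_object(Bucket=export_bucket, Key=obj["Key"])["Body"].read()
        # each export data file is gzipped, one DynamoDB-JSON item per line
        for line in gzip.decompress(body).splitlines():
            item = json.loads(line)["Item"]
            for attr in ("html", "image"):
                if attr in item:
                    referenced.add(item[attr]["S"])

# only delete objects older than the cutoff, so that objects written by an
# in-flight create / update survive
cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
for page in paginator.paginate(Bucket=data_bucket):
    for obj in page.get("Contents", []):
        uri = f"s3://{data_bucket}/{obj['Key']}"
        if uri not in referenced and obj["LastModified"] < cutoff:
            s3.delete_object(Bucket=data_bucket, Key=obj["Key"])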

Diagram#

Store-Large-Item-in-DynamoDB

Sample Script#

store_large_item_in_dynamodb.py
# -*- coding: utf-8 -*-

import typing as T
import hashlib
from datetime import datetime, timezone

import moto
import pynamodb_mate as pm
from s3pathlib import S3Path, context
from boto_session_manager import BotoSesManager


def get_md5(b: bytes) -> str:
    """Fingerprint the content; it becomes the last part of the S3 key."""
    return hashlib.md5(b).hexdigest()


def get_utc_now() -> datetime:
    # datetime.utcnow() is deprecated; return a timezone-aware UTC datetime
    return datetime.now(timezone.utc)


def get_s3_key(
    pk: str,
    sk: T.Optional[str],
    attr: str,
    value: bytes,
    prefix: str = "",
) -> str:
    # build the key following the pattern
    # prefix/pk=${pk}/sk=${sk}/attr=${attr}/hash=${md5_of_the_data}
    s3path = S3Path(f"s3://{bucket}").joinpath(prefix).joinpath(f"pk={pk}")
    if sk is not None:
        s3path = s3path.joinpath(f"sk={sk}")
    fingerprint = get_md5(value)
    s3path = s3path.joinpath(f"attr={attr}", f"hash={fingerprint}")
    return s3path.key


aws_region = "us-east-1"
bucket = "my-bucket"


class Model(pm.Model):
    class Meta:
        table_name = "test"
        region = aws_region
        billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE

    pk = pm.UnicodeAttribute(hash_key=True)
    update_at = pm.UTCDateTimeAttribute()
    # these two attributes store S3 URIs, not the large data itself
    html = pm.UnicodeAttribute()
    image = pm.UnicodeAttribute()


mock_dynamodb = moto.mock_dynamodb()
mock_s3 = moto.mock_s3()
mock_dynamodb.start()
mock_s3.start()

bsm = BotoSesManager(region_name=aws_region)
context.attach_boto_session(boto_ses=bsm.boto_ses)

bsm.s3_client.create_bucket(Bucket=bucket)

Model.create_table(wait=True)

# ------------------------------------------------------------------------------
# Create item
#
# Write to S3 first, then to DynamoDB. If the S3 write fails, nothing
# references the orphan object; if the DynamoDB write fails, you only leave
# ghost S3 objects behind, which can be cleaned up later.
# ------------------------------------------------------------------------------
print("--- Create Item ---")
utc_now = get_utc_now()

pk = "id-1"
html = "<b>Hello Alice</b>"
html_data = html.encode("utf-8")
image_data = b"this is image one"

html_s3_key = get_s3_key(pk=pk, sk=None, attr=Model.html.attr_name, value=html_data)
image_s3_key = get_s3_key(pk=pk, sk=None, attr=Model.image.attr_name, value=image_data)

print("Create S3 ...")
s3path_html = S3Path(f"s3://{bucket}/{html_s3_key}")
s3path_html.write_bytes(
    html_data,
    metadata={
        "pk": pk,
        "attr": Model.html.attr_name,
        "update_at": utc_now.isoformat(),
    },
)

s3path_image = S3Path(f"s3://{bucket}/{image_s3_key}")
s3path_image.write_bytes(
    image_data,
    metadata={
        "pk": pk,
        "attr": Model.image.attr_name,
        "update_at": utc_now.isoformat(),
    },
)

print("Create DynamoDB ...")
model = Model(pk=pk, update_at=utc_now, html=s3path_html.uri, image=s3path_image.uri)
model.save()

# ------------------------------------------------------------------------------
# Get item
# ------------------------------------------------------------------------------
print("--- Get Item ---")
model = Model.get(pk)
html = S3Path(model.html).read_bytes().decode("utf-8")
image = S3Path(model.image).read_bytes()
print(f"{html = }")
print(f"{image = }")

# ------------------------------------------------------------------------------
# Inspect S3 Bucket
# ------------------------------------------------------------------------------
print("--- Inspect S3 Bucket ---")
for s3path in S3Path(bucket).iter_objects():
    print(s3path.uri)

# ------------------------------------------------------------------------------
# Update item
#
# Same order as Create: write the new objects to S3 first, then update the
# DynamoDB item to point at them.
# ------------------------------------------------------------------------------
print("--- Update Item ---")
model = Model.get(pk)

utc_now = get_utc_now()

html = "<b>Hello Bob</b>"
html_data = html.encode("utf-8")
image_data = b"this is image two"

# remember the existing S3 locations; we may delete them if the update succeeds
s3path_existing_html = S3Path(model.html)
s3path_existing_image = S3Path(model.image)

html_s3_key = get_s3_key(pk=pk, sk=None, attr=Model.html.attr_name, value=html_data)
image_s3_key = get_s3_key(pk=pk, sk=None, attr=Model.image.attr_name, value=image_data)
s3path_html = S3Path(f"s3://{bucket}/{html_s3_key}")
s3path_image = S3Path(f"s3://{bucket}/{image_s3_key}")

print("Put S3 ...")
s3path_html.write_bytes(
    html_data,
    metadata={
        "pk": pk,
        "attr": Model.html.attr_name,
        "update_at": utc_now.isoformat(),
    },
)
s3path_image.write_bytes(
    image_data,
    metadata={
        "pk": pk,
        "attr": Model.image.attr_name,
        "update_at": utc_now.isoformat(),
    },
)

try:
    print("Update DynamoDB ...")
    model.update(
        actions=[
            Model.html.set(s3path_html.uri),
            Model.image.set(s3path_image.uri),
        ]
    )
    # Deleting the old S3 objects here is optional. Be careful: if the new
    # content is identical to the old content, the hash-based key points at
    # the SAME object, so deleting the "old" object would delete the new one.
    # print("(optional) Remove old S3 object ...")
    # if s3path_existing_html.uri != s3path_html.uri:
    #     s3path_existing_html.delete()
    # if s3path_existing_image.uri != s3path_image.uri:
    #     s3path_existing_image.delete()
except Exception:
    # Optionally roll back the newly created S3 objects when the DynamoDB
    # update fails. The same caveat applies: only delete when the new key
    # differs from the old one.
    # print("(optional) Remove newly created S3 object ...")
    # if s3path_html.uri != s3path_existing_html.uri:
    #     s3path_html.delete()
    # if s3path_image.uri != s3path_existing_image.uri:
    #     s3path_image.delete()
    raise

# ------------------------------------------------------------------------------
# Get item
# ------------------------------------------------------------------------------
print("--- Get Item ---")
model = Model.get(pk)
html = S3Path(model.html).read_bytes().decode("utf-8")
image = S3Path(model.image).read_bytes()
print(f"{html = }")
print(f"{image = }")

# ------------------------------------------------------------------------------
# Inspect S3 Bucket
# ------------------------------------------------------------------------------
print("--- Inspect S3 Bucket ---")
for s3path in S3Path(bucket).iter_objects():
    print(s3path.uri)


# ------------------------------------------------------------------------------
# Delete item
#
# Delete the DynamoDB item first, then delete S3. If the DynamoDB operation
# fails, nothing has changed. If the DynamoDB operation succeeds but the S3
# operation fails, you end up with ghost S3 objects, which is OK: you can
# clean them up anytime.
#
# Don't delete S3 first. If the S3 operation succeeds but the DynamoDB
# operation fails, a reader can still get the DynamoDB item but cannot find
# the S3 object, which is very bad.
# ------------------------------------------------------------------------------
print("--- Delete Item ---")
model = Model.get(pk)
s3path_existing_html = S3Path(model.html)
s3path_existing_image = S3Path(model.image)

print("delete DynamoDB ...")
model.delete()

print("delete S3 ...")
s3path_existing_html.delete()
s3path_existing_image.delete()

# ------------------------------------------------------------------------------
# Get item
# ------------------------------------------------------------------------------
print("--- Get Item ---")
model = Model.get_one_or_none(pk)
print(f"{model = }")

# ------------------------------------------------------------------------------
# Inspect S3 Bucket
# ------------------------------------------------------------------------------
print("--- Inspect S3 Bucket ---")
for s3path in S3Path(bucket).iter_objects():
    print(s3path.uri)

# ------------------------------------------------------------------------------
# Clean Up Unused S3 Object
#
# Note: the S3 update time can be slightly earlier than the actual DynamoDB
# update time (depending on how long the S3 write takes). In production,
# roll the clock back (e.g. by 1 hour) and only compare objects older than
# that cutoff, so that objects written by an in-flight create / update
# survive. The cutoff is omitted here so the demo deletes immediately.
# ------------------------------------------------------------------------------
print("--- Clean Up Unused S3 Object ---")
s3uri_set = set()
for model in Model.scan(attributes_to_get=["html", "image"]):
    s3uri_set.add(model.html)
    s3uri_set.add(model.image)

for s3path in S3Path(f"s3://{bucket}/").iter_objects():
    if s3path.uri not in s3uri_set:
        s3path.delete()