DynamoDB Solution - Exponential Backoff
Keywords: aws, amazon, dynamodb
Exponential Backoff (指数退避) 是反馈系统中常用的一种算法. 设想一个系统中, 每个 Event 都可以触发一个 Response. 如果连续进来许多个相同的 Event, 我们不希望为每个 Event 都返回 Response, 因为这样会占用大量系统资源. 取而代之的是使用指数轮询的方式, 让相邻两次 Response 之间的间隔越来越长. 举例来说, 前 1 秒内最多返回 1 个 Response, 1 - 2 秒内最多返回 1 个, 2 - 4 秒内最多返回 1 个, 4 - 8 秒内最多返回 1 个, 依次按 2 的倍数递增; 一旦超过 3600 秒, 则重置整个过程. 这里只是一个例子, 其中的间隔, 倍数, 重置时间都可以自定义.
在 AWS 的很多服务中都有这种机制, 常用于异常处理和错误重试. 例如, 如果一个 Lambda Function 的异步调用 (Async Invocation) 失败了, 假设重试次数为 3, 那么第一次重试前可能要等 1 秒, 第二次要等 10 秒, 第三次可能要等 30 秒.
这种机制在软件工程里非常常用, 但是你自己实现这种机制却不是那么的容易. 所以这里我给出了一种 Python 实现, 可以在多个项目中复用.
1# -*- coding: utf-8 -*-
2
3"""
This script implements a monitor Lambda function with an exponential backoff feature.

You can configure an AWS Lambda Function to automatically send a notification
to an SNS topic when it fails. However, if a highly concurrent Lambda Function
sends tons of identical failure notifications to your SNS topic, it may flood
your alert system.

To avoid this, we use exponential backoff to ensure that only a small number
of notifications are sent within a short period of time.
13"""
14
15import typing as T
16from datetime import datetime, timezone
17
18import pynamodb_mate as pm
19
20
def send_notification() -> None:
    """
    Stand-in for the real notification sender.

    In this example it only prints a fixed message to stdout.
    """
    message = "Just send a notification"
    print(message)
23
24
# Once the time elapsed since the first notification of the current backoff
# cycle reaches this many seconds, the exponential backoff starts over.
reset_time = 7 * 86400  # 7 days, in seconds
28
# The exponential backoff bracket, in seconds.
# Entry ``i`` (for ``i >= 1``) is the minimum wait after the i-th notification
# before the (i+1)-th notification may be sent. Entry 0 stands for "the first
# notification is sent immediately".
backoff_wait_time = [
    0,  # 0 seconds, send the first notification immediately
    1 * 60,  # 1 min, wait 1 min before sending the second notification
    5 * 60,  # 5 min, wait 5 min before sending the third notification
    15 * 60,  # 15 min
    30 * 60,  # 30 min
    1 * 3600,  # 1 hour
    4 * 3600,  # 4 hours
    12 * 3600,  # 12 hours
    1 * 86400,  # 1 day
    2 * 86400,  # 2 days
    4 * 86400,  # 4 days
]

# A much shorter bracket for local experiments. Its intervals (1, 5, 10, 30 s)
# are the ones visible in the example output of the ``__main__`` demo at the
# bottom of this file.
# backoff_wait_time = [0, 1, 5, 10, 30, 60, 300]
45
46
def get_utc_now() -> datetime:
    """
    Return the current time as a timezone-aware ``datetime`` in UTC.

    ``datetime.now(timezone.utc)`` is used instead of ``datetime.utcnow()``.
    The latter returns a naive value and is deprecated since Python 3.12.
    """
    return datetime.now(timezone.utc)
49
50
class Tracker(pm.Model):
    """
    The DynamoDB table that serves as the centralized tracker for distributed
    workers.

    Each item is keyed by ``(principal_id, cause_id)``. It records:

    - how many notifications have been sent in the current backoff cycle;
    - when the first notification of that cycle was sent;
    - when the most recent notification was sent.

    These values decide whether a new notification should be sent or
    suppressed.
    """

    class Meta:
        table_name = "notification-exponential-backoff"
        region = "us-east-1"
        billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE

    # fmt: off
    pk: T.Union[str, pm.UnicodeAttribute] = pm.UnicodeAttribute(hash_key=True)
    sk: T.Union[str, pm.UnicodeAttribute] = pm.UnicodeAttribute(range_key=True)
    # number of notifications sent in the current backoff cycle
    count: T.Union[int, pm.NumberAttribute] = pm.NumberAttribute()
    # time of the first notification of the current backoff cycle
    first_report_time: T.Union[datetime, pm.UTCDateTimeAttribute] = pm.UTCDateTimeAttribute()
    # time of the most recent notification
    last_report_time: T.Union[datetime, pm.UTCDateTimeAttribute] = pm.UTCDateTimeAttribute()
    # fmt: on

    @property
    def principal_id(self) -> str:
        """
        The principal that failed to process the task. Usually an AWS ARN.
        """
        return self.pk

    @property
    def cause_id(self) -> str:
        """
        The cause of the failure. Usually the exception type name.
        """
        return self.sk

    @staticmethod
    def _get_wait_time(count: int) -> int:
        """
        Return the minimum wait, in seconds, after ``count`` notifications
        before the next one may be sent.

        Once ``count`` runs past the end of ``backoff_wait_time``, the last
        (largest) interval is reused instead of raising ``IndexError``.
        """
        return backoff_wait_time[min(count, len(backoff_wait_time) - 1)]

    @classmethod
    def send_notification(
        cls,
        principal_id: str,
        cause_id: str,
    ) -> T.Optional["Tracker"]:
        """
        Send a notification for ``(principal_id, cause_id)`` if the
        exponential backoff policy allows it, and record it in the tracker.

        :param principal_id: the principal that failed, stored as ``pk``.
        :param cause_id: the cause of the failure, stored as ``sk``.
        :return: the tracker item if a notification was sent, otherwise ``None``.
        """
        # NOTE(review): the read (``get``) and the following write
        # (``save`` / ``update``) are not atomic. Two workers that read the
        # same item concurrently may both send a notification. A conditional
        # write would be needed to make the backoff strict.
        now = get_utc_now()
        try:
            tracker = cls.get(principal_id, cause_id)
        except cls.DoesNotExist:
            # First failure seen for this pair: notify immediately and start
            # a new backoff cycle.
            tracker = cls(
                pk=principal_id,
                sk=cause_id,
                count=1,
                first_report_time=now,
                last_report_time=now,
            )
            # This calls the module-level ``send_notification()``. Class
            # attributes (including this classmethod) are not in a method's
            # name-lookup scope.
            send_notification()
            tracker.save()
            return tracker

        # The current cycle has expired, so start over as if this were the
        # first notification. ``first_report_time`` must move to ``now`` too.
        # Otherwise every later call would still see an expired cycle and
        # would notify every time.
        if (now - tracker.first_report_time).total_seconds() >= reset_time:
            send_notification()
            tracker.update(
                actions=[
                    cls.count.set(1),
                    cls.first_report_time.set(now),
                    cls.last_report_time.set(now),
                ]
            )
            return tracker

        # Looked up only after the reset check, so an over-long count can
        # never block a reset.
        wait_time = cls._get_wait_time(tracker.count)
        if (now - tracker.last_report_time).total_seconds() >= wait_time:
            send_notification()
            tracker.update(
                actions=[
                    cls.count.set(tracker.count + 1),
                    cls.last_report_time.set(now),
                ]
            )
            return tracker

        # Still inside the backoff window: suppress this notification.
        return None
127
128
if __name__ == "__main__":
    import time

    import moto

    # The demo only runs for about 60 seconds. The production bracket waits
    # 1 minute before the second notification, so it would show at most one
    # notification. Rebind the module-level bracket to the short one that
    # produced the example output below. ``Tracker.send_notification`` looks
    # up this global at call time, so the change takes effect.
    backoff_wait_time = [0, 1, 5, 10, 30, 60, 300]

    # NOTE(review): moto >= 5 replaces ``mock_dynamodb`` with ``mock_aws``.
    mock_dynamodb = moto.mock_dynamodb()
    mock_dynamodb.start()
    try:
        # NOTE(review): presumably this sets up pynamodb_mate's default
        # connection so that it talks to the moto mock. Confirm.
        pm.Connection()
        Tracker.create_table(wait=True)
        principal_id = "my-lambda-func"
        cause_id = "ValueError"

        start_time = get_utc_now()
        for _ in range(60):
            time.sleep(1)
            elapsed = int((get_utc_now() - start_time).total_seconds())
            print(f"elapsed: {elapsed}")
            tracker = Tracker.send_notification(principal_id, cause_id)
            if tracker is not None:
                print(f" tracker: {tracker.attribute_values}")
    finally:
        # Always stop the mock, even if the demo raises.
        mock_dynamodb.stop()

    # example output:
    # elapsed: 1
    # Just send a notification
    # tracker: {'pk': 'my-lambda-func', 'sk': 'ValueError', 'count': 1, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc)}
    # elapsed: 2
    # Just send a notification
    # tracker: {'count': 2, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 7, 736981, tzinfo=datetime.timezone.utc), 'pk': 'my-lambda-func', 'sk': 'ValueError'}
    # elapsed: 3
    # elapsed: 4
    # elapsed: 5
    # elapsed: 6
    # elapsed: 7
    # Just send a notification
    # tracker: {'count': 3, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 12, 766106, tzinfo=datetime.timezone.utc), 'pk': 'my-lambda-func', 'sk': 'ValueError'}
    # elapsed: 8
    # elapsed: 9
    # elapsed: 10
    # elapsed: 11
    # elapsed: 12
    # elapsed: 13
    # elapsed: 14
    # elapsed: 15
    # elapsed: 16
    # elapsed: 17
    # Just send a notification
    # tracker: {'count': 4, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 22, 814980, tzinfo=datetime.timezone.utc), 'pk': 'my-lambda-func', 'sk': 'ValueError'}
    # elapsed: 18
    # elapsed: 19
    # elapsed: 20
    # elapsed: 21
    # elapsed: 22
    # elapsed: 23
    # elapsed: 24
    # elapsed: 25
    # elapsed: 26
    # elapsed: 27
    # elapsed: 28
    # elapsed: 29
    # elapsed: 30
    # elapsed: 31
    # elapsed: 32
    # elapsed: 33
    # elapsed: 34
    # elapsed: 35
    # elapsed: 36
    # elapsed: 37
    # elapsed: 38
    # elapsed: 39
    # elapsed: 40
    # elapsed: 41
    # elapsed: 42
    # elapsed: 43
    # elapsed: 44
    # elapsed: 45
    # elapsed: 46
    # elapsed: 47
    # Just send a notification
    # tracker: {'count': 5, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 53, 39960, tzinfo=datetime.timezone.utc), 'pk': 'my-lambda-func', 'sk': 'ValueError'}
    # elapsed: 48
    # elapsed: 49
    # elapsed: 50
    # elapsed: 51
    # elapsed: 52
    # elapsed: 53
    # elapsed: 54
    # elapsed: 55
    # elapsed: 56
    # elapsed: 57
    # elapsed: 58
    # elapsed: 59
    # elapsed: 60