DynamoDB Solution - Exponential Backoff

Keywords: aws, amazon, dynamodb

Exponential Backoff (指数退避) 是反馈系统中的一种算法. 假设一个系统中的 Event 可以触发 Response, 如果连续进来许多个相同的 Event, 我们不希望为每一个 Event 都返回 Response, 因为这样会占用大量系统资源. 更好的做法是使用指数退避的方式, 让相邻两次返回 Response 的间隔越来越长. 举例来说, 前 1 秒内最多返回 1 个 Response, 1 - 2 秒内最多返回 1 个, 2 - 4 秒内最多返回 1 个, 4 - 8 秒内最多返回 1 个, 依次按 2 的倍数递增, 超过 3600 秒后则重置整个过程. 这里只是一个例子, 其中的间隔, 倍数, 重置时间都可以自定义.

在 AWS 的很多服务中都有这种机制, 常用于异常处理和错误重试. 例如, 如果一个 Lambda Function 的 Async Call 失败了, 而重试次数设为 3, 那么第一次重试可能要等 1 秒, 第二次可能要等 10 秒, 第三次可能要等 30 秒.

这种机制在软件工程里非常常用, 但是你自己实现这种机制却不是那么的容易. 所以这里我给出了一种 Python 实现, 可以在多个项目中复用.

  1# -*- coding: utf-8 -*-
  2
  3"""
This script implements a monitoring Lambda function with an exponential backoff feature.

You can configure an AWS Lambda function to send a notification to an SNS topic
automatically when there is a failure. However, if a highly concurrent Lambda function
sends tons of identical failure notifications to your SNS topic, it may flood your
alert system.

To avoid this, we usually use exponential backoff to ensure that we only send
a small number of notifications in a short period of time.
 13"""
 14
 15import typing as T
 16from datetime import datetime, timezone
 17
 18import pynamodb_mate as pm
 19
 20
 21def send_notification():
 22    print("Just send a notification")
 23
 24
 25# reset the exponential backoff if elapsed time from the first notification
 26# is greater than this value
 27reset_time = 7 * 24 * 60 * 60  # 7 days
 28
 29# the exponential backoff bracket
 30backoff_wait_time = [
 31    0,  # 0 seconds, send first notification immediately
 32    1 * 60,  # 1 min, wait 1 min before send the second notification
 33    5 * 60,  # 5 min, wait 5 min before send the third notification
 34    15 * 60,  # 10 min
 35    30 * 60,  # 30 min
 36    1 * 3600,  # 1 hour
 37    4 * 3600,  # 4 hour
 38    12 * 3600,  # 12 hour
 39    1 * 86400,  # 1 day
 40    2 * 86400,  # 2 day
 41    4 * 86400,  # 4 day
 42]
 43
 44# backoff_wait_time = [0, 1, 5, 10, 30, 60, 300]
 45
 46
 47def get_utc_now() -> datetime:
 48    return datetime.utcnow().replace(tzinfo=timezone.utc)
 49
 50
 51class Tracker(pm.Model):
 52    """
 53    The DynamoDB table serves as the centralized tracker for distributive workers.
 54    It can track the first report time and last report time to identify that
 55    should we send a notification or not.
 56    """
 57    class Meta:
 58        table_name = "notification-exponential-backoff"
 59        region = "us-east-1"
 60        billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE
 61
 62    # fmt: off
 63    pk: T.Union[str, pm.UnicodeAttribute] = pm.UnicodeAttribute(hash_key=True)
 64    sk: T.Union[str, pm.UnicodeAttribute] = pm.UnicodeAttribute(range_key=True)
 65    count: T.Union[int, pm.NumberAttribute] = pm.NumberAttribute()
 66    first_report_time: T.Union[datetime, pm.UTCDateTimeAttribute] = pm.UTCDateTimeAttribute()
 67    last_report_time: T.Union[datetime, pm.UTCDateTimeAttribute] = pm.UTCDateTimeAttribute()
 68    # fmt: on
 69
 70    @property
 71    def principal_id(self) -> str:
 72        """
 73        The principal who is failed to process the task. Usually the AWS ARN.
 74        """
 75        return self.pk
 76
 77    @property
 78    def cause_id(self) -> str:
 79        """
 80        The cause of the failure. Usually the Exception type name.
 81        """
 82        return self.sk
 83
 84    @classmethod
 85    def send_notification(
 86        cls,
 87        principal_id: str,
 88        cause_id: str,
 89    ):
 90        now = get_utc_now()
 91        try:
 92            tracker = Tracker.get(principal_id, cause_id)
 93        except Tracker.DoesNotExist:
 94            tracker = Tracker(
 95                pk=principal_id,
 96                sk=cause_id,
 97                count=1,
 98                first_report_time=now,
 99                last_report_time=now,
100            )
101            send_notification()
102            tracker.save()
103            return tracker
104
105        wait_time = backoff_wait_time[tracker.count]
106        if (now - tracker.first_report_time).total_seconds() >= reset_time:
107            send_notification()
108            tracker.update(
109                actions=[
110                    Tracker.count.set(1),
111                    Tracker.last_report_time.set(now),
112                    Tracker.last_report_time.set(now),
113                ]
114            )
115            return tracker
116        elif (now - tracker.last_report_time).total_seconds() >= wait_time:
117            send_notification()
118            tracker.update(
119                actions=[
120                    Tracker.count.set(tracker.count + 1),
121                    Tracker.last_report_time.set(now),
122                ]
123            )
124            return tracker
125        else:
126            return None
127
128
if __name__ == "__main__":
    # Local demo: call ``Tracker.send_notification`` once per second for about
    # a minute against a mocked DynamoDB and print the tracker whenever a
    # notification was actually sent.
    #
    # With the default minute/hour/day schedule you will typically see only
    # the first notification within this minute. To see several of them,
    # temporarily switch ``backoff_wait_time`` to a short schedule such as
    # ``[0, 1, 5, 10, 30, 60, 300]``.
    import time
    import moto

    # NOTE(review): moto 5.x replaced the per-service mocks such as
    # ``mock_dynamodb`` with ``mock_aws`` -- confirm against the installed
    # moto version.
    mock_dynamodb = moto.mock_dynamodb()
    mock_dynamodb.start()
    try:
        pm.Connection()
        Tracker.create_table(wait=True)
        principal_id = "my-lambda-func"
        cause_id = "ValueError"

        start_time = get_utc_now()
        for _ in range(60):
            time.sleep(1)
            elapsed = int((get_utc_now() - start_time).total_seconds())
            print(f"elapsed: {elapsed}")
            tracker = Tracker.send_notification(principal_id, cause_id)
            if tracker is not None:
                print(f"  tracker: {tracker.attribute_values}")
    finally:
        # always tear the mock down, even if the demo raised
        mock_dynamodb.stop()
151
152    # example output:
153    # elapsed: 1
154    # Just send a notification
155    #   tracker: {'pk': 'my-lambda-func', 'sk': 'ValueError', 'count': 1, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc)}
156    # elapsed: 2
157    # Just send a notification
158    #   tracker: {'count': 2, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 7, 736981, tzinfo=datetime.timezone.utc), 'pk': 'my-lambda-func', 'sk': 'ValueError'}
159    # elapsed: 3
160    # elapsed: 4
161    # elapsed: 5
162    # elapsed: 6
163    # elapsed: 7
164    # Just send a notification
165    #   tracker: {'count': 3, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 12, 766106, tzinfo=datetime.timezone.utc), 'pk': 'my-lambda-func', 'sk': 'ValueError'}
166    # elapsed: 8
167    # elapsed: 9
168    # elapsed: 10
169    # elapsed: 11
170    # elapsed: 12
171    # elapsed: 13
172    # elapsed: 14
173    # elapsed: 15
174    # elapsed: 16
175    # elapsed: 17
176    # Just send a notification
177    #   tracker: {'count': 4, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 22, 814980, tzinfo=datetime.timezone.utc), 'pk': 'my-lambda-func', 'sk': 'ValueError'}
178    # elapsed: 18
179    # elapsed: 19
180    # elapsed: 20
181    # elapsed: 21
182    # elapsed: 22
183    # elapsed: 23
184    # elapsed: 24
185    # elapsed: 25
186    # elapsed: 26
187    # elapsed: 27
188    # elapsed: 28
189    # elapsed: 29
190    # elapsed: 30
191    # elapsed: 31
192    # elapsed: 32
193    # elapsed: 33
194    # elapsed: 34
195    # elapsed: 35
196    # elapsed: 36
197    # elapsed: 37
198    # elapsed: 38
199    # elapsed: 39
200    # elapsed: 40
201    # elapsed: 41
202    # elapsed: 42
203    # elapsed: 43
204    # elapsed: 44
205    # elapsed: 45
206    # elapsed: 46
207    # elapsed: 47
208    # Just send a notification
209    #   tracker: {'count': 5, 'first_report_time': datetime.datetime(2023, 8, 16, 14, 25, 6, 730313, tzinfo=datetime.timezone.utc), 'last_report_time': datetime.datetime(2023, 8, 16, 14, 25, 53, 39960, tzinfo=datetime.timezone.utc), 'pk': 'my-lambda-func', 'sk': 'ValueError'}
210    # elapsed: 48
211    # elapsed: 49
212    # elapsed: 50
213    # elapsed: 51
214    # elapsed: 52
215    # elapsed: 53
216    # elapsed: 54
217    # elapsed: 55
218    # elapsed: 56
219    # elapsed: 57
220    # elapsed: 58
221    # elapsed: 59
222    # elapsed: 60