DynamoDB Solution - Authentication and Authorization
Keywords: aws, amazon, dynamodb
1# -*- coding: utf-8 -*-
2
3"""
This module is a very simple module for generating an API key and verifying whether an API key has expired.
5"""
6
7import typing as T
8import secrets
9import pynamodb_mate as pm
10from datetime import datetime, timedelta, timezone
11
12
# Annotation aliases for model fields: each one accepts either the plain
# Python type or the corresponding pynamodb_mate attribute class.
T_STR = T.Union[str, pm.UnicodeAttribute]
T_DT = T.Union[datetime, pm.UTCDateTimeAttribute]
15
16
def get_utc_now() -> datetime:
    """
    Return the current time as a timezone-aware ``datetime`` in UTC.

    ``datetime.now(timezone.utc)`` is used instead of
    ``datetime.utcnow().replace(tzinfo=timezone.utc)``: ``utcnow()`` is
    deprecated since Python 3.12 and first builds a naive object, while
    ``now(tz)`` produces the same aware UTC timestamp in one step.
    """
    return datetime.now(timezone.utc)
19
20
class Auth(pm.Model):
    """
    Base class for the model classes defined after it.

    In DynamoDB, the different classes are stored in the same table.
    Each class uses a different prefix on its partition key so that
    its items are easy to query.
    """

    # Partition (hash) key of the item.
    pk: T_STR = pm.UnicodeAttribute(hash_key=True)
28
29
def token_to_pk(token: str) -> str:
    """Build the partition key for a token by prepending ``token-``."""
    return "token-{}".format(token)
32
33
def pk_to_token(pk: str) -> str:
    """
    Recover the token from a partition key of the form ``<prefix>-<token>``.

    Only the first ``-`` is treated as the separator, so a token that
    itself contains ``-`` is returned whole instead of being cut off at
    its first hyphen.

    :param pk: partition key, e.g. ``"token-0123abcd"``.
    :return: everything after the first ``-``.
    :raises IndexError: if ``pk`` contains no ``-`` at all.
    """
    # maxsplit=1 keeps any further hyphens inside the token part.
    return pk.split("-", 1)[1]
36
37
class ApiToken(Auth):
    """
    An API token created by a user. One user can create multiple API tokens.
    """

    class Meta:
        table_name = "auth"
        region = "us-east-1"
        billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE

    # Identifier of the account that owns this token.
    account_id: T_STR = pm.UnicodeAttribute()
    # Moment after which the token is treated as expired.
    expire_at: T_DT = pm.UTCDateTimeAttribute()

    @property
    def token(self) -> str:
        """The token string, extracted from the partition key."""
        raw_token = pk_to_token(self.pk)
        return raw_token

    def is_expired(self, utc_now: datetime) -> bool:
        """Return ``True`` when ``utc_now`` is strictly later than ``expire_at``."""
        return utc_now > self.expire_at

    @classmethod
    def new(cls, account_id: str, expire: int):
        """
        Create a token for ``account_id`` and save it.

        :param account_id: value stored in the item's ``account_id`` field.
        :param expire: lifetime in seconds, counted from the current UTC time.
        :return: the saved ``ApiToken`` instance.
        """
        pk = token_to_pk(secrets.token_hex(16))
        expire_at = get_utc_now() + timedelta(seconds=expire)
        api_token = cls(pk=pk, account_id=account_id, expire_at=expire_at)
        api_token.save()
        return api_token

    @classmethod
    def get_if_is_allowed(cls, token: str) -> T.Optional["ApiToken"]:
        """
        Look up ``token`` and return its item only if it is still valid.

        :return: the ``ApiToken`` found for ``token``, or ``None`` when the
            lookup returns nothing or the token has expired.
        """
        found = cls.get_one_or_none(token_to_pk(token))
        # The expiry check only runs when the lookup returned an item.
        if found is None or found.is_expired(get_utc_now()):
            return None
        return found
81
82
if __name__ == "__main__":
    # Demo: create a token that lives for one second, then look it up
    # before and after it expires, against a moto-mocked DynamoDB.
    import time
    import moto

    # Using the mock as a context manager guarantees it is stopped again,
    # even if one of the calls below raises.
    with moto.mock_dynamodb():
        pm.Connection()

        ApiToken.create_table()

        account_id = "alice@email.com"
        api_token = ApiToken.new(account_id, 1)
        print(api_token.token)

        # The token was just created, so this lookup should print True.
        api_token_1 = ApiToken.get_if_is_allowed(api_token.token)
        print(api_token_1 is not None)

        # Sleep past the one-second lifetime; this lookup should print False.
        time.sleep(2)
        api_token_1 = ApiToken.get_if_is_allowed(api_token.token)
        print(api_token_1 is not None)