DynamoDB Solution - Authentication and Authorization

Keywords: aws, amazon, dynamodb

# -*- coding: utf-8 -*-

"""
This module is a very simple tool for generating API keys and for checking
whether an API key has expired.
"""
  6
  7import typing as T
  8import secrets
  9import pynamodb_mate as pm
 10from datetime import datetime, timedelta, timezone
 11
 12
# Annotation aliases used on the model attributes below. On the class, each
# attribute is declared as a pynamodb_mate attribute object; the Union with the
# plain Python type lets one annotation cover both views.
# NOTE(review): presumably instances expose plain str / datetime values here;
# confirm against pynamodb_mate.
T_STR = T.Union[str, pm.UnicodeAttribute]
T_DT = T.Union[datetime, pm.UTCDateTimeAttribute]
 15
 16
 17def get_utc_now() -> datetime:
 18    return datetime.utcnow().replace(tzinfo=timezone.utc)
 19
 20
class Auth(pm.Model):
    """
    Base class for the model classes defined after it.

    In DynamoDB, the different classes are stored in the same table.
    Each class uses a different partition-key prefix so that its items
    are easy to query.
    """

    # Partition (hash) key of the item.
    pk: T_STR = pm.UnicodeAttribute(hash_key=True)
 28
 29
 30def token_to_pk(token: str) -> str:
 31    return f"token-{token}"
 32
 33
 34def pk_to_token(pk: str) -> str:
 35    return pk.split("-")[1]
 36
 37
 38class ApiToken(Auth):
 39    """
 40    代表一个用户创建的 API Token. 一个用户可以创建多个 API Token.
 41    """
 42
 43    class Meta:
 44        table_name = "auth"
 45        region = "us-east-1"
 46        billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE
 47
 48    account_id: T_STR = pm.UnicodeAttribute()
 49    expire_at: T_DT = pm.UTCDateTimeAttribute()
 50
 51    @property
 52    def token(self) -> str:
 53        return pk_to_token(self.pk)
 54
 55    def is_expired(self, utc_now: datetime) -> bool:
 56        return utc_now > self.expire_at
 57
 58    @classmethod
 59    def new(
 60        cls,
 61        account_id: str,
 62        expire: int,
 63    ):
 64        api_token = cls(
 65            pk=token_to_pk(secrets.token_hex(16)),
 66            account_id=account_id,
 67            expire_at=get_utc_now() + timedelta(seconds=expire),
 68        )
 69        api_token.save()
 70        return api_token
 71
 72    @classmethod
 73    def get_if_is_allowed(cls, token: str) -> T.Optional["ApiToken"]:
 74        api_token = cls.get_one_or_none(token_to_pk(token))
 75        if api_token is None:
 76            return None
 77        if api_token.is_expired(get_utc_now()):
 78            return None
 79        else:
 80            return api_token
 81
 82
 83if __name__ == "__main__":
 84    import time
 85    import moto
 86
 87    mock_dynamodb = moto.mock_dynamodb()
 88    mock_dynamodb.start()
 89    pm.Connection()
 90
 91    ApiToken.create_table()
 92
 93    account_id = "alice@email.com"
 94    api_token = ApiToken.new(account_id, 1)
 95    print(api_token.token)
 96
 97    api_token_1 = ApiToken.get_if_is_allowed(api_token.token)
 98    print(api_token_1 is not None)
 99
100    time.sleep(2)
101    api_token_1 = ApiToken.get_if_is_allowed(api_token.token)
102    print(api_token_1 is not None)