Connect To Redshift in Python#

Keywords: AWS, Amazon, Redshift, Connect

How it Works#

首先我们来了解一下用 Python 跟 Redshift 通信的原理.

首先我们回顾一下用 GUI 工具, 连接到数据库的原理. 我们以 DBeaver (免费的数据库 GUI) 为例. 一般对于不同的数据库你需要一个 Driver, Driver 定义了跟特定数据库通信的接口. 而 Redshift 本质上是一个数据库, 它当然也需要一个 Driver. 如果你用 DBeaver 连接 Redshift, 你会看到提示让你下载 Driver.

我们再来看用 Python 连接到数据库的原理. Python 中有一个标准, PEP 249 – Python Database API Specification v2.0, 如果你要用 Python 实现一个数据库 Driver, 你就必须符合这个标准. 这使得不同的数据库的 Driver 的 API 接口都是类似的. 而 Redshift 是一个数据库, 当然 Redshift Python Driver 也得符合这个标准.

AWS 官方维护着一个叫 redshift_connector 的 Python 库. 这个库就是官方的 Redshift Driver. 你看到的任何第三方库, 例如 awswrangler, sqlalchemy-redshift, 本质上都是对 redshift_connector 所创建的 connection 的封装.

Use Username Password or IAM#

AWS 提供了两种鉴权方式:

  1. 在数据库中创建 Database User (以及 password), 然后用 host, port, database, user, pass 的方式创建 DBApi2.0 connection.

  2. 使用 IAM Role, 本质上是用你的 IAM Principal 创建临时的 User / Pass (过一段时间后会失效), 然后创建 DBApi2.0 connection. 可以通过 redshift:GetClusterCredentials (针对 Cluster) 或 redshift-serverless:GetCredentials (针对 Serverless Workgroup) 这两个 API 获得临时的 credential.

在 Redshift 产品早期, 你只能用 #1. 这就涉及到你需要用 Secret Manager 来保存密码并定期用 Lambda Rotate. 这比较的麻烦. 目前 (2024 年) 基本上你可以无脑使用 #2 的方式. 临时的 Credential 永远会更安全. 不过要注意的是 #2 用的是临时 Credential, 最大持续时间是 1 小时. 对于长时间运行的 App 你可能需要 Refresh Credential. 不过这个很好实现.

Redshift Data API#

Data API 是云原生产品的一大优势. AWS 允许你用 Rest API 异步执行 Query, 而无需创建连接. 这种做法超级超级方便. 非常适合用来写长期运行的 App 程序, 或是将其封装为 App 给最终用户使用.

Troubleshooting#

  1. 确保你的 Cluster 或 Workgroup 的 Security Group 的入站白名单里包含你的 IP 地址.

  2. 如果你的网络不是在 VPC 中的, 注意你的 Cluster 是否开启了 Public Access.

Sample Code#

这里我把用多种方法连接到 Redshift 并进行简单的 CRUD 操作的代码封装成了一个简单的库, 方便以后调用.

test_create_connect_for_serverless_using_iam.py
 1# -*- coding: utf-8 -*-
 2
 3"""
 4This example shows how to connect to Redshift Serverless using redshift connector.
 5"""
 6
 7import boto3
 8from pathlib import Path
 9
10import aws_redshift_helper.api as rs
11
12# load your config
13dir_here = Path(__file__).absolute().parent
14path_config_serverless = dir_here / "config-serverless.json"
15config_serverless = rs.Config.load(path_config_serverless)
16
17# create boto session
18boto_ses = boto3.session.Session(profile_name="awshsh_app_dev_us_east_1")
19
20# create redshift connection
21conn = rs.create_connect_for_serverless_using_iam(
22    boto_ses=boto_ses,
23    workgroup_name=config_serverless.workgroup,
24)
25
26rs.test_connection(conn)
test_create_sqlalchemy_engine_for_serverless_using_iam.py
  1# -*- coding: utf-8 -*-
  2
  3"""
  4This example shows how to connect to Redshift Serverless using Sqlalchemy.
  5"""
  6
  7import boto3
  8import uuid
  9import random
 10import textwrap
 11from pathlib import Path
 12from datetime import datetime
 13
 14import sqlalchemy as sa
 15import sqlalchemy.orm as orm
 16
 17import aws_redshift_helper.api as rs
 18
 19
 20Base = orm.declarative_base()
 21
 22
 23class Transaction(Base):
 24    __tablename__ = "transactions"
 25
 26    id: str = sa.Column(sa.String, primary_key=True)
 27    create_at: str = sa.Column(sa.String)
 28    update_at: str = sa.Column(sa.String)
 29    note: str = sa.Column(sa.String, nullable=True)
 30
 31    @classmethod
 32    def new(cls, note: str = None):
 33        return cls(
 34            id=str(uuid.uuid4()),
 35            create_at=datetime.utcnow().isoformat(),
 36            update_at=datetime.utcnow().isoformat(),
 37            note=note,
 38        )
 39
 40
 41def create_table(engine: "sa.engine.Engine"):
 42    with engine.connect() as conn:
 43        sql = textwrap.dedent(
 44            f"""
 45            DROP TABLE IF EXISTS {Transaction.__tablename__};
 46            """
 47        )
 48        conn.execute(sql)
 49
 50    with engine.connect() as conn:
 51        sql = textwrap.dedent(
 52            f"""
 53            CREATE TABLE {Transaction.__tablename__}(
 54                id VARCHAR(36) DISTKEY NOT NULL,
 55                create_at VARCHAR(26) NOT NULL,
 56                update_at VARCHAR(26) NOT NULL,
 57                note VARCHAR
 58            )
 59            DISTSTYLE key
 60            COMPOUND SORTKEY(create_at);
 61            """
 62        )
 63        conn.execute(sql)
 64
 65
 66def insert_data(engine: "sa.engine.Engine"):
 67    print(f"Insert some data into {Transaction.__tablename__!r} table")
 68    with orm.Session(engine) as ses:
 69        transaction = Transaction.new(note=f"note {random.randint(1, 1000000)}")
 70        ses.add(transaction)
 71        ses.commit()
 72
 73
 74def select_data(engine: "sa.engine.Engine"):
 75    print(f"select data from {Transaction.__tablename__!r} table")
 76
 77    # return object
 78    print("--- Return object ---")
 79    with orm.Session(engine) as ses:
 80        for transaction in ses.query(Transaction).limit(3):
 81            print(
 82                [
 83                    transaction.id,
 84                    transaction.create_at,
 85                    transaction.update_at,
 86                    transaction.note,
 87                ]
 88            )
 89
 90    # return python dict
 91    print("--- Return dict ---")
 92    with engine.connect() as conn:
 93        for transaction in conn.execute(sa.select(Transaction).limit(3)).mappings():
 94            print(transaction)
 95
 96
 97# load your config
 98dir_here = Path(__file__).absolute().parent
 99path_config_serverless = dir_here / "config-serverless.json"
100config_serverless = rs.Config.load(path_config_serverless)
101
102# create boto session
103boto_ses = boto3.session.Session()
104
105# create sqlalchemy engine
106engine = rs.create_sqlalchemy_engine_for_serverless_using_iam(
107    boto_ses=boto_ses,
108    workgroup_name=config_serverless.workgroup,
109)
110
111rs.test_engine(engine)
112create_table(engine)
113insert_data(engine)
114select_data(engine)
test_create_connect_for_cluster_using_iam.py
 1# -*- coding: utf-8 -*-
 2
 3"""
 4This example shows how to connect to Redshift cluster using redshift connector.
 5"""
 6
 7import boto3
 8from pathlib import Path
 9
10import aws_redshift_helper.api as rs
11
12# load your config
13dir_here = Path(__file__).absolute().parent
14path_config_cluster = dir_here / "config-cluster.json"
15config_cluster = rs.Config.load(path_config_cluster)
16
17# create boto session
18boto_ses = boto3.session.Session(profile_name="awshsh_app_dev_us_east_1")
19
20# create redshift connection
21conn = rs.create_connect_for_cluster_using_iam(
22    boto_ses=boto_ses,
23    cluster_id=config_cluster.cluster_id,
24)
25
26rs.test_connection(conn)
test_create_sqlalchemy_engine_for_cluster_using_iam.py
  1# -*- coding: utf-8 -*-
  2
  3"""
  4This example shows how to connect to Redshift cluster using Sqlalchemy.
  5"""
  6
  7import boto3
  8import uuid
  9import random
 10import textwrap
 11from pathlib import Path
 12from datetime import datetime
 13
 14import sqlalchemy as sa
 15import sqlalchemy.orm as orm
 16
 17import aws_redshift_helper.api as rs
 18
 19
 20Base = orm.declarative_base()
 21
 22
 23class Transaction(Base):
 24    __tablename__ = "transactions"
 25
 26    id: str = sa.Column(sa.String, primary_key=True)
 27    create_at: str = sa.Column(sa.String)
 28    update_at: str = sa.Column(sa.String)
 29    note: str = sa.Column(sa.String, nullable=True)
 30
 31    @classmethod
 32    def new(cls, note: str = None):
 33        return cls(
 34            id=str(uuid.uuid4()),
 35            create_at=datetime.utcnow().isoformat(),
 36            update_at=datetime.utcnow().isoformat(),
 37            note=note,
 38        )
 39
 40
 41def create_table(engine: "sa.engine.Engine"):
 42    with engine.connect() as conn:
 43        sql = textwrap.dedent(
 44            f"""
 45            DROP TABLE IF EXISTS {Transaction.__tablename__};
 46            """
 47        )
 48        conn.execute(sql)
 49
 50    with engine.connect() as conn:
 51        sql = textwrap.dedent(
 52            f"""
 53            CREATE TABLE {Transaction.__tablename__}(
 54                id VARCHAR(36) DISTKEY NOT NULL,
 55                create_at VARCHAR(26) NOT NULL,
 56                update_at VARCHAR(26) NOT NULL,
 57                note VARCHAR
 58            )
 59            DISTSTYLE key
 60            COMPOUND SORTKEY(create_at);
 61            """
 62        )
 63        conn.execute(sql)
 64
 65
 66def insert_data(engine: "sa.engine.Engine"):
 67    print(f"Insert some data into {Transaction.__tablename__!r} table")
 68    with orm.Session(engine) as ses:
 69        transaction = Transaction.new(note=f"note {random.randint(1, 1000000)}")
 70        ses.add(transaction)
 71        ses.commit()
 72
 73
 74def select_data(engine: "sa.engine.Engine"):
 75    print(f"select data from {Transaction.__tablename__!r} table")
 76
 77    # return object
 78    print("--- Return object ---")
 79    with orm.Session(engine) as ses:
 80        for transaction in ses.query(Transaction).limit(3):
 81            print(
 82                [
 83                    transaction.id,
 84                    transaction.create_at,
 85                    transaction.update_at,
 86                    transaction.note,
 87                ]
 88            )
 89
 90    # return python dict
 91    print("--- Return dict ---")
 92    with engine.connect() as conn:
 93        for transaction in conn.execute(sa.select(Transaction).limit(3)).mappings():
 94            print(transaction)
 95
 96
 97# load your config
 98dir_here = Path(__file__).absolute().parent
 99path_config_cluster = dir_here / "config-cluster.json"
100config_cluster = rs.Config.load(path_config_cluster)
101
102# create boto session
103boto_ses = boto3.session.Session()
104
105# create sqlalchemy engine
106engine = rs.create_sqlalchemy_engine_for_cluster_using_iam(
107    boto_ses=boto_ses,
108    cluster_id=config_cluster.cluster_id,
109)
110
111rs.test_engine(engine)
112create_table(engine)
113insert_data(engine)
114select_data(engine)
test_work_with_awswrangler.py
 1# -*- coding: utf-8 -*-
 2
 3"""
 4This example shows how to use awswrangler to work with Redshift Serverless.
 5"""
 6
 7import boto3
 8from pathlib import Path
 9
10import pandas as pd
11import awswrangler as wr
12
13import aws_redshift_helper.api as rs
14
15
16def create_table_and_load_data():
17    df = pd.DataFrame(
18        [
19            (
20                "d87e0557-447c-4743-a527-cd26b59720b9",
21                "2023-08-09T14:04:58.919920",
22                "2023-08-09T14:04:58.919920",
23                "note 111",
24            ),
25            (
26                "959a2aec-b560-4b1e-9b59-cdca5546f091",
27                "2023-08-09T14:04:58.919920",
28                "2023-08-09T14:04:58.919920",
29                "note 2",
30            ),
31        ],
32        columns=["id", "create_at", "update_at", "note"],
33    )
34    wr.redshift.copy(
35        df=df,
36        path=s3_dir_uri,
37        con=conn,
38        schema="public",
39        table=TABLE_NAME,
40        mode="overwrite",  # append, overwrite or upsert.
41        boto3_session=boto_ses,
42        primary_keys=["id"],
43    )
44
45
46def read_dataframe():
47    print("Read data from Redshift into pandas dataframe")
48    sql = f"SELECT * FROM {TABLE_NAME} LIMIT 10;"
49    df = wr.redshift.read_sql_query(sql, con=conn)
50    print(df)
51
52
53# load your config
54dir_here = Path(__file__).absolute().parent
55path_config_serverless = dir_here / "config-serverless.json"
 56config_serverless = rs.Config.load(path_config_serverless)
57
58# create boto session
59boto_ses = boto3.session.Session(profile_name="awshsh_app_dev_us_east_1")
60aws_account_id = boto_ses.client("sts").get_caller_identity()["Account"]
61aws_region = boto_ses.region_name
62
63# create redshift connection
64conn = rs.create_connect_for_serverless_using_iam(
65    boto_ses=boto_ses,
66    workgroup_name=config_serverless.workgroup,
67)
68
69rs.test_connection(conn)
70
71# write data and read data
72bucket = f"{aws_account_id}-{aws_region}-data"
73s3_dir_uri = f"s3://{bucket}/project/redshift-serverless-poc/"
74print(s3_dir_uri)
75TABLE_NAME = "transactions"
76
77create_table_and_load_data()
78read_dataframe()