Connect To Redshift in Python#
Keywords: AWS, Amazon, Redshift, Connect
How it Works#
首先我们来了解一下用 Python 跟 Redshift 通信的原理.
首先我们回顾一下用 GUI 工具, 连接到数据库的原理. 我们以 DBeaver (免费的数据库 GUI) 为例. 一般对于不同的数据库你需要一个 Driver, Driver 定义了跟特定数据库通信的接口. 而 Redshift 本质上是一个数据库, 它当然也需要一个 Driver. 如果你用 DBeaver 连接 Redshift, 你会看到提示让你下载 Driver.
我们再来看用 Python 连接到数据库的原理. Python 中有一个标准, PEP 249 – Python Database API Specification v2.0, 如果你要用 Python 实现一个数据库 Driver, 你就必须符合这个标准. 这使得不同的数据库的 Driver 的 API 接口都是类似的. 而 Redshift 是一个数据库, 当然 Redshift Python Driver 也得符合这个标准.
AWS 官方维护着一个叫 redshift_connector 的 Python 库. 这个库就是官方的 Redshift Driver. 你看到的任何第三方库, 例如 awswrangler, sqlalchemy-redshift, 本质上都是对 redshift-connector 所创建的 connection 的封装.
Use Username Password or IAM#
AWS 提供了两种鉴权方式:
- 在数据库中创建 Database User (以及 password), 然后用 host, port, database, user, pass 的方式创建 DBApi2.0 connection.
- 使用 IAM Role, 本质上是用你的 IAM Principal 创建临时的 User / Pass (过一段时间后会失效), 然后创建 DBApi2.0 connection. 有这么几个 API 可以获得临时的 credential:
redshift_client.get_cluster_credentials: 这是给 Provisioned cluster 模式用的. 这个 API 需要显式创建一个临时的 DB user / password.
redshift_client.get_cluster_credentials_with_iam: 这是给 Provisioned cluster 模式用的, 你无法指定 DB user, user 是跟你的 IAM Principal 1:1 绑定的.
redshift_serverless_client.get_credentials: 这是给 Serverless 模式用的, 你无法指定 DB user, user 是跟你的 IAM Principal 1:1 绑定的.
在 Redshift 产品早期, 你只能用 #1. 这就涉及到你需要用 Secret Manager 来保存密码并定期用 Lambda Rotate. 这比较麻烦. 目前 (2024 年) 基本上你可以无脑使用 #2 的方式. 临时的 Credential 永远会更安全. 不过要注意的是 #2 用的是临时 Credential, 最大持续时间是 1 小时. 对于长时间运行的 App 你可能需要 Refresh Credential. 不过这个很好实现.
Redshift Data API#
Data API 是云原生产品的一大优势. AWS 允许你用 Rest API 异步执行 Query, 而无需创建连接. 这种做法超级超级方便. 非常适合用来写长期运行的 App 程序, 或是将其封装为 App 给最终用户使用.
Trouble Shoot#
注意你的 Cluster 或 WorkGroup 的 Security Group 白名单里有你的 IP 地址.
如果你的网络不是在 VPC 中的, 注意你的 Cluster 是否开启了 Public Access.
Sample Code#
这里我把用多种方法连接到 Redshift 并进行简单的 CRUD 操作的代码封装成了一个简单的库, 方便以后调用.
test_create_connect_for_serverless_using_iam.py
# -*- coding: utf-8 -*-

"""
This example shows how to connect to Redshift Serverless using redshift connector.
"""

import boto3
from pathlib import Path

import aws_redshift_helper.api as rs

# Read the serverless connection settings stored next to this script.
_here = Path(__file__).absolute().parent
config_serverless = rs.Config.load(_here / "config-serverless.json")

# The named profile supplies the IAM principal used to mint temporary credentials.
boto_ses = boto3.session.Session(profile_name="awshsh_app_dev_us_east_1")

# Open a DBAPI 2.0 connection to the Serverless workgroup via IAM auth.
conn = rs.create_connect_for_serverless_using_iam(
    boto_ses=boto_ses,
    workgroup_name=config_serverless.workgroup,
)

rs.test_connection(conn)
test_create_sqlalchemy_engine_for_serverless_using_iam.py
1# -*- coding: utf-8 -*-
2
3"""
4This example shows how to connect to Redshift Serverless using Sqlalchemy.
5"""
6
7import boto3
8import uuid
9import random
10import textwrap
11from pathlib import Path
12from datetime import datetime
13
14import sqlalchemy as sa
15import sqlalchemy.orm as orm
16
17import aws_redshift_helper.api as rs
18
19
# Declarative base shared by the ORM models in this example.
Base = orm.declarative_base()


class Transaction(Base):
    """ORM model for the ``transactions`` demo table.

    Timestamps are stored as ISO-8601 strings rather than native TIMESTAMP
    columns, matching the VARCHAR DDL used in ``create_table`` below.
    """

    __tablename__ = "transactions"

    id: str = sa.Column(sa.String, primary_key=True)
    create_at: str = sa.Column(sa.String)
    update_at: str = sa.Column(sa.String)
    note: str = sa.Column(sa.String, nullable=True)

    @classmethod
    def new(cls, note: str = None):
        """Return a new (not yet persisted) row with a UUID4 id and current UTC timestamps."""
        return cls(
            id=str(uuid.uuid4()),
            create_at=datetime.utcnow().isoformat(),
            update_at=datetime.utcnow().isoformat(),
            note=note,
        )
39
40
def create_table(engine: "sa.engine.Engine"):
    """Drop and recreate the ``transactions`` table.

    Fixes for SQLAlchemy 2.0 compatibility (both also work on 1.4):

    - raw SQL strings must be wrapped in :func:`sa.text` before ``execute``;
    - ``engine.connect()`` no longer autocommits, so each DDL statement runs
      inside ``engine.begin()``, which commits on success.

    :param engine: a SQLAlchemy engine already connected to Redshift.
    """
    drop_sql = textwrap.dedent(
        f"""
        DROP TABLE IF EXISTS {Transaction.__tablename__};
        """
    )
    with engine.begin() as conn:
        conn.execute(sa.text(drop_sql))

    create_sql = textwrap.dedent(
        f"""
        CREATE TABLE {Transaction.__tablename__}(
            id VARCHAR(36) DISTKEY NOT NULL,
            create_at VARCHAR(26) NOT NULL,
            update_at VARCHAR(26) NOT NULL,
            note VARCHAR
        )
        DISTSTYLE key
        COMPOUND SORTKEY(create_at);
        """
    )
    with engine.begin() as conn:
        conn.execute(sa.text(create_sql))
64
65
def insert_data(engine: "sa.engine.Engine"):
    """Insert a single Transaction row with a random note and commit it."""
    print(f"Insert some data into {Transaction.__tablename__!r} table")
    note = f"note {random.randint(1, 1000000)}"
    with orm.Session(engine) as db_session:
        db_session.add(Transaction.new(note=note))
        db_session.commit()
72
73
def select_data(engine: "sa.engine.Engine"):
    """Read back a few rows, once as ORM objects and once as mappings (dict-like)."""
    print(f"select data from {Transaction.__tablename__!r} table")

    # Fetch rows as ORM instances through a Session.
    print("--- Return object ---")
    with orm.Session(engine) as db_session:
        for row in db_session.query(Transaction).limit(3):
            values = [row.id, row.create_at, row.update_at, row.note]
            print(values)

    # Fetch the same rows as dict-like mappings through a Core connection.
    print("--- Return dict ---")
    with engine.connect() as conn:
        result = conn.execute(sa.select(Transaction).limit(3))
        for mapping in result.mappings():
            print(mapping)
95
96
# Locate and parse the serverless config stored next to this script.
_here = Path(__file__).absolute().parent
config_serverless = rs.Config.load(_here / "config-serverless.json")

# Default boto session: credentials are resolved from the environment.
boto_ses = boto3.session.Session()

# Build a SQLAlchemy engine authenticated with temporary IAM credentials.
engine = rs.create_sqlalchemy_engine_for_serverless_using_iam(
    boto_ses=boto_ses,
    workgroup_name=config_serverless.workgroup,
)

rs.test_engine(engine)
create_table(engine)
insert_data(engine)
select_data(engine)
test_create_connect_for_cluster_using_iam.py
# -*- coding: utf-8 -*-

"""
This example shows how to connect to a Redshift provisioned cluster using
redshift connector.
"""

import boto3
from pathlib import Path

import aws_redshift_helper.api as rs

# load your config
dir_here = Path(__file__).absolute().parent
path_config_cluster = dir_here / "config-cluster.json"
config_cluster = rs.Config.load(path_config_cluster)

# create boto session; the named profile supplies the IAM principal used to
# mint a temporary DB user / password (presumably via
# redshift_client.get_cluster_credentials — see the notes above)
boto_ses = boto3.session.Session(profile_name="awshsh_app_dev_us_east_1")

# create redshift connection
conn = rs.create_connect_for_cluster_using_iam(
    boto_ses=boto_ses,
    cluster_id=config_cluster.cluster_id,
)

rs.test_connection(conn)
test_create_sqlalchemy_engine_for_cluster_using_iam.py
1# -*- coding: utf-8 -*-
2
3"""
4This example shows how to connect to Redshift cluster using Sqlalchemy.
5"""
6
7import boto3
8import uuid
9import random
10import textwrap
11from pathlib import Path
12from datetime import datetime
13
14import sqlalchemy as sa
15import sqlalchemy.orm as orm
16
17import aws_redshift_helper.api as rs
18
19
# Declarative base shared by the ORM models in this example.
Base = orm.declarative_base()


class Transaction(Base):
    """ORM model for the ``transactions`` demo table.

    Timestamps are stored as ISO-8601 strings rather than native TIMESTAMP
    columns, matching the VARCHAR DDL used in ``create_table`` below.
    """

    __tablename__ = "transactions"

    id: str = sa.Column(sa.String, primary_key=True)
    create_at: str = sa.Column(sa.String)
    update_at: str = sa.Column(sa.String)
    note: str = sa.Column(sa.String, nullable=True)

    @classmethod
    def new(cls, note: str = None):
        """Return a new (not yet persisted) row with a UUID4 id and current UTC timestamps."""
        return cls(
            id=str(uuid.uuid4()),
            create_at=datetime.utcnow().isoformat(),
            update_at=datetime.utcnow().isoformat(),
            note=note,
        )
39
40
def create_table(engine: "sa.engine.Engine"):
    """Drop and recreate the ``transactions`` table.

    Fixes for SQLAlchemy 2.0 compatibility (both also work on 1.4):

    - raw SQL strings must be wrapped in :func:`sa.text` before ``execute``;
    - ``engine.connect()`` no longer autocommits, so each DDL statement runs
      inside ``engine.begin()``, which commits on success.

    :param engine: a SQLAlchemy engine already connected to Redshift.
    """
    drop_sql = textwrap.dedent(
        f"""
        DROP TABLE IF EXISTS {Transaction.__tablename__};
        """
    )
    with engine.begin() as conn:
        conn.execute(sa.text(drop_sql))

    create_sql = textwrap.dedent(
        f"""
        CREATE TABLE {Transaction.__tablename__}(
            id VARCHAR(36) DISTKEY NOT NULL,
            create_at VARCHAR(26) NOT NULL,
            update_at VARCHAR(26) NOT NULL,
            note VARCHAR
        )
        DISTSTYLE key
        COMPOUND SORTKEY(create_at);
        """
    )
    with engine.begin() as conn:
        conn.execute(sa.text(create_sql))
64
65
def insert_data(engine: "sa.engine.Engine"):
    """Insert a single Transaction row with a random note and commit it."""
    print(f"Insert some data into {Transaction.__tablename__!r} table")
    note = f"note {random.randint(1, 1000000)}"
    with orm.Session(engine) as db_session:
        db_session.add(Transaction.new(note=note))
        db_session.commit()
72
73
def select_data(engine: "sa.engine.Engine"):
    """Read back a few rows, once as ORM objects and once as mappings (dict-like)."""
    print(f"select data from {Transaction.__tablename__!r} table")

    # Fetch rows as ORM instances through a Session.
    print("--- Return object ---")
    with orm.Session(engine) as db_session:
        for row in db_session.query(Transaction).limit(3):
            values = [row.id, row.create_at, row.update_at, row.note]
            print(values)

    # Fetch the same rows as dict-like mappings through a Core connection.
    print("--- Return dict ---")
    with engine.connect() as conn:
        result = conn.execute(sa.select(Transaction).limit(3))
        for mapping in result.mappings():
            print(mapping)
95
96
# Locate and parse the cluster config stored next to this script.
_here = Path(__file__).absolute().parent
config_cluster = rs.Config.load(_here / "config-cluster.json")

# Default boto session: credentials are resolved from the environment.
boto_ses = boto3.session.Session()

# Build a SQLAlchemy engine authenticated with temporary IAM credentials.
engine = rs.create_sqlalchemy_engine_for_cluster_using_iam(
    boto_ses=boto_ses,
    cluster_id=config_cluster.cluster_id,
)

rs.test_engine(engine)
create_table(engine)
insert_data(engine)
select_data(engine)
test_work_with_awswrangler.py
1# -*- coding: utf-8 -*-
2
3"""
4This example shows how to use awswrangler to work with Redshift Serverless.
5"""
6
7import boto3
8from pathlib import Path
9
10import pandas as pd
11import awswrangler as wr
12
13import aws_redshift_helper.api as rs
14
15
def create_table_and_load_data():
    """Build a tiny in-memory DataFrame and COPY it into Redshift via S3.

    Relies on module-level globals defined later in this script (the script
    calls this function only after they are set): ``s3_dir_uri`` (S3 staging
    prefix), ``conn`` (redshift_connector connection), ``boto_ses`` and
    ``TABLE_NAME``.
    """
    df = pd.DataFrame(
        [
            (
                "d87e0557-447c-4743-a527-cd26b59720b9",
                "2023-08-09T14:04:58.919920",
                "2023-08-09T14:04:58.919920",
                "note 111",
            ),
            (
                "959a2aec-b560-4b1e-9b59-cdca5546f091",
                "2023-08-09T14:04:58.919920",
                "2023-08-09T14:04:58.919920",
                "note 2",
            ),
        ],
        columns=["id", "create_at", "update_at", "note"],
    )
    # awswrangler stages the frame under ``path`` on S3, then issues a COPY.
    wr.redshift.copy(
        df=df,
        path=s3_dir_uri,
        con=conn,
        schema="public",
        table=TABLE_NAME,
        mode="overwrite",  # append, overwrite or upsert.
        boto3_session=boto_ses,
        primary_keys=["id"],
    )
44
45
def read_dataframe():
    """Query a sample of rows from Redshift into a pandas DataFrame and print it."""
    print("Read data from Redshift into pandas dataframe")
    query = f"SELECT * FROM {TABLE_NAME} LIMIT 10;"
    frame = wr.redshift.read_sql_query(query, con=conn)
    print(frame)
51
52
# load your config
dir_here = Path(__file__).absolute().parent
path_config_serverless = dir_here / "config-serverless.json"
# FIX: the original called ``pylib.Config.load`` but ``pylib`` is never
# imported anywhere in this script (NameError at runtime); the config helper
# lives in the ``rs`` module, matching the other example scripts.
config_serverless = rs.Config.load(path_config_serverless)

# create boto session
boto_ses = boto3.session.Session(profile_name="awshsh_app_dev_us_east_1")
aws_account_id = boto_ses.client("sts").get_caller_identity()["Account"]
aws_region = boto_ses.region_name

# create redshift connection
conn = rs.create_connect_for_serverless_using_iam(
    boto_ses=boto_ses,
    workgroup_name=config_serverless.workgroup,
)

rs.test_connection(conn)

# write data and read data; the bucket name convention is account-region-data
bucket = f"{aws_account_id}-{aws_region}-data"
s3_dir_uri = f"s3://{bucket}/project/redshift-serverless-poc/"
print(s3_dir_uri)
TABLE_NAME = "transactions"

create_table_and_load_data()
read_dataframe()