Redshift Data API#
Keywords: AWS, Amazon, Redshift, Data API
Overview#
Redshift Data API 是 AWS 下的一个服务. 对, 没错, 它居然不是一个功能, 而是一个服务. 它是一套 Rest API 能允许用户使用 AWS CLI 权限直接运行 SQL, 而无需创建数据库连接, 也不没有什么你的网络必须位于同一个 VPC 的要求. 等于是 AWS 帮你部署了一台免费的堡垒机, 然后让你免费用. 这个功能和 AWS RDS Aurora Data API 一样, 非常好用!
在本文中我们对 Data API 进行了测试. 主要是想验证除了能运行 Query, 是不是 Load data, 以及一些 DDL 语句例如 Create Table, Drop Table 之类的是不是也能用 Data API. 这里直接放结论, 对, 可以, 凡是你能运行的 SQL 都可以通过 Data API 使用.
Considerations#
使用 Data API 前你需要知道会有如下考量.
同一时间最多有 200 个 query 处于正在执行中的状态.
query result 最多不超过 100MB. 毕竟结果是要用 HTTP Response 返回的, 不可能太大. 太大的结果请用 UNLOAD.
query statement 的大小不得超过 100KB. 这个可能性很小.
query results 在 24 小时后就会自动删除.
Data API Use Case#
什么情况下使用 Data API 比较好呢?
进行探索性实验. 由于 Data API 设置简单, 适合用于在编程语言中运行 SQL. 如果你只是需要随便写写 SQL, 那么 AWS Console 里的 Query Editor. 而如果你是把 SQL 的输入输出结果做成 Python Code, 那么 Data API 就是一个很好的选择.
在 EC2, Lambda, ECS, Step Function 等计算环境中对 Redshift 进行查询. 并且是你预计查询总时间不会太长 (小于 5 分钟) 的情况下. 如果你的查询总时间可能会很长, 那么建议用 Step Function 来进行长轮询. 因为在 EC2, Lambda, ECS 中做长轮询, 等待的时间也是要计费的. 而 Step Function 是按照 transition 计费.
在 ETL job 中运行一些 LOAD 的命令.
简而言之, 对于高频率, 但是查询时间不长的 Query 用 Data API 进行编程是非常便利的.
VPN Endpoint#
Data API 是一个 service, 你从客户端发起的请求默认是走公网到达 public facing 的 AWS 的 API 服务器, 然后获得数据的. 这个 Data API 跟 DynamoDB 的 API 可以说是一模一样的. 而类似的, 在企业中就会有让客户端和 API 通信之间的信道不走公网的需求. 这个时候就需要用到 VPN Endpoint 了.
Reference:
Connecting to Amazon Redshift using an interface VPC endpoint: 如何为 Redshift 配置 VPN Endpoint 的文档.
The AWS Redshift Helper Library#
使用 Data API 运行 SQL 主要有三个步骤:
execute_statement: 异步执行 SQL
describe_statement: 获取执行的状态, 可能还在 pending 或者正在运行, 或者失败了, 或者成功了. 你在 execute statement 之后要用这个 API 进行长轮询, 这样可以把异步的 API 变成同步的.
get_statement_result: 获取执行结果. 包括 metadata 和数据两部分. 因为数据量可能很大, 所以这个 API 是一个 paginator.
我写了一个 Python 库, 把这三个步骤绑定到了一个函数中, 使得开发者可以轻松的像平时用 DB API 进行数据库编程一样轻松的用 Data API 编写程序.
Sample Usage:
test_redshift_data_api.py
1# -*- coding: utf-8 -*-
2
3import json
4import textwrap
5from pathlib import Path
6from datetime import datetime
7
8import boto3
9import pandas as pd
10import awswrangler as wr
11from s3pathlib import S3Path, context
12from rich import print as rprint
13
14import aws_redshift_helper.api as rs
15
16config = rs.Config.load(Path("config-serverless.json"))
17boto_ses = boto3.session.Session(profile_name=config.aws_profile)
18context.attach_boto_session(boto_ses)
19aws_account_id = boto_ses.client("sts").get_caller_identity()["Account"]
20aws_region = boto_ses.region_name
21rs_sls_client = boto_ses.client("redshift-serverless")
22rs_data_client = boto_ses.client("redshift-data")
23database, _, _ = rs.get_database_by_workgroup(rs_sls_client, config.workgroup)
24bucket = f"{aws_account_id}-{aws_region}-data"
25s3dir_staging = S3Path(
26 f"s3://{bucket}/projects/redshift-serverless-poc/staging/"
27).to_dir()
28s3dir_unload = S3Path(
29 f"s3://{bucket}/projects/redshift-serverless-poc/unload/"
30).to_dir()
31conn = rs.create_connect_for_serverless_using_iam(
32 boto_ses=boto_ses,
33 workgroup_name=config.workgroup,
34)
35T_TRANSACTIONS = "transactions"
36T_JSON_TEST = "json_test"
37
38
39def run_sql(
40 sql: str,
41 no_result: bool = False,
42):
43 """
44 A wrapper to automatically set parameters other than ``sql`` for ``run_sql``
45 function.
46 """
47 columns, rows = rs.run_sql(
48 rs_data_client=rs_data_client,
49 sql=sql,
50 database=database,
51 workgroup_name=config.workgroup,
52 delay=1,
53 timeout=10,
54 no_result=no_result,
55 )
56 rprint(columns, rows)
57
58
59def drop_table():
60 run_sql(sql=f"DROP TABLE IF EXISTS {T_JSON_TEST};", no_result=True)
61
62
63def create_table():
64 sql = textwrap.dedent(
65 f"""
66 CREATE TABLE IF NOT EXISTS {T_JSON_TEST}(
67 id VARCHAR(255) NOT NULL,
68 create_at TIMESTAMP NOT NULL,
69 data SUPER NOT NULL,
70 PRIMARY KEY (id, create_at)
71 )
72 distkey(id)
73 sortkey(create_at);
74 """
75 )
76 run_sql(sql=sql, no_result=True)
77
78
79def delete_table():
80 run_sql(sql=f"DELETE FROM {T_JSON_TEST};", no_result=True)
81
82
83def get_utc_now():
84 return datetime.utcnow()
85
86
87def load_data():
88 df = pd.DataFrame(
89 {
90 "id": ["id-1"],
91 "create_at": [get_utc_now()],
92 # just use dictionary to represent JSON object
93 "data": [
94 {
95 "name": "Alice",
96 "age": 25,
97 "tags": ["cool", "tall", "smart", "beauty"],
98 },
99 ],
100 }
101 )
102 # awswrangler will dump the data to parquet file, parquet is schema self-contained format
103 wr.redshift.copy(
104 df=df,
105 path=s3dir_staging.uri,
106 con=conn,
107 schema="public",
108 table=T_JSON_TEST,
109 mode="append", # append, overwrite or upsert.
110 boto3_session=boto_ses,
111 primary_keys=["id"],
112 # add this option if you have json field and need to load to SUPER data type column
113 serialize_to_json=True,
114 )
115
116
117def select_data():
118 run_sql(sql=f"SELECT * FROM {T_JSON_TEST};")
119
120
121def unload_data():
122 s3dir_unload.delete()
123 sql = f"SELECT * FROM {T_TRANSACTIONS}"
124 final_sql = rs.build_unload_sql(
125 raw_sql=sql,
126 s3_uri=s3dir_unload.uri,
127 format="JSON",
128 )
129 run_sql(sql=final_sql, no_result=True)
130
131 rows = list()
132 for s3path in s3dir_unload.iter_objects():
133 for line in s3path.read_text().split("\n"):
134 if line:
135 rows.append(json.loads(line))
136 rprint(rows)
137
138
139if __name__ == "__main__":
140 """ """
141 # create_table()
142 # drop_table()
143 # delete_table()
144 # load_data()
145 # select_data()
146 # unload_data()