Redshift Data API#

Keywords: AWS, Amazon, Redshift, Data API

Overview#

Redshift Data API 是 AWS 下的一个服务. 对, 没错, 它居然不是一个功能, 而是一个服务. 它是一套 Rest API 能允许用户使用 AWS CLI 权限直接运行 SQL, 而无需创建数据库连接, 也不没有什么你的网络必须位于同一个 VPC 的要求. 等于是 AWS 帮你部署了一台免费的堡垒机, 然后让你免费用. 这个功能和 AWS RDS Aurora Data API 一样, 非常好用!

在本文中我们对 Data API 进行了测试. 主要是想验证除了能运行 Query, 是不是 Load data, 以及一些 DDL 语句例如 Create Table, Drop Table 之类的是不是也能用 Data API. 这里直接放结论, 对, 可以, 凡是你能运行的 SQL 都可以通过 Data API 使用.

Considerations#

使用 Data API 前你需要知道会有如下考量.

  • 同一时间最多有 200 个 query 处于正在执行中的状态.

  • query result 最多不超过 100MB. 毕竟结果是要用 HTTP Response 返回的, 不可能太大. 太大的结果请用 UNLOAD.

  • query statement 的大小不得超过 100KB. 这个可能性很小.

  • query results 在 24 小时后就会自动删除.

Data API Use Case#

什么情况下使用 Data API 比较好呢?

  1. 进行探索性实验. 由于 Data API 设置简单, 适合用于在编程语言中运行 SQL. 如果你只是需要随便写写 SQL, 那么 AWS Console 里的 Query Editor. 而如果你是把 SQL 的输入输出结果做成 Python Code, 那么 Data API 就是一个很好的选择.

  2. 在 EC2, Lambda, ECS, Step Function 等计算环境中对 Redshift 进行查询. 并且是你预计查询总时间不会太长 (小于 5 分钟) 的情况下. 如果你的查询总时间可能会很长, 那么建议用 Step Function 来进行长轮询. 因为在 EC2, Lambda, ECS 中做长轮询, 等待的时间也是要计费的. 而 Step Function 是按照 transition 计费.

  3. 在 ETL job 中运行一些 LOAD 的命令.

简而言之, 对于高频率, 但是查询时间不长的 Query 用 Data API 进行编程是非常便利的.

VPN Endpoint#

Data API 是一个 service, 你从客户端发起的请求默认是走公网到达 public facing 的 AWS 的 API 服务器, 然后获得数据的. 这个 Data API 跟 DynamoDB 的 API 可以说是一模一样的. 而类似的, 在企业中就会有让客户端和 API 通信之间的信道不走公网的需求. 这个时候就需要用到 VPN Endpoint 了.

Reference:

The AWS Redshift Helper Library#

使用 Data API 运行 SQL 主要有三个步骤:

  • execute_statement: 异步执行 SQL

  • describe_statement: 获取执行的状态, 可能还在 pending 或者正在运行, 或者失败了, 或者成功了. 你在 execute statement 之后要用这个 API 进行长轮询, 这样可以把异步的 API 变成同步的.

  • get_statement_result: 获取执行结果. 包括 metadata 和数据两部分. 因为数据量可能很大, 所以这个 API 是一个 paginator.

我写了一个 Python 库, 把这三个步骤绑定到了一个函数中, 使得开发者可以轻松的像平时用 DB API 进行数据库编程一样轻松的用 Data API 编写程序.

Sample Usage:

test_redshift_data_api.py
  1# -*- coding: utf-8 -*-
  2
  3import json
  4import textwrap
  5from pathlib import Path
  6from datetime import datetime
  7
  8import boto3
  9import pandas as pd
 10import awswrangler as wr
 11from s3pathlib import S3Path, context
 12from rich import print as rprint
 13
 14import aws_redshift_helper.api as rs
 15
 16config = rs.Config.load(Path("config-serverless.json"))
 17boto_ses = boto3.session.Session(profile_name=config.aws_profile)
 18context.attach_boto_session(boto_ses)
 19aws_account_id = boto_ses.client("sts").get_caller_identity()["Account"]
 20aws_region = boto_ses.region_name
 21rs_sls_client = boto_ses.client("redshift-serverless")
 22rs_data_client = boto_ses.client("redshift-data")
 23database, _, _ = rs.get_database_by_workgroup(rs_sls_client, config.workgroup)
 24bucket = f"{aws_account_id}-{aws_region}-data"
 25s3dir_staging = S3Path(
 26    f"s3://{bucket}/projects/redshift-serverless-poc/staging/"
 27).to_dir()
 28s3dir_unload = S3Path(
 29    f"s3://{bucket}/projects/redshift-serverless-poc/unload/"
 30).to_dir()
 31conn = rs.create_connect_for_serverless_using_iam(
 32    boto_ses=boto_ses,
 33    workgroup_name=config.workgroup,
 34)
 35T_TRANSACTIONS = "transactions"
 36T_JSON_TEST = "json_test"
 37
 38
 39def run_sql(
 40    sql: str,
 41    no_result: bool = False,
 42):
 43    """
 44    A wrapper to automatically set parameters other than ``sql`` for ``run_sql``
 45    function.
 46    """
 47    columns, rows = rs.run_sql(
 48        rs_data_client=rs_data_client,
 49        sql=sql,
 50        database=database,
 51        workgroup_name=config.workgroup,
 52        delay=1,
 53        timeout=10,
 54        no_result=no_result,
 55    )
 56    rprint(columns, rows)
 57
 58
 59def drop_table():
 60    run_sql(sql=f"DROP TABLE IF EXISTS {T_JSON_TEST};", no_result=True)
 61
 62
 63def create_table():
 64    sql = textwrap.dedent(
 65        f"""
 66    CREATE TABLE IF NOT EXISTS {T_JSON_TEST}(
 67        id VARCHAR(255) NOT NULL,
 68        create_at TIMESTAMP NOT NULL,
 69        data SUPER NOT NULL,
 70        PRIMARY KEY (id, create_at)
 71    )
 72    distkey(id)
 73    sortkey(create_at);
 74    """
 75    )
 76    run_sql(sql=sql, no_result=True)
 77
 78
 79def delete_table():
 80    run_sql(sql=f"DELETE FROM {T_JSON_TEST};", no_result=True)
 81
 82
 83def get_utc_now():
 84    return datetime.utcnow()
 85
 86
 87def load_data():
 88    df = pd.DataFrame(
 89        {
 90            "id": ["id-1"],
 91            "create_at": [get_utc_now()],
 92            # just use dictionary to represent JSON object
 93            "data": [
 94                {
 95                    "name": "Alice",
 96                    "age": 25,
 97                    "tags": ["cool", "tall", "smart", "beauty"],
 98                },
 99            ],
100        }
101    )
102    # awswrangler will dump the data to parquet file, parquet is schema self-contained format
103    wr.redshift.copy(
104        df=df,
105        path=s3dir_staging.uri,
106        con=conn,
107        schema="public",
108        table=T_JSON_TEST,
109        mode="append",  # append, overwrite or upsert.
110        boto3_session=boto_ses,
111        primary_keys=["id"],
112        # add this option if you have json field and need to load to SUPER data type column
113        serialize_to_json=True,
114    )
115
116
117def select_data():
118    run_sql(sql=f"SELECT * FROM {T_JSON_TEST};")
119
120
121def unload_data():
122    s3dir_unload.delete()
123    sql = f"SELECT * FROM {T_TRANSACTIONS}"
124    final_sql = rs.build_unload_sql(
125        raw_sql=sql,
126        s3_uri=s3dir_unload.uri,
127        format="JSON",
128    )
129    run_sql(sql=final_sql, no_result=True)
130
131    rows = list()
132    for s3path in s3dir_unload.iter_objects():
133        for line in s3path.read_text().split("\n"):
134            if line:
135                rows.append(json.loads(line))
136    rprint(rows)
137
138
139if __name__ == "__main__":
140    """ """
141    # create_table()
142    # drop_table()
143    # delete_table()
144    # load_data()
145    # select_data()
146    # unload_data()

Reference#