Write Athena Query Results in Parquet, Avro, ORC, JSON formats

Athena 默认使用 CSV 文件格式来保存 query result. 但在大数据领域, 有很多比 CSV 更优秀, 更适合保存数据的格式. 从 2021-08-05 起, AWS Athena 开始支持以 Parquet, Avro, ORC, JSON 等格式保存查询结果. 但是官方文档对这些格式的使用方法说得并不清楚. 本文将介绍如何使用 Parquet 格式来保存 Athena 的查询结果 (其他格式都类似).

Athena 中的 UNLOAD 是一个 SQL 命令, 可以用来将 query result 导出到指定的 S3 位置. 它还支持一些参数, 用来指定导出格式以及该格式的详细配置 (例如压缩算法).

你在 UNLOAD 的时候需要指定一个还不存在的 S3 folder, 用于保存 parquet 文件. 由于 Athena 是一个并行引擎, 所以会导出多个数据文件. 你在调用 boto3.athena_client.start_query_execution() API 的时候需要指定一个 S3 folder 用来保存 query result metadata. 这个文件夹和之前那个可以相同也可以不同. 但我推荐用不同的, 以方便区分数据和 metadata. 这里面的 metadata 文件中包括一个 ${query_execution_id}-manifest.csv 文件, 记录了导出的所有 parquet 文件的 S3 URI 列表.

所以总结下来, 你需要做这么几件事:

  1. 把原本的 SQL 语句用 UNLOAD 封装. 例如如果原来的语句是 SELECT * FROM table LIMIT 10, 那么封装后的语句就是 UNLOAD (SELECT * FROM table LIMIT 10) TO '{result_s3_folder_uri}' WITH ( format = 'parquet' ).

  2. 调用 start_query_execution() API 来执行查询.

  3. 用 Job Poll 模式每隔几秒就去查一下 execution status, 如果成功了就进行下一步.

  4. 从 metadata 文件中读取 parquet 文件的 S3 URI 列表.

  5. 从 parquet 文件中读取 dataframe 然后拼接成一个.

下面有一个脚本将上面的逻辑封装成了用户友好的函数, 可供参考.

  1# -*- coding: utf-8 -*-
  4This module allows users to store Athena query results as Parquet files.
  5It returns the athena query result as a
  6`polars.LazyFrame <https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html>`_.
 10    polars
 11    s3fs
 12    pyarrow
 14You can run ``pip install polars s3fs pyarrow`` to install them.
 18    from aws_athena_query import (
 19        run_athena_query,
 20        read_athena_query_result,
 21        wait_athena_query_to_succeed,
 22    )
 25import typing as T
 26import uuid
 27import time
 28import textwrap
 30import s3fs
 31import polars as pl
 32import pyarrow.dataset
 36    from boto_session_manager import BotoSesManager
 37    from s3pathlib import S3Path
 40def wait_athena_query_to_succeed(
 41    bsm: "BotoSesManager",
 42    exec_id: str,
 43    delta: int = 1,
 44    timeout: int = 30,
 46    """
 47    Wait a given athena query to reach ``SUCCEEDED`` status. If failed, raise
 48    ``RuntimeError`` immediately. If timeout, raise ``TimeoutError``.
 50    .. versionadded:: 0.11.1
 51    """
 52    # ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/get_query_execution.html
 53    elapsed = 0
 54    for _ in range(999999):
 55        res = bsm.athena_client.get_query_execution(
 56            QueryExecutionId=exec_id,
 57        )
 58        status = res["QueryExecution"]["Status"]["State"]
 59        if status == "SUCCEEDED":
 60            return
 61        elif status in ["FAILED", "CANCELLED"]:
 62            raise RuntimeError(f"execution {exec_id} reached status: {status}")
 63        else:
 64            time.sleep(delta)
 65        elapsed += delta
 66        if elapsed > timeout:
 67            raise TimeoutError(f"athena query timeout in {timeout} seconds!")
 70def _get_dataset_and_metadata_s3path(
 71    s3dir_result: "S3Path",
 72) -> T.Tuple["S3Path", "S3Path"]:
 73    # the dataset folder uri will be used in the UNLOAD command
 74    # the final parquet files will be stored in this folder
 75    # note that this folder has to be NOT EXISTING before the execution
 76    s3dir_dataset = s3dir_result.joinpath("dataset", uuid.uuid4().hex).to_dir()
 78    # the metadata folder will be used to store the query result metadata file
 79    # the metadata file will tell you the list of data file uris
 80    s3dir_metadata = s3dir_result.joinpath("metadata").to_dir()
 81    return s3dir_dataset, s3dir_metadata
 84def read_athena_query_result(
 85    bsm: "BotoSesManager",
 86    s3dir_result: "S3Path",
 87    exec_id: str,
 88    verbose: bool = True,
 89) -> pl.LazyFrame:
 90    """
 91    Load the athena query result from s3. The query has to be succeeded already.
 93    :param bsm: boto_session_manager.BotoSesManager object.
 94    :param s3dir_result: an S3Path object that represent a s3 directory to store
 95        the athena query result.
 96    :param exec_id: athena query execution id, you can get it from the
 97        :func:`run_athena_query` function.
 98    :param verbose: do you want to print the log?
100    :return: the lazy DataFrame of the result, If you just need to return the
101        regular DataFrame, you can do ``df = read_athena_query_result(...).collect()``.
103    .. versionadded:: 0.11.1
104    """
105    s3dir_dataset, s3dir_metadata = _get_dataset_and_metadata_s3path(s3dir_result)
106    # read the manifest file to get list of parquet file uris
107    # ref: https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.scan_pyarrow_dataset.html
108    s3path_manifest = s3dir_metadata.joinpath(f"{exec_id}-manifest.csv")
109    s3uri_list = s3path_manifest.read_text(bsm=bsm).splitlines()
111    if verbose:
112        query_editor_console_url = (
113            f"https://{bsm.aws_region}.console.aws.amazon.com/athena"
114            f"/home?region={bsm.aws_region}#/query-editor/history/{exec_id}"
115        )
116        print(f"preview query in athena editor: {query_editor_console_url}")
117        print(f"query result manifest: {s3path_manifest.console_url}")
118        print(f"query result data: {s3dir_dataset.console_url}")
119        print(f"number of files in result: {len(s3uri_list)}")
121    if isinstance(bsm.profile_name, str):
122        file_system = s3fs.S3FileSystem(profile=bsm.profile_name)
123    else:
124        credential = bsm.boto_ses.get_credentials()
125        file_system = s3fs.S3FileSystem(
126            key=credential.access_key,
127            secret=credential.secret_key,
128            token=credential.token,
129        )
130    dataset = pyarrow.dataset.dataset(s3uri_list, filesystem=file_system)
131    lazy_df = pl.scan_pyarrow_dataset(dataset)
132    df = lazy_df.select(pl.col("*"))
133    return df
def run_athena_query(
    bsm: "BotoSesManager",
    s3dir_result: "S3Path",
    sql: str,
    database: str,
    catalog: T.Optional[str] = None,
    compression: str = "gzip",
    encryption_configuration: T.Optional[dict] = None,
    expected_bucket_owner: T.Optional[str] = None,
    acl_configuration: T.Optional[dict] = None,
    workgroup: T.Optional[str] = None,
    execution_parameters: T.Optional[T.List[T.Any]] = None,
    result_cache_expire: T.Optional[int] = None,
    client_request_token: T.Optional[str] = None,
    delta: int = 1,
    timeout: int = 10,
    verbose: bool = True,
) -> T.Tuple[pl.LazyFrame, str]:
    """
    Run athena query and get the result as a polars.LazyFrame.

    With LazyFrame, you can do further select, filter actions before actually
    reading the data, and leverage the parquet predicate pushdown feature to
    reduce the amount of data to be read. If you just need to return the
    regular DataFrame, you can do ``df = run_athena_query(...)[0].collect()``.

    Example::

        >>> from boto_session_manager import BotoSesManager
        >>> from s3pathlib import S3Path
        >>>
        >>> bsm = BotoSesManager(profile_name="your_aws_profile")
        >>> bucket = f"{bsm.aws_account_id}-{bsm.aws_region}-data"
        >>> prefix = f"athena/results/"
        >>> s3dir_result = S3Path(f"s3://{bucket}/{prefix}").to_dir()
        >>>
        >>> database = "your_database"
        >>> sql = f"SELECT * FROM {database}.your_table LIMIT 10;"
        >>> lazy_df, exec_id = run_athena_query(
        ...     bsm=bsm,
        ...     s3dir_result=s3dir_result,
        ...     sql=sql,
        ...     database=database,
        ... )
        >>> df = lazy_df.collect()
        >>> df
        ...

    start_query_execution doc: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/start_query_execution.html

    :param bsm: boto_session_manager.BotoSesManager object.
    :param s3dir_result: an S3Path object that represent a s3 directory to store
        the athena query result.
    :param sql: sql query string.
    :param database: database name.
    :param catalog: see start_query_execution doc
    :param compression: compression algorithm to use when writing parquet file,
        see all options here: https://docs.aws.amazon.com/athena/latest/ug/compression-formats.html
    :param encryption_configuration: see start_query_execution doc
    :param expected_bucket_owner: see start_query_execution doc
    :param acl_configuration: see start_query_execution doc
    :param workgroup: see start_query_execution doc
    :param execution_parameters: see start_query_execution doc
    :param result_cache_expire: cache query result for X minutes, if None,
        it means no cache, see more information here: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
        this is very helpful to reduce the cost.
    :param client_request_token: see start_query_execution doc
    :param delta: sleep time in seconds between each query status check.
    :param timeout: timeout in seconds.
    :param verbose: do you want to print the log?

    :return: the tuple of two item, the first item is the lazy DataFrame of the result,
        the second item is the athena query execution id (str).

    .. versionadded:: 0.11.1
    """
    # the sql query should not end with ;, it will be embedded in the final query
    sql = sql.strip().rstrip(";").rstrip()

    s3dir_dataset, s3dir_metadata = _get_dataset_and_metadata_s3path(s3dir_result)

    # ref: https://docs.aws.amazon.com/athena/latest/ug/unload.html
    # use UNLOAD command to write result into data format other than csv
    final_sql = textwrap.dedent(
        f"""
        UNLOAD ({sql})
        TO '{s3dir_dataset.uri}'
        WITH ( format = 'parquet', compression= '{compression}')
        """
    )

    # ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/start_query_execution.html
    # you may need to customize this for more advanced query,
    # like cross database query, federated query
    query_execution_context = dict(Database=database)
    if catalog:
        query_execution_context["Catalog"] = catalog

    result_configuration = dict(OutputLocation=s3dir_metadata.uri)
    if encryption_configuration:
        result_configuration["EncryptionConfiguration"] = encryption_configuration
    if expected_bucket_owner:
        result_configuration["ExpectedBucketOwner"] = expected_bucket_owner
    if acl_configuration:
        result_configuration["AclConfiguration"] = acl_configuration

    kwargs = dict(
        QueryString=final_sql,
        QueryExecutionContext=query_execution_context,
        # pass the fully populated configuration, otherwise the encryption,
        # bucket owner and ACL settings above would be silently dropped
        ResultConfiguration=result_configuration,
    )
    if workgroup:
        kwargs["WorkGroup"] = workgroup
    if execution_parameters:
        # the keyword argument of start_query_execution is ExecutionParameters
        kwargs["ExecutionParameters"] = execution_parameters
    if result_cache_expire:
        kwargs["ResultReuseConfiguration"] = dict(
            ResultReuseByAgeConfiguration=dict(
                Enabled=True,
                MaxAgeInMinutes=result_cache_expire,
            )
        )
    if client_request_token:
        kwargs["ClientRequestToken"] = client_request_token
    res = bsm.athena_client.start_query_execution(**kwargs)

    # the start_query_execution API is async, it returns the execution id
    exec_id = res["QueryExecutionId"]

    # wait for the execution to finish
    wait_athena_query_to_succeed(bsm=bsm, exec_id=exec_id, delta=delta, timeout=timeout)
    lazy_df = read_athena_query_result(
        bsm=bsm,
        s3dir_result=s3dir_result,
        exec_id=exec_id,
        verbose=verbose,
    )
    return lazy_df, exec_id
if __name__ == "__main__":
    # Manual smoke test: it talks to real AWS resources, so it needs valid
    # credentials plus the bucket, database and table named below.
    import random
    import textwrap

    from boto_session_manager import BotoSesManager
    from s3pathlib import S3Path

    bsm = BotoSesManager()

    # where athena stores the query result data and metadata
    bucket = f"{bsm.aws_account_id}-{bsm.aws_region}-data"
    prefix = "athena/results/"
    s3dir_result = S3Path(f"s3://{bucket}/{prefix}").to_dir()

    database = "dynamodb_to_datalake"
    table = "transactions"

    # pick a random row limit so that the row count check below is meaningful
    n = random.randint(100, 500)
    sql = textwrap.dedent(
        f"""
        SELECT * 
        FROM "{database}"."{table}"
        LIMIT {n};
    """
    )

    query_output = run_athena_query(
        bsm=bsm,
        s3dir_result=s3dir_result,
        sql=sql,
        database=database,
    )
    lazy_df, exec_id = query_output

    # materialize the lazy frame, then check that the LIMIT was honored
    df = lazy_df.collect()
    print(df.shape)
    print(df)
    assert df.shape[0] == n


