Write Athena Query Results in Parquet, Avro, ORC, JSON formats
Keywords: AWS, Athena, Parquet, Avro, ORC, JSON, S3
Introduction
Athena 默认使用 CSV 文件格式来保存 query result. 但在大数据领域, 有很多比 CSV 更优秀, 更适合保存数据的格式. 从 2021-08-05 起, AWS Athena 开始支持用 Parquet, Avro, ORC, JSON 等格式保存查询结果. 但官方文档对如何使用这些格式说得并不清楚. 本文将介绍如何使用 Parquet 格式来保存 Athena 的查询结果 (其他格式的用法类似).
UNLOAD command in Athena 是一个 SQL 命令, 可以用来指定将 query result 导出到 S3. 而且它支持一些参数, 用来指定导出格式以及这些格式的详细配置.
你在 UNLOAD 的时候需要指定一个还不存在的 S3 folder, 用于保存 parquet 文件. 由于 Athena 是一个并行引擎, 所以会导出多个数据文件. 另外, 你在调用 boto3.athena_client.start_query_execution() API 的时候需要指定一个 S3 folder 用来保存 query result metadata. 这个文件夹和之前那个可以相同也可以不同, 但我推荐用不同的, 以方便区分数据和 metadata. 这些 metadata 文件中包括一个 ${query_execution_id}-manifest.csv 文件, 记录了导出的所有 parquet 文件的 S3 URI 列表.
所以总结下来, 你需要做这么几件事:
1. 把原本的 SQL 语句用 UNLOAD 封装. 例如, 如果原来的语句是 SELECT * FROM table LIMIT 10, 那么封装后的语句就是 UNLOAD (SELECT * FROM table LIMIT 10) TO '{result_s3_folder_uri}' WITH ( format = 'parquet' ).
2. 调用 start_query_execution() API 来执行查询.
3. 用 Job Poll 模式每隔几秒就去查一下 execution status, 如果成功了就进行下一步.
4. 从 metadata 文件中读取 parquet 文件的 S3 URI 列表.
5. 从 parquet 文件中读取 dataframe 然后拼接成一个.
下面有一个脚本将上面的逻辑封装成了用户友好的函数, 可供参考.
1# -*- coding: utf-8 -*-
2
3"""
4This module allows users to store Athena query results as parquet files.
5It returns the athena query result as a
6`polars.LazyFrame <https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html>`_.
7
8Requirements:
9
10 polars
11 s3fs
12 pyarrow
13
14You can run ``pip install polars s3fs pyarrow`` to install them.
15
16Usage::
17
18 from aws_athena_query import (
19 run_athena_query,
20 read_athena_query_result,
21 wait_athena_query_to_succeed,
22 )
23"""
24
25import typing as T
26import uuid
27import time
28import textwrap
29
30import s3fs
31import polars as pl
32import pyarrow.dataset
33
34
35if T.TYPE_CHECKING:
36 from boto_session_manager import BotoSesManager
37 from s3pathlib import S3Path
38
39
def wait_athena_query_to_succeed(
    bsm: "BotoSesManager",
    exec_id: str,
    delta: int = 1,
    timeout: int = 30,
):
    """
    Poll an athena query execution until it reaches the ``SUCCEEDED`` status.

    The status is fetched with ``get_query_execution`` and re-checked every
    ``delta`` seconds. The timeout is measured with a monotonic clock, so the
    time spent inside the API calls counts toward it, not only the sleep time.

    :param bsm: boto_session_manager.BotoSesManager object.
    :param exec_id: athena query execution id.
    :param delta: sleep time in seconds between each query status check.
    :param timeout: maximum number of seconds to wait for the query.

    :raises RuntimeError: immediately, if the query reached the ``FAILED``
        or ``CANCELLED`` status. The message includes the state change reason
        when the response carries one.
    :raises TimeoutError: if the query did not succeed within ``timeout``
        seconds.

    .. versionadded:: 0.11.1
    """
    # ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/get_query_execution.html
    start = time.monotonic()
    # loop until one of the three exits below fires; there is no iteration
    # cap, so the function can never fall through and look like a success
    while True:
        res = bsm.athena_client.get_query_execution(
            QueryExecutionId=exec_id,
        )
        query_status = res["QueryExecution"]["Status"]
        state = query_status["State"]
        if state == "SUCCEEDED":
            return
        if state in ("FAILED", "CANCELLED"):
            message = f"execution {exec_id} reached status: {state}"
            # StateChangeReason is optional in the response
            reason = query_status.get("StateChangeReason")
            if reason:
                message = f"{message}, reason: {reason}"
            raise RuntimeError(message)
        time.sleep(delta)
        if time.monotonic() - start > timeout:
            raise TimeoutError(f"athena query timeout in {timeout} seconds!")
68
69
def _get_dataset_and_metadata_s3path(
    s3dir_result: "S3Path",
) -> T.Tuple["S3Path", "S3Path"]:
    """
    Derive the dataset folder and the metadata folder from the result folder.

    :param s3dir_result: the root s3 directory for athena query results.

    :return: a tuple of ``(s3dir_dataset, s3dir_metadata)``. The dataset
        folder gets a new random name on every call.
    """
    # Metadata folder: holds the query result metadata files, including the
    # manifest that lists the data file uris.
    metadata_dir = s3dir_result.joinpath("metadata").to_dir()

    # Dataset folder: the UNLOAD target where the parquet files end up.
    # It must NOT exist before the execution, hence the random hex name.
    unique_name = uuid.uuid4().hex
    dataset_dir = s3dir_result.joinpath("dataset", unique_name).to_dir()

    return dataset_dir, metadata_dir
82
83
def read_athena_query_result(
    bsm: "BotoSesManager",
    s3dir_result: "S3Path",
    exec_id: str,
    verbose: bool = True,
) -> pl.LazyFrame:
    """
    Load the athena query result from s3. The query has to be succeeded already.

    The list of data files is read from the ``{exec_id}-manifest.csv`` file in
    the metadata folder under ``s3dir_result``.

    :param bsm: boto_session_manager.BotoSesManager object.
    :param s3dir_result: an S3Path object that represent a s3 directory to store
        the athena query result.
    :param exec_id: athena query execution id, you can get it from the
        :func:`run_athena_query` function.
    :param verbose: do you want to print the log?

    :return: the lazy DataFrame of the result, If you just need to return the
        regular DataFrame, you can do ``df = read_athena_query_result(...).collect()``.

    .. versionadded:: 0.11.1
    """
    # only the metadata folder is usable here: the helper generates a NEW
    # random dataset folder on every call, which is not the folder the query
    # actually wrote to
    _, s3dir_metadata = _get_dataset_and_metadata_s3path(s3dir_result)

    # read the manifest file to get list of parquet file uris
    s3path_manifest = s3dir_metadata.joinpath(f"{exec_id}-manifest.csv")
    manifest_text = s3path_manifest.read_text(bsm=bsm)
    # skip blank lines so that an empty string is never used as a data file uri
    s3uri_list = [line.strip() for line in manifest_text.splitlines() if line.strip()]

    if verbose:
        query_editor_console_url = (
            f"https://{bsm.aws_region}.console.aws.amazon.com/athena"
            f"/home?region={bsm.aws_region}#/query-editor/history/{exec_id}"
        )
        print(f"preview query in athena editor: {query_editor_console_url}")
        print(f"query result manifest: {s3path_manifest.console_url}")
        if s3uri_list:
            # local import: at module level S3Path is only imported for type checking
            from s3pathlib import S3Path

            # report the folder of the first data file listed in the manifest;
            # NOTE(review): assumes all data files share that folder — confirm
            # if the UNLOAD statement ever uses ``partitioned_by``
            s3dir_dataset = S3Path(s3uri_list[0]).parent
            print(f"query result data: {s3dir_dataset.console_url}")
        print(f"number of files in result: {len(s3uri_list)}")

    # build a file system object that uses the same credentials as ``bsm``
    if isinstance(bsm.profile_name, str):
        file_system = s3fs.S3FileSystem(profile=bsm.profile_name)
    else:
        credential = bsm.boto_ses.get_credentials()
        file_system = s3fs.S3FileSystem(
            key=credential.access_key,
            secret=credential.secret_key,
            token=credential.token,
        )

    # ref: https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.scan_pyarrow_dataset.html
    dataset = pyarrow.dataset.dataset(s3uri_list, filesystem=file_system)
    return pl.scan_pyarrow_dataset(dataset)
134
135
def run_athena_query(
    bsm: "BotoSesManager",
    s3dir_result: "S3Path",
    sql: str,
    database: str,
    catalog: T.Optional[str] = None,
    compression: str = "gzip",
    encryption_configuration: T.Optional[dict] = None,
    expected_bucket_owner: T.Optional[str] = None,
    acl_configuration: T.Optional[dict] = None,
    workgroup: T.Optional[str] = None,
    execution_parameters: T.Optional[T.List[T.Any]] = None,
    result_cache_expire: T.Optional[int] = None,
    client_request_token: T.Optional[str] = None,
    delta: int = 1,
    timeout: int = 10,
    verbose: bool = True,
) -> T.Tuple[pl.LazyFrame, str]:
    """
    Run athena query and get the result as a polars.LazyFrame.
    With LazyFrame, you can do further select, filter actions before actually
    reading the data, and leverage the parquet predicate pushdown feature to
    reduce the amount of data to be read. If you just need to return the
    regular DataFrame, you can do ``df = run_athena_query(...)[0].collect()``.

    Example::

        >>> from boto_session_manager import BotoSesManager
        >>> from s3pathlib import S3Path
        >>>
        >>> bsm = BotoSesManager(profile_name="your_aws_profile")
        >>> bucket = f"{bsm.aws_account_id}-{bsm.aws_region}-data"
        >>> prefix = f"athena/results/"
        >>> s3dir_result = S3Path(f"s3://{bucket}/{prefix}").to_dir()

        >>> database = "your_database"
        >>> sql = f"SELECT * FROM {database}.your_table LIMIT 10;"
        >>> lazy_df, exec_id = run_athena_query(
        ...     bsm=bsm,
        ...     s3dir_result=s3dir_result,
        ...     sql=sql,
        ...     database=database,
        ... )
        >>> df = lazy_df.collect()
        >>> df
        ...

    start_query_execution doc: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/start_query_execution.html

    :param bsm: boto_session_manager.BotoSesManager object.
    :param s3dir_result: an S3Path object that represent a s3 directory to store
        the athena query result.
    :param sql: sql query string, a single trailing ``;`` is removed because
        the query is embedded in an UNLOAD statement.
    :param database: database name.
    :param catalog: see start_query_execution doc
    :param compression: compression algorithm to use when writing parquet file,
        see all options here: https://docs.aws.amazon.com/athena/latest/ug/compression-formats.html
    :param encryption_configuration: see start_query_execution doc
    :param expected_bucket_owner: see start_query_execution doc
    :param acl_configuration: see start_query_execution doc
    :param workgroup: see start_query_execution doc
    :param execution_parameters: see start_query_execution doc
    :param result_cache_expire: cache query result for X minutes, if None,
        it means no cache, see more information here: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
        this is very helpful to reduce the cost.
    :param client_request_token: see start_query_execution doc
    :param delta: sleep time in seconds between each query status check.
    :param timeout: timeout in seconds.
    :param verbose: do you want to print the log?

    :return: the tuple of two item, the first item is the lazy DataFrame of the result,
        the second item is the athena query execution id (str).

    .. versionadded:: 0.11.1
    """
    # the sql query should not end with ;, it will be embedded in the final query
    sql = sql.strip()
    if sql.endswith(";"):
        sql = sql[:-1]

    s3dir_dataset, s3dir_metadata = _get_dataset_and_metadata_s3path(s3dir_result)

    # ref: https://docs.aws.amazon.com/athena/latest/ug/unload.html
    # use UNLOAD command to write result into data format other than csv
    final_sql = textwrap.dedent(
        f"""
        UNLOAD ({sql})
        TO '{s3dir_dataset.uri}'
        WITH ( format = 'parquet', compression= '{compression}')
        """
    )

    # ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/start_query_execution.html
    # you may need to customize this for more advanced query,
    # like cross database query, federated query
    query_execution_context = dict(Database=database)
    if catalog:
        query_execution_context["Catalog"] = catalog

    result_configuration = dict(OutputLocation=s3dir_metadata.uri)
    if encryption_configuration:
        result_configuration["EncryptionConfiguration"] = encryption_configuration
    if expected_bucket_owner:
        result_configuration["ExpectedBucketOwner"] = expected_bucket_owner
    if acl_configuration:
        result_configuration["AclConfiguration"] = acl_configuration

    kwargs = dict(
        QueryString=final_sql,
        QueryExecutionContext=query_execution_context,
        # pass the fully populated dict, otherwise the encryption, bucket
        # owner and acl settings collected above would be silently dropped
        ResultConfiguration=result_configuration,
    )
    if workgroup:
        kwargs["WorkGroup"] = workgroup
    if execution_parameters:
        kwargs["Parameters"] = execution_parameters
    if result_cache_expire:
        kwargs["ResultReuseConfiguration"] = dict(
            ResultReuseByAgeConfiguration=dict(
                Enabled=True,
                MaxAgeInMinutes=result_cache_expire,
            )
        )
    if client_request_token:
        kwargs["ClientRequestToken"] = client_request_token
    res = bsm.athena_client.start_query_execution(**kwargs)

    # the start_query_execution API is async, it returns the execution id
    exec_id = res["QueryExecutionId"]

    # wait for the execution to finish
    wait_athena_query_to_succeed(bsm=bsm, exec_id=exec_id, delta=delta, timeout=timeout)
    lazy_df = read_athena_query_result(
        bsm=bsm,
        s3dir_result=s3dir_result,
        exec_id=exec_id,
        verbose=verbose,
    )
    return lazy_df, exec_id
276
277
if __name__ == "__main__":
    # Manual smoke test: run a random-sized LIMIT query against a real table
    # and check that the number of rows read back matches the limit.
    import random
    import textwrap

    from boto_session_manager import BotoSesManager
    from s3pathlib import S3Path

    database = "dynamodb_to_datalake"
    table = "transactions"

    # random limit, so that every run issues a different query
    row_limit = random.randint(100, 500)
    query = textwrap.dedent(
        f"""
        SELECT *
        FROM "{database}"."{table}"
        LIMIT {row_limit};
        """
    )

    # use the default boto session, results go to the account's data bucket
    bsm = BotoSesManager()
    s3dir_result = S3Path(
        f"s3://{bsm.aws_account_id}-{bsm.aws_region}-data/athena/results/"
    ).to_dir()

    lazy_df, exec_id = run_athena_query(
        bsm=bsm,
        s3dir_result=s3dir_result,
        sql=query,
        database=database,
    )
    df = lazy_df.collect()
    print(df.shape)
    print(df)
    # NOTE(review): this only holds if the table has at least row_limit rows
    assert df.shape[0] == row_limit
Output:
query result manifest: https://console.aws.amazon.com/s3/object/111122223333-us-east-1-data?prefix=athena/results/metadata/9d59666e-0b1a-47d0-92ec-9c1627191ec4-manifest.csv
query result data: https://console.aws.amazon.com/s3/buckets/111122223333-us-east-1-data?prefix=athena/results/dataset/cfb91cdef7ea4c12877db974127d2966/
number of files in result: 10
(246, 18)
shape: (246, 18)
┌────────────┬────────────┬────────────┬────────────┬─────┬────────────┬──────────┬───────────┬────────────┐
│ _hoodie_co ┆ _hoodie_co ┆ _hoodie_re ┆ _hoodie_pa ┆ ... ┆ create_mon ┆ create_d ┆ create_ho ┆ create_min │
│ mmit_time ┆ mmit_seqno ┆ cord_key ┆ rtition_pa ┆ ┆ th ┆ ay ┆ ur ┆ ute │
│ --- ┆ --- ┆ --- ┆ th ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ --- ┆ ┆ str ┆ str ┆ str ┆ str │
│ ┆ ┆ ┆ str ┆ ┆ ┆ ┆ ┆ │
╞════════════╪════════════╪════════════╪════════════╪═════╪════════════╪══════════╪═══════════╪════════════╡
│ 2023080801 ┆ 2023080801 ┆ id:account ┆ create_yea ┆ ... ┆ 08 ┆ 07 ┆ 22 ┆ 32 │
│ 5254041 ┆ 5254041_5_ ┆ :769-359-8 ┆ r=2023/cre ┆ ┆ ┆ ┆ ┆ │
│ ┆ 0 ┆ 960,create ┆ ate_month= ┆ ┆ ┆ ┆ ┆ │
│ ┆ ┆ _a... ┆ 08... ┆ ┆ ┆ ┆ ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2023080801 ┆ 2023080801 ┆ id:account ┆ create_yea ┆ ... ┆ 08 ┆ 07 ┆ 22 ┆ 43 │
│ 5254041 ┆ 5254041_9_ ┆ :333-843-8 ┆ r=2023/cre ┆ ┆ ┆ ┆ ┆ │
│ ┆ 1 ┆ 563,create ┆ ate_month= ┆ ┆ ┆ ┆ ┆ │
│ ┆ ┆ _a... ┆ 08... ┆ ┆ ┆ ┆ ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2023080801 ┆ 2023080801 ┆ id:account ┆ create_yea ┆ ... ┆ 08 ┆ 07 ┆ 22 ┆ 43 │
│ 5254041 ┆ 5254041_9_ ┆ :038-353-2 ┆ r=2023/cre ┆ ┆ ┆ ┆ ┆ │
│ ┆ 2 ┆ 716,create ┆ ate_month= ┆ ┆ ┆ ┆ ┆ │
│ ┆ ┆ _a... ┆ 08... ┆ ┆ ┆ ┆ ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2023080801 ┆ 2023080801 ┆ id:account ┆ create_yea ┆ ... ┆ 08 ┆ 07 ┆ 22 ┆ 43 │
│ 5254041 ┆ 5254041_9_ ┆ :554-923-0 ┆ r=2023/cre ┆ ┆ ┆ ┆ ┆ │
│ ┆ 0 ┆ 842,create ┆ ate_month= ┆ ┆ ┆ ┆ ┆ │
│ ┆ ┆ _a... ┆ 08... ┆ ┆ ┆ ┆ ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ ... ┆ ... ┆ ... ┆ ... ┆ ... ┆ ... ┆ ... ┆ ... ┆ ... │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2023080801 ┆ 2023080801 ┆ id:account ┆ create_yea ┆ ... ┆ 08 ┆ 07 ┆ 22 ┆ 43 │
│ 5254041 ┆ 5254041_9_ ┆ :526-507-4 ┆ r=2023/cre ┆ ┆ ┆ ┆ ┆ │
│ ┆ 59 ┆ 992,create ┆ ate_month= ┆ ┆ ┆ ┆ ┆ │
│ ┆ ┆ _a... ┆ 08... ┆ ┆ ┆ ┆ ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2023080801 ┆ 2023080801 ┆ id:account ┆ create_yea ┆ ... ┆ 08 ┆ 07 ┆ 22 ┆ 43 │
│ 5254041 ┆ 5254041_9_ ┆ :555-852-7 ┆ r=2023/cre ┆ ┆ ┆ ┆ ┆ │
│ ┆ 60 ┆ 716,create ┆ ate_month= ┆ ┆ ┆ ┆ ┆ │
│ ┆ ┆ _a... ┆ 08... ┆ ┆ ┆ ┆ ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2023080801 ┆ 2023080801 ┆ id:account ┆ create_yea ┆ ... ┆ 08 ┆ 07 ┆ 22 ┆ 43 │
│ 5254041 ┆ 5254041_9_ ┆ :531-448-3 ┆ r=2023/cre ┆ ┆ ┆ ┆ ┆ │
│ ┆ 61 ┆ 070,create ┆ ate_month= ┆ ┆ ┆ ┆ ┆ │
│ ┆ ┆ _a... ┆ 08... ┆ ┆ ┆ ┆ ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2023080801 ┆ 2023080801 ┆ id:account ┆ create_yea ┆ ... ┆ 08 ┆ 07 ┆ 22 ┆ 43 │
│ 5254041 ┆ 5254041_9_ ┆ :491-600-3 ┆ r=2023/cre ┆ ┆ ┆ ┆ ┆ │
│ ┆ 62 ┆ 033,create ┆ ate_month= ┆ ┆ ┆ ┆ ┆ │
│ ┆ ┆ _a... ┆ 08... ┆ ┆ ┆ ┆ ┆ │
└────────────┴────────────┴────────────┴────────────┴─────┴────────────┴──────────┴───────────┴────────────┘