Glue Crawler With Different DataLake#
CSV#
csv_example_no_partition.py
1# -*- coding: utf-8 -*-
2
3import io
4import uuid
5import random
6
7import polars as pl
8from faker import Faker
9
10from better_glue import Database, Crawler
11from settings import bsm, aws_console, s3dir_db, database_name, iam_role
12
13table_name = "csv_example_no_partition"
14s3dir_tb = s3dir_db.joinpath(table_name).to_dir()
15crawler_name = f"{database_name}__{table_name}"
16fake = Faker()
17
18database_url = aws_console.glue.get_database(database_or_arn=database_name)
19table_url = aws_console.glue.get_table(table_or_arn=table_name, database=database_name)
20crawler_url = aws_console.glue.get_crawler(name_or_arn=crawler_name)
21print(f"table s3dir = {s3dir_tb.console_url}")
22print(f"{database_url = }")
23print(f"{table_url = }")
24print(f"{crawler_url = }")
25
26
27def prepare_data():
28 print("prepare data ...")
29 s3dir_tb.delete()
30 n_records = 1000
31 df = pl.DataFrame(
32 {
33 "id": range(1, 1 + n_records),
34 "name": [fake.name() for _ in range(n_records)],
35 "create_time": [
36 fake.date_time().replace(year=random.randint(2001, 2010))
37 for _ in range(n_records)
38 ],
39 }
40 ).with_columns(pl.col("create_time").dt.year().alias("year"))
41 for _, sub_df in df.group_by("year"):
42 sub_df = sub_df.drop("year")
43 s3path = s3dir_tb.joinpath(f"{uuid.uuid4()}.csv")
44 buffer = io.BytesIO()
45 sub_df.write_csv(buffer)
46 s3path.write_bytes(buffer.getvalue(), bsm=bsm, content_type="text/csv")
47
48
49def create_database():
50 print("create database ...")
51 db = Database.get(glue_client=bsm.glue_client, name=database_name)
52 if db is None:
53 bsm.glue_client.create_database(DatabaseInput=dict(Name=database_name))
54
55
56def create_crawler():
57 print("create crawler ...")
58 crawler = Crawler.get(
59 glue_client=bsm.glue_client,
60 name=crawler_name,
61 )
62 if crawler is not None:
63 bsm.glue_client.delete_crawler(Name=crawler_name)
64 crawler = bsm.glue_client.create_crawler(
65 Name=crawler_name,
66 Role=iam_role,
67 DatabaseName=database_name,
68 Targets=dict(
69 S3Targets=[
70 dict(
71 Path=s3dir_tb.uri,
72 ),
73 ],
74 ),
75 RecrawlPolicy=dict(
76 RecrawlBehavior="CRAWL_EVERYTHING",
77 ),
78 )
79
80
81def run_crawler():
82 print("run crawler ...")
83 bsm.glue_client.start_crawler(Name=crawler_name)
84
85
86# prepare_data()
87# create_database()
88# create_crawler()
89run_crawler()
csv_example_has_partition.py
1# -*- coding: utf-8 -*-
2
3import io
4import uuid
5import random
6
7import polars as pl
8from faker import Faker
9
10from better_glue import Database, Crawler
11from settings import bsm, aws_console, s3dir_db, database_name, iam_role
12
13table_name = "csv_example_has_partition"
14s3dir_tb = s3dir_db.joinpath(table_name).to_dir()
15crawler_name = f"{database_name}__{table_name}"
16fake = Faker()
17
18database_url = aws_console.glue.get_database(database_or_arn=database_name)
19table_url = aws_console.glue.get_table(table_or_arn=table_name, database=database_name)
20crawler_url = aws_console.glue.get_crawler(name_or_arn=crawler_name)
21print(f"table s3dir = {s3dir_tb.console_url}")
22print(f"{database_url = }")
23print(f"{table_url = }")
24print(f"{crawler_url = }")
25
26
27def prepare_data():
28 print("prepare data ...")
29 s3dir_tb.delete()
30 n_records = 1000
31 df = pl.DataFrame(
32 {
33 "id": range(1, 1 + n_records),
34 "name": [fake.name() for _ in range(n_records)],
35 "create_time": [
36 fake.date_time().replace(year=random.randint(2001, 2010))
37 for _ in range(n_records)
38 ],
39 }
40 ).with_columns(pl.col("create_time").dt.year().alias("year"))
41 for (year,), sub_df in df.group_by("year"):
42 sub_df = sub_df.drop("year")
43 s3path = s3dir_tb.joinpath(f"year={year}", f"{uuid.uuid4()}.csv")
44 buffer = io.BytesIO()
45 sub_df.write_csv(buffer)
46 s3path.write_bytes(buffer.getvalue(), bsm=bsm, content_type="text/csv")
47
48
49def create_database():
50 print("create database ...")
51 db = Database.get(glue_client=bsm.glue_client, name=database_name)
52 if db is None:
53 bsm.glue_client.create_database(DatabaseInput=dict(Name=database_name))
54
55
56def create_crawler():
57 print("create crawler ...")
58 crawler = Crawler.get(
59 glue_client=bsm.glue_client,
60 name=crawler_name,
61 )
62 if crawler is not None:
63 bsm.glue_client.delete_crawler(Name=crawler_name)
64 crawler = bsm.glue_client.create_crawler(
65 Name=crawler_name,
66 Role=iam_role,
67 DatabaseName=database_name,
68 Targets=dict(
69 S3Targets=[
70 dict(
71 Path=s3dir_tb.uri,
72 ),
73 ],
74 ),
75 RecrawlPolicy=dict(
76 RecrawlBehavior="CRAWL_EVERYTHING",
77 ),
78 )
79
80
81def run_crawler():
82 print("run crawler ...")
83 bsm.glue_client.start_crawler(Name=crawler_name)
84
85
86# prepare_data()
87# create_database()
88# create_crawler()
89# run_crawler()
Parquet#
parquet_example_no_partition.py
1# -*- coding: utf-8 -*-
2
3import io
4import uuid
5import random
6
7import polars as pl
8from faker import Faker
9
10from better_glue import Database, Crawler
11from settings import bsm, aws_console, s3dir_db, database_name, iam_role
12
13table_name = "parquet_example_no_partition"
14s3dir_tb = s3dir_db.joinpath(table_name).to_dir()
15crawler_name = f"{database_name}__{table_name}"
16fake = Faker()
17
18database_url = aws_console.glue.get_database(database_or_arn=database_name)
19table_url = aws_console.glue.get_table(table_or_arn=table_name, database=database_name)
20crawler_url = aws_console.glue.get_crawler(name_or_arn=crawler_name)
21print(f"table s3dir = {s3dir_tb.console_url}")
22print(f"{database_url = }")
23print(f"{table_url = }")
24print(f"{crawler_url = }")
25
26
27def prepare_data():
28 print("prepare data ...")
29 s3dir_tb.delete()
30 n_records = 1000
31 df = pl.DataFrame(
32 {
33 "id": range(1, 1 + n_records),
34 "name": [fake.name() for _ in range(n_records)],
35 "create_time": [
36 fake.date_time().replace(year=random.randint(2001, 2010))
37 for _ in range(n_records)
38 ],
39 }
40 ).with_columns(pl.col("create_time").dt.year().alias("year"))
41 for _, sub_df in df.group_by("year"):
42 sub_df = sub_df.drop("year")
43 s3path = s3dir_tb.joinpath(f"{uuid.uuid4()}.snappy.parquet")
44 buffer = io.BytesIO()
45 sub_df.write_parquet(buffer, compression="snappy")
46 s3path.write_bytes(
47 buffer.getvalue(), bsm=bsm, content_type="application/x-parquet"
48 )
49
50
51def create_database():
52 print("create database ...")
53 db = Database.get(glue_client=bsm.glue_client, name=database_name)
54 if db is None:
55 bsm.glue_client.create_database(DatabaseInput=dict(Name=database_name))
56
57
58def create_crawler():
59 print("create crawler ...")
60 crawler = Crawler.get(
61 glue_client=bsm.glue_client,
62 name=crawler_name,
63 )
64 if crawler is not None:
65 bsm.glue_client.delete_crawler(Name=crawler_name)
66 crawler = bsm.glue_client.create_crawler(
67 Name=crawler_name,
68 Role=iam_role,
69 DatabaseName=database_name,
70 Targets=dict(
71 S3Targets=[
72 dict(
73 Path=s3dir_tb.uri,
74 ),
75 ],
76 ),
77 RecrawlPolicy=dict(
78 RecrawlBehavior="CRAWL_EVERYTHING",
79 ),
80 )
81
82
83def run_crawler():
84 print("run crawler ...")
85 bsm.glue_client.start_crawler(Name=crawler_name)
86
87
88# prepare_data()
89# create_database()
90# create_crawler()
91# run_crawler()
parquet_example_has_partition.py
1# -*- coding: utf-8 -*-
2
3import io
4import uuid
5import random
6
7import polars as pl
8from faker import Faker
9
10from better_glue import Database, Crawler
11from settings import bsm, aws_console, s3dir_db, database_name, iam_role
12
13table_name = "parquet_example_has_partition"
14s3dir_tb = s3dir_db.joinpath(table_name).to_dir()
15crawler_name = f"{database_name}__{table_name}"
16fake = Faker()
17
18database_url = aws_console.glue.get_database(database_or_arn=database_name)
19table_url = aws_console.glue.get_table(table_or_arn=table_name, database=database_name)
20crawler_url = aws_console.glue.get_crawler(name_or_arn=crawler_name)
21print(f"table s3dir = {s3dir_tb.console_url}")
22print(f"{database_url = }")
23print(f"{table_url = }")
24print(f"{crawler_url = }")
25
26
27def prepare_data():
28 print("prepare data ...")
29 s3dir_tb.delete()
30 n_records = 1000
31 df = pl.DataFrame(
32 {
33 "id": range(1, 1 + n_records),
34 "name": [fake.name() for _ in range(n_records)],
35 "create_time": [
36 fake.date_time().replace(year=random.randint(2001, 2010))
37 for _ in range(n_records)
38 ],
39 }
40 ).with_columns(pl.col("create_time").dt.year().alias("year"))
41 for (year,), sub_df in df.group_by("year"):
42 sub_df = sub_df.drop("year")
43 s3path = s3dir_tb.joinpath(f"year={year}", f"{uuid.uuid4()}.snappy_parquet")
44 buffer = io.BytesIO()
45 sub_df.write_parquet(buffer, compression="snappy")
46 s3path.write_bytes(
47 buffer.getvalue(), bsm=bsm, content_type="application/x-parquet"
48 )
49
50
51def create_database():
52 print("create database ...")
53 db = Database.get(glue_client=bsm.glue_client, name=database_name)
54 if db is None:
55 bsm.glue_client.create_database(DatabaseInput=dict(Name=database_name))
56
57
58def create_crawler():
59 print("create crawler ...")
60 crawler = Crawler.get(
61 glue_client=bsm.glue_client,
62 name=crawler_name,
63 )
64 if crawler is not None:
65 bsm.glue_client.delete_crawler(Name=crawler_name)
66 crawler = bsm.glue_client.create_crawler(
67 Name=crawler_name,
68 Role=iam_role,
69 DatabaseName=database_name,
70 Targets=dict(
71 S3Targets=[
72 dict(
73 Path=s3dir_tb.uri,
74 ),
75 ],
76 ),
77 RecrawlPolicy=dict(
78 RecrawlBehavior="CRAWL_EVERYTHING",
79 ),
80 )
81
82
83def run_crawler():
84 print("run crawler ...")
85 bsm.glue_client.start_crawler(Name=crawler_name)
86
87
88# prepare_data()
89# create_database()
90# create_crawler()
91# run_crawler()
Delta Lake#
DeltaLake 社区有一个非常好用的 Python 库 delta-rs, 它是 DeltaLake 的 Rust 原生实现的 Python binding. 它是基于 DeltaLake 3.X 版本的. 我非常喜欢用这个库来将数据写入到 DeltaLake 中.
但我在尝试用 Glue Crawler 来自动从 DeltaLake 的 S3 Location 生成 Glue Table 时遇到了一个问题, 我尝试了所有参数的排列组合, 但 Glue Crawler 依然无法成功生成 Glue Table. 后来经过一天的 Debug, 我找到了这篇 官方 RePost, 里面明确提到了 Glue Crawler 的 DeltaLake 是基于 1.0 的, 它无法识别我用 3.X 版本写入的数据. 所以目前 Glue Crawler 是无法为我的 S3 folder 自动生成 Glue Table 的.
结论就是, 我只能为我的 DeltaLake 手动创建 Glue Table 了. 但是 Glue Table 的参数众多, 我不知道如何设置. 于是我想到了一个办法. 我根据 Introducing native Delta Lake table support with AWS Glue crawlers 这篇博文, 成功用 Crawler 生成了一个 DeltaLake 1.0 的 Glue Table. 这里除了 Schema 的部分, 其实都是 DeltaLake Table 的标准配置. 那么我就可以用纯 Parquet 的 DataLake 让 Crawler 创建一个 Table, 然后查看里面 Partition 的设置, 然后对这个 1.0 的 Glue Table 进行一些 Schema 和 Partition 的修改, 就能得到一个 DeltaLake 3.X 的 Glue Table 了.
这种方式我测试过了, 在有 Upsert 的情况下依然可以查询到最新的数据.
succeeded_cralwer_generated_delta_lake_table_without_partition.py
1import datetime
2
3dct = {
4 "Table": {
5 "Name": "sample_delta_table",
6 "DatabaseName": "glue_crawler_test",
7 "Description": "",
8 "CreateTime": datetime.datetime(2024, 8, 30, 9, 46, 25, tzinfo=tzlocal()),
9 "UpdateTime": datetime.datetime(2024, 8, 30, 9, 51, 19, tzinfo=tzlocal()),
10 "Retention": 0,
11 "StorageDescriptor": {
12 "Columns": [
13 {"Name": "id", "Type": "bigint", "Comment": ""},
14 {"Name": "name", "Type": "string", "Comment": ""},
15 {"Name": "create_time", "Type": "timestamp", "Comment": ""},
16 ],
17 "Location": "s3://bmt-app-dev-us-east-1-data/projects/parquet_dynamodb/example/staging/sample_delta_table/",
18 "AdditionalLocations": [],
19 "InputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
20 "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
21 "Compressed": False,
22 "NumberOfBuckets": -1,
23 "SerdeInfo": {
24 "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
25 "Parameters": {
26 "serialization.format": "1",
27 "path": "s3://bmt-app-dev-us-east-1-data/projects/learn_aws/glue_crawler/databases/glue_crawler_test/delta_example_has_partition/",
28 },
29 },
30 "BucketColumns": [],
31 "SortColumns": [],
32 "Parameters": {
33 "UPDATED_BY_CRAWLER": "sample_delta_table",
34 "EXTERNAL": "true",
35 "spark.sql.sources.schema.part.0": '{"type":"struct","fields":[{"name":"product_id","type":"string","nullable":true,"metadata":{}},{"name":"product_name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"long","nullable":true,"metadata":{}},{"name":"CURRENCY","type":"string","nullable":true,"metadata":{}},{"name":"category","type":"string","nullable":true,"metadata":{}},{"name":"updated_at","type":"double","nullable":true,"metadata":{}}]}',
36 "CrawlerSchemaSerializerVersion": "1.0",
37 "CrawlerSchemaDeserializerVersion": "1.0",
38 "spark.sql.partitionProvider": "catalog",
39 "classification": "delta",
40 "spark.sql.sources.schema.numParts": "1",
41 "spark.sql.sources.provider": "delta",
42 "delta.lastUpdateVersion": "6",
43 "delta.lastCommitTimestamp": "1653462383292",
44 "table_type": "delta",
45 },
46 "StoredAsSubDirectories": False,
47 },
48 "PartitionKeys": [],
49 "TableType": "EXTERNAL_TABLE",
50 "Parameters": {
51 "UPDATED_BY_CRAWLER": "sample_delta_table",
52 "EXTERNAL": "true",
53 "spark.sql.sources.schema.part.0": '{"type":"struct","fields":[{"name":"product_id","type":"string","nullable":true,"metadata":{}},{"name":"product_name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"long","nullable":true,"metadata":{}},{"name":"CURRENCY","type":"string","nullable":true,"metadata":{}},{"name":"category","type":"string","nullable":true,"metadata":{}},{"name":"updated_at","type":"double","nullable":true,"metadata":{}}]}',
54 "CrawlerSchemaSerializerVersion": "1.0",
55 "CrawlerSchemaDeserializerVersion": "1.0",
56 "spark.sql.partitionProvider": "catalog",
57 "classification": "delta",
58 "spark.sql.sources.schema.numParts": "1",
59 "spark.sql.sources.provider": "delta",
60 "delta.lastUpdateVersion": "6",
61 "delta.lastCommitTimestamp": "1653462383292",
62 "table_type": "delta",
63 },
64 "CreatedBy": "arn:aws:sts::878625312159:assumed-role/all-services-admin-role/AWS-Crawler",
65 "IsRegisteredWithLakeFormation": False,
66 "CatalogId": "878625312159",
67 "VersionId": "2",
68 },
69}
succeeded_cralwer_generated_parquet_table_partition_details.py
1import datetime
2
3dct = {
4 "Table": {
5 "Name": "sample_delta_table",
6 "DatabaseName": "glue_crawler_test",
7 "Description": "",
8 "CreateTime": datetime.datetime(2024, 8, 30, 9, 46, 25, tzinfo=tzlocal()),
9 "UpdateTime": datetime.datetime(2024, 8, 30, 9, 51, 19, tzinfo=tzlocal()),
10 "Retention": 0,
11 "StorageDescriptor": {
12 "Columns": [
13 {"Name": "id", "Type": "bigint", "Comment": ""},
14 {"Name": "name", "Type": "string", "Comment": ""},
15 {"Name": "create_time", "Type": "timestamp", "Comment": ""},
16 ],
17 "Location": "s3://bmt-app-dev-us-east-1-data/projects/parquet_dynamodb/example/staging/sample_delta_table/",
18 "AdditionalLocations": [],
19 "InputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
20 "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
21 "Compressed": False,
22 "NumberOfBuckets": -1,
23 "SerdeInfo": {
24 "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
25 "Parameters": {
26 "serialization.format": "1",
27 "path": "s3://bmt-app-dev-us-east-1-data/projects/learn_aws/glue_crawler/databases/glue_crawler_test/delta_example_has_partition/",
28 },
29 },
30 "BucketColumns": [],
31 "SortColumns": [],
32 "Parameters": {
33 "UPDATED_BY_CRAWLER": "sample_delta_table",
34 "EXTERNAL": "true",
35 "spark.sql.sources.schema.part.0": '{"type":"struct","fields":[{"name":"product_id","type":"string","nullable":true,"metadata":{}},{"name":"product_name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"long","nullable":true,"metadata":{}},{"name":"CURRENCY","type":"string","nullable":true,"metadata":{}},{"name":"category","type":"string","nullable":true,"metadata":{}},{"name":"updated_at","type":"double","nullable":true,"metadata":{}}]}',
36 "CrawlerSchemaSerializerVersion": "1.0",
37 "CrawlerSchemaDeserializerVersion": "1.0",
38 "spark.sql.partitionProvider": "catalog",
39 "classification": "delta",
40 "spark.sql.sources.schema.numParts": "1",
41 "spark.sql.sources.provider": "delta",
42 "delta.lastUpdateVersion": "6",
43 "delta.lastCommitTimestamp": "1653462383292",
44 "table_type": "delta",
45 },
46 "StoredAsSubDirectories": False,
47 },
48 "PartitionKeys": [],
49 "TableType": "EXTERNAL_TABLE",
50 "Parameters": {
51 "UPDATED_BY_CRAWLER": "sample_delta_table",
52 "EXTERNAL": "true",
53 "spark.sql.sources.schema.part.0": '{"type":"struct","fields":[{"name":"product_id","type":"string","nullable":true,"metadata":{}},{"name":"product_name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"long","nullable":true,"metadata":{}},{"name":"CURRENCY","type":"string","nullable":true,"metadata":{}},{"name":"category","type":"string","nullable":true,"metadata":{}},{"name":"updated_at","type":"double","nullable":true,"metadata":{}}]}',
54 "CrawlerSchemaSerializerVersion": "1.0",
55 "CrawlerSchemaDeserializerVersion": "1.0",
56 "spark.sql.partitionProvider": "catalog",
57 "classification": "delta",
58 "spark.sql.sources.schema.numParts": "1",
59 "spark.sql.sources.provider": "delta",
60 "delta.lastUpdateVersion": "6",
61 "delta.lastCommitTimestamp": "1653462383292",
62 "table_type": "delta",
63 },
64 "CreatedBy": "arn:aws:sts::878625312159:assumed-role/all-services-admin-role/AWS-Crawler",
65 "IsRegisteredWithLakeFormation": False,
66 "CatalogId": "878625312159",
67 "VersionId": "2",
68 },
69}
succeeded_cralwer_generated_parquet_table_with_partition.py
1import datetime
2
3dct = {
4 "Table": {
5 "Name": "sample_delta_table",
6 "DatabaseName": "glue_crawler_test",
7 "Description": "",
8 "CreateTime": datetime.datetime(2024, 8, 30, 9, 46, 25, tzinfo=tzlocal()),
9 "UpdateTime": datetime.datetime(2024, 8, 30, 9, 51, 19, tzinfo=tzlocal()),
10 "Retention": 0,
11 "StorageDescriptor": {
12 "Columns": [
13 {"Name": "id", "Type": "bigint", "Comment": ""},
14 {"Name": "name", "Type": "string", "Comment": ""},
15 {"Name": "create_time", "Type": "timestamp", "Comment": ""},
16 ],
17 "Location": "s3://bmt-app-dev-us-east-1-data/projects/parquet_dynamodb/example/staging/sample_delta_table/",
18 "AdditionalLocations": [],
19 "InputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
20 "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
21 "Compressed": False,
22 "NumberOfBuckets": -1,
23 "SerdeInfo": {
24 "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
25 "Parameters": {
26 "serialization.format": "1",
27 "path": "s3://bmt-app-dev-us-east-1-data/projects/learn_aws/glue_crawler/databases/glue_crawler_test/delta_example_has_partition/",
28 },
29 },
30 "BucketColumns": [],
31 "SortColumns": [],
32 "Parameters": {
33 "UPDATED_BY_CRAWLER": "sample_delta_table",
34 "EXTERNAL": "true",
35 "spark.sql.sources.schema.part.0": '{"type":"struct","fields":[{"name":"product_id","type":"string","nullable":true,"metadata":{}},{"name":"product_name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"long","nullable":true,"metadata":{}},{"name":"CURRENCY","type":"string","nullable":true,"metadata":{}},{"name":"category","type":"string","nullable":true,"metadata":{}},{"name":"updated_at","type":"double","nullable":true,"metadata":{}}]}',
36 "CrawlerSchemaSerializerVersion": "1.0",
37 "CrawlerSchemaDeserializerVersion": "1.0",
38 "spark.sql.partitionProvider": "catalog",
39 "classification": "delta",
40 "spark.sql.sources.schema.numParts": "1",
41 "spark.sql.sources.provider": "delta",
42 "delta.lastUpdateVersion": "6",
43 "delta.lastCommitTimestamp": "1653462383292",
44 "table_type": "delta",
45 },
46 "StoredAsSubDirectories": False,
47 },
48 "PartitionKeys": [],
49 "TableType": "EXTERNAL_TABLE",
50 "Parameters": {
51 "UPDATED_BY_CRAWLER": "sample_delta_table",
52 "EXTERNAL": "true",
53 "spark.sql.sources.schema.part.0": '{"type":"struct","fields":[{"name":"product_id","type":"string","nullable":true,"metadata":{}},{"name":"product_name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"long","nullable":true,"metadata":{}},{"name":"CURRENCY","type":"string","nullable":true,"metadata":{}},{"name":"category","type":"string","nullable":true,"metadata":{}},{"name":"updated_at","type":"double","nullable":true,"metadata":{}}]}',
54 "CrawlerSchemaSerializerVersion": "1.0",
55 "CrawlerSchemaDeserializerVersion": "1.0",
56 "spark.sql.partitionProvider": "catalog",
57 "classification": "delta",
58 "spark.sql.sources.schema.numParts": "1",
59 "spark.sql.sources.provider": "delta",
60 "delta.lastUpdateVersion": "6",
61 "delta.lastCommitTimestamp": "1653462383292",
62 "table_type": "delta",
63 },
64 "CreatedBy": "arn:aws:sts::878625312159:assumed-role/all-services-admin-role/AWS-Crawler",
65 "IsRegisteredWithLakeFormation": False,
66 "CatalogId": "878625312159",
67 "VersionId": "2",
68 },
69}
下面列出了我的最终解决方案.
delta_example_has_partition_manual_create_table.py
1# -*- coding: utf-8 -*-
2
3"""
4这是我研究出来的如何用 `delta-rs <https://github.com/delta-io/delta-rs>`_ Python
5进行写入, 但是用 Glue Table 作为元数据, 使得之后可以直接用 Athena 查询的方法.
6"""
7
8import polars as pl
9from faker import Faker
10
11from better_glue import Database, Table
12from settings import bsm, aws_console, s3dir_db, database_name
13
14table_name = "delta_example_hpmct"
15s3dir_tb = s3dir_db.joinpath(table_name).to_dir()
16crawler_name = f"{database_name}__{table_name}"
17fake = Faker()
18
19database_url = aws_console.glue.get_database(database_or_arn=database_name)
20table_url = aws_console.glue.get_table(table_or_arn=table_name, database=database_name)
21crawler_url = aws_console.glue.get_crawler(name_or_arn=crawler_name)
22print(f"table s3dir = {s3dir_tb.console_url}")
23print(f"{database_url = }")
24print(f"{table_url = }")
25print(f"{crawler_url = }")
26
27credential = bsm.boto_ses.get_credentials()
28storage_options = {
29 "AWS_REGION": bsm.aws_region,
30 "AWS_ACCESS_KEY_ID": credential.access_key,
31 "AWS_SECRET_ACCESS_KEY": credential.secret_key,
32 "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
33}
34
35
36def prepare_data():
37 print("prepare data ...")
38 s3dir_tb.delete()
39
40 df = pl.DataFrame(
41 [
42 {"id": 1, "name": "Alice", "year": "2001"},
43 {"id": 2, "name": "Bob", "year": "2001"},
44 ]
45 )
46 df.write_delta(
47 s3dir_tb.uri,
48 mode="append",
49 delta_write_options=dict(
50 partition_by=["year"],
51 ),
52 storage_options=storage_options,
53 )
54
55 df = pl.DataFrame(
56 [
57 {"id": 2, "name": "Bobby", "year": "2001"},
58 {"id": 3, "name": "Cathy", "year": "2001"},
59 ]
60 )
61 table_merger = df.write_delta(
62 s3dir_tb.uri,
63 mode="merge",
64 delta_write_options=dict(
65 partition_by=["year"],
66 ),
67 delta_merge_options=dict(
68 predicate="s.id = t.id",
69 source_alias="s",
70 target_alias="t",
71 ),
72 storage_options=storage_options,
73 )
74 (
75 table_merger.when_matched_update_all() # will do update
76 .when_not_matched_insert_all() # will do insert
77 .execute()
78 )
79
80
81def create_database():
82 print("create database ...")
83 db = Database.get(glue_client=bsm.glue_client, name=database_name)
84 if db is None:
85 bsm.glue_client.create_database(DatabaseInput=dict(Name=database_name))
86
87
88def create_table():
89 tb = Table.get(glue_client=bsm.glue_client, database=database_name, name=table_name)
90 if tb is not None:
91 bsm.glue_client.delete_table(DatabaseName=database_name, Name=table_name)
92
93 bsm.glue_client.create_table(
94 DatabaseName=database_name,
95 TableInput=dict(
96 Name=table_name,
97 StorageDescriptor=dict(
98 Columns=[
99 {"Name": "id", "Type": "bigint", "Comment": ""},
100 {"Name": "name", "Type": "string", "Comment": ""},
101 ],
102 Location=s3dir_tb.uri,
103 InputFormat="org.apache.hadoop.mapred.SequenceFileInputFormat",
104 OutputFormat="org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
105 Compressed=False,
106 NumberOfBuckets=-1,
107 SerdeInfo={
108 "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
109 "Parameters": {
110 "serialization.format": "1",
111 "path": s3dir_tb.uri,
112 },
113 },
114 BucketColumns=[],
115 SortColumns=[],
116 ),
117 Parameters={
118 "EXTERNAL": "true",
119 "spark.sql.sources.schema.part.0": '{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}',
120 "CrawlerSchemaSerializerVersion": "1.0",
121 "CrawlerSchemaDeserializerVersion": "1.0",
122 "spark.sql.partitionProvider": "catalog",
123 "classification": "delta",
124 "spark.sql.sources.schema.numParts": "1",
125 "spark.sql.sources.provider": "delta",
126 "delta.lastUpdateVersion": "6",
127 "delta.lastCommitTimestamp": "1653462383292",
128 "table_type": "delta",
129 },
130 PartitionKeys=[
131 {"Name": "year", "Type": "string"},
132 ],
133 TableType="EXTERNAL_TABLE",
134 ),
135 )
136
137
138# prepare_data()
139# create_database()
140# create_table()
141# run_crawler()