AWS Step Functions - Lambda Function Input Output Parameter
Keywords: AWS Step Function, StepFunction, SFN, State Machine, StateMachine, Lambda
在 AWS StepFunction 的编排中, Lambda Function 通常是最为常用的组成部分. 而同一个 Lambda Function, 我们可能会有多种使用方法. 下面我们以一个简单的对 S3 object 进行处理的 Lambda Function 为例, 它只是返回这个 Object 的 size (in bytes).
这个业务的核心逻辑是:
import boto3
s3_client = boto3.client("s3")
def main(bucket: str, key: str) -> int:
    """Return the size in bytes of the S3 object at ``bucket`` / ``key``.

    A HEAD request is used because only the object's metadata is needed;
    ``get_object`` would additionally open a streaming download of the
    body that is never read or closed here.

    :param bucket: name of the S3 bucket that holds the object.
    :param key: key of the object inside the bucket.
    :return: the ``ContentLength`` value reported by S3.

    Errors raised by the S3 client propagate to the caller.
    """
    return s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
为了方便测试, 我们往往用 Request Response 的方式直接对 Lambda Function (下文简称 LBD) 发起请求, 并给予核心逻辑所需要的参数. 这种方式可以确保 LBD 的核心功能运转正常.
另一种常见的运行 Lambda Function 的方式是由 Event 触发.
而如果用 SFN 运行 LBD, 有多种方式传递参数. 最常见的是用前一个 Step 将核心逻辑所需的参数传递给 LBD. 但我们这里要说两种更加高级的方式:
在 SFN Input 的 Payload 里用一个 key, value, 其中 value 是一个 dict, 里面包含了核心的参数. 这种方式只适用于 LBD 的参数在开始 SFN 的时候就已经确定下来了. 如果这个参数需要基于一些中间态的计算结果, 那么这种方式就不适用了.
用 SFN execution id 作为一个唯一的 id, 把这个 execution 相关的 data 都存在一个 S3 object 中. 这个 exec data 本质上是一个 JSON, 所有的中间计算步骤都可以对这个 JSON 进行读写. 然后约定一个规则, 规定每个 step 的输入数据放在哪个 key 下. 这样所有的 LBD 的输入参数都只包含这个 SFN exec id, 然后去 S3 里拿数据. 这样就完全跳过了 SFN 在 step 之间传递数据时的限制 (例如 payload 的大小上限).
下面我给出了上面提到的所有方式的代码实现, 以及提供了一个能兼容所有模式的 Lambda Function 的代码.
lbd1_request_response.py
1# -*- coding: utf-8 -*-
2
3from pprint import pprint
4import boto3
5
6s3_client = boto3.client("s3")
7
8
def main(bucket: str, key: str) -> int:
    """Return the size in bytes of the S3 object at ``bucket`` / ``key``.

    A HEAD request is used because only the object's metadata is needed;
    ``get_object`` would additionally open a streaming download of the
    body that is never read or closed here.

    :param bucket: name of the S3 bucket that holds the object.
    :param key: key of the object inside the bucket.
    :return: the ``ContentLength`` value reported by S3.

    Errors raised by the S3 client propagate to the caller.
    """
    return s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
11
12
def lambda_handler(event: dict, context):
    """Lambda entry point for a direct request/response invocation.

    The event itself carries the arguments of ``main`` under the
    ``"bucket"`` and ``"key"`` keys.

    :param event: invocation payload; must contain ``"bucket"`` and ``"key"``.
    :param context: Lambda context object (unused).
    :return: whatever ``main`` returns for the given bucket and key.
    """
    # Dump the raw event first so the payload shape is visible in the logs.
    print("----- event -----")
    pprint(event)

    bucket, key = event["bucket"], event["key"]
    return main(bucket, key)
lbd2_event_triggered.py
1# -*- coding: utf-8 -*-
2
3from pprint import pprint
4import boto3
5
6s3_client = boto3.client("s3")
7
8
def main(bucket: str, key: str) -> int:
    """Return the size in bytes of the S3 object at ``bucket`` / ``key``.

    A HEAD request is used because only the object's metadata is needed;
    ``get_object`` would additionally open a streaming download of the
    body that is never read or closed here.

    :param bucket: name of the S3 bucket that holds the object.
    :param key: key of the object inside the bucket.
    :return: the ``ContentLength`` value reported by S3.

    Errors raised by the S3 client propagate to the caller.
    """
    return s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
11
12
def lambda_handler(event: dict, context):
    """Lambda entry point for an S3 event-notification trigger.

    Only the first record of the notification is processed; its bucket
    name and object key are forwarded to ``main``.

    :param event: S3 notification payload with a non-empty ``"Records"`` list.
    :param context: Lambda context object (unused).
    :return: whatever ``main`` returns for the notified object.
    :raises KeyError, IndexError: if the event does not have the S3
        notification shape.
    """
    # Local import keeps this handler independent of the module's import block.
    from urllib.parse import unquote_plus

    print("----- event -----")
    pprint(event)

    s3_info = event["Records"][0]["s3"]
    bucket = s3_info["bucket"]["name"]
    # S3 URL-encodes the object key inside notifications (a space arrives
    # as "+", other special characters as %XX), so it must be decoded before
    # it can be used as a real key.
    key = unquote_plus(s3_info["object"]["key"])
    return main(bucket=bucket, key=key)
lbd3_sfn_input_object.py
1# -*- coding: utf-8 -*-
2
3from pprint import pprint
4import boto3
5
6s3_client = boto3.client("s3")
7
8
def main(bucket: str, key: str) -> int:
    """Return the size in bytes of the S3 object at ``bucket`` / ``key``.

    A HEAD request is used because only the object's metadata is needed;
    ``get_object`` would additionally open a streaming download of the
    body that is never read or closed here.

    :param bucket: name of the S3 bucket that holds the object.
    :param key: key of the object inside the bucket.
    :return: the ``ContentLength`` value reported by S3.

    Errors raised by the S3 client propagate to the caller.
    """
    return s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
11
12
def lambda_handler(event: dict, context):
    """Lambda entry point that reads ``main``'s arguments from the event.

    The event is expected to be a flat mapping holding ``"bucket"`` and
    ``"key"``; presumably the state machine extracts that mapping from its
    execution input before invoking this function (verify against the
    state machine definition).

    :param event: invocation payload; must contain ``"bucket"`` and ``"key"``.
    :param context: Lambda context object (unused).
    :return: whatever ``main`` returns for the given bucket and key.
    """
    print("----- event -----")
    pprint(event)

    target_bucket = event["bucket"]
    target_key = event["key"]
    return main(target_bucket, target_key)
lbd3_sfn_input_object_sfn_def.json
1{
2 "Comment": "A description of my state machine",
3 "StartAt": "Lambda Invoke",
4 "States": {
5 "Lambda Invoke": {
6 "Type": "Task",
7 "Resource": "arn:aws:states:::lambda:invoke",
8 "OutputPath": "$.Payload",
9 "Parameters": {
10 "FunctionName": "arn:aws:lambda:us-east-1:878625312159:function:sfn-poc-lbd-1:$LATEST",
11 "Payload.$": "$$.Execution.Input.task1"
12 },
13 "Retry": [
14 {
15 "ErrorEquals": [
16 "Lambda.ServiceException",
17 "Lambda.AWSLambdaException",
18 "Lambda.SdkClientException",
19 "Lambda.TooManyRequestsException"
20 ],
21 "IntervalSeconds": 1,
22 "MaxAttempts": 3,
23 "BackoffRate": 2
24 }
25 ],
26 "End": true
27 }
28 }
29}
lbd3_sfn_input_object_test.py
1# -*- coding: utf-8 -*-
2
3import json
4from boto_session_manager import BotoSesManager
5
# Session for the dev profile; region, account id and account alias below
# are all read from it.
bsm = BotoSesManager(profile_name="bmt_app_dev_us_east_1")
sfn_arn = f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:stateMachine:sfn-poc"
lbd_arn = f"arn:aws:lambda:{bsm.aws_region}:{bsm.aws_account_id}:function:sfn-poc-lbd-1"

# The S3 object whose size is being requested.
bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
key = "projects/tmp/fw2-1.pdf"

# The task arguments live under the "task1" key of the execution input.
input_data = {"task1": {"bucket": bucket, "key": key}}

# Start the state machine with the arguments embedded in its input.
bsm.sfn_client.start_execution(stateMachineArn=sfn_arn, input=json.dumps(input_data))
lbd4_sfn_exec_arn.py
1# -*- coding: utf-8 -*-
2
3from pprint import pprint
4import json
5import boto3
6import base64
7
8s3_client = boto3.client("s3")
9
10
def main(bucket: str, key: str) -> int:
    """Return the size in bytes of the S3 object at ``bucket`` / ``key``.

    A HEAD request is used because only the object's metadata is needed;
    ``get_object`` would additionally open a streaming download of the
    body that is never read or closed here.

    :param bucket: name of the S3 bucket that holds the object.
    :param key: key of the object inside the bucket.
    :return: the ``ContentLength`` value reported by S3.

    Errors raised by the S3 client propagate to the caller.
    """
    return s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
13
14
def lambda_handler(event: dict, context):
    """Lambda entry point that loads its arguments from S3 by execution ARN.

    The event only carries ``"exec_arn"``. The ARN is base64-encoded to
    form the file name of a JSON document stored under
    ``projects/tmp/sfn-exec/`` in a fixed bucket; the ``"task1"`` entry of
    that document holds the keyword arguments for ``main``.

    :param event: invocation payload; must contain ``"exec_arn"``.
    :param context: Lambda context object (unused).
    :return: whatever ``main`` returns for the arguments stored in S3.
    """
    print("----- event -----")
    pprint(event)

    # The object name is the base64 text of the execution ARN.
    encoded_arn = base64.b64encode(event["exec_arn"].encode("utf-8")).decode("utf-8")
    response = s3_client.get_object(
        Bucket="bmt-app-dev-us-east-1-data",
        Key=f"projects/tmp/sfn-exec/{encoded_arn}.json",
    )
    exec_data = json.loads(response["Body"].read().decode("utf-8"))

    print("----- data -----")
    pprint(exec_data)

    return main(**exec_data["task1"])
lbd4_sfn_exec_arn_sfn_def.json
1{
2 "Comment": "A description of my state machine",
3 "StartAt": "Lambda Invoke",
4 "States": {
5 "Lambda Invoke": {
6 "Type": "Task",
7 "Resource": "arn:aws:states:::lambda:invoke",
8 "OutputPath": "$.Payload",
9 "Parameters": {
10 "FunctionName": "arn:aws:lambda:us-east-1:878625312159:function:sfn-poc-lbd-1:$LATEST",
11 "Payload": {
12 "exec_arn.$": "$$.Execution.Id"
13 }
14 },
15 "Retry": [
16 {
17 "ErrorEquals": [
18 "Lambda.ServiceException",
19 "Lambda.AWSLambdaException",
20 "Lambda.SdkClientException",
21 "Lambda.TooManyRequestsException"
22 ],
23 "IntervalSeconds": 1,
24 "MaxAttempts": 3,
25 "BackoffRate": 2
26 }
27 ],
28 "End": true
29 }
30 }
31}
lbd4_sfn_exec_arn_test.py
1# -*- coding: utf-8 -*-
2
3import base64
4import uuid
5import json
6from boto_session_manager import BotoSesManager
7
# Session for the dev profile; region, account id and account alias below
# are all read from it.
bsm = BotoSesManager(profile_name="bmt_app_dev_us_east_1")
sfn_arn = f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:stateMachine:sfn-poc"
lbd_arn = f"arn:aws:lambda:{bsm.aws_region}:{bsm.aws_account_id}:function:sfn-poc-lbd-1"

# The S3 object whose size is being requested.
bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
key = "projects/tmp/fw2-1.pdf"
input_data = {"task1": {"bucket": bucket, "key": key}}

# Choose the execution name up front so the execution ARN -- and therefore
# the S3 key of the execution data -- is known before the execution starts.
exec_name = str(uuid.uuid4())
exec_arn = f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:execution:sfn-poc:{exec_name}"
filename = base64.b64encode(exec_arn.encode("utf-8")).decode("utf-8")
key_sfn_exec = f"projects/tmp/sfn-exec/{filename}.json"

# Upload the execution data before starting the execution.
bsm.s3_client.put_object(Bucket=bucket, Key=key_sfn_exec, Body=json.dumps(input_data))

bsm.sfn_client.start_execution(
    stateMachineArn=sfn_arn,
    input=json.dumps(input_data),
    name=exec_name,
)
lbd5_universal.py
1# -*- coding: utf-8 -*-
2
3from pprint import pprint
4import json
5import boto3
6import base64
7
8s3_client = boto3.client("s3")
9
10
def main(bucket: str, key: str) -> int:
    """Return the size in bytes of the S3 object at ``bucket`` / ``key``.

    A HEAD request is used because only the object's metadata is needed;
    ``get_object`` would additionally open a streaming download of the
    body that is never read or closed here.

    :param bucket: name of the S3 bucket that holds the object.
    :param key: key of the object inside the bucket.
    :return: the ``ContentLength`` value reported by S3.

    Errors raised by the S3 client propagate to the caller.
    """
    return s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
13
14
def lambda_handler(event: dict, context):
    """Lambda entry point that accepts three different event shapes.

    The shapes are tried in this order:

    1. S3 event notification -- bucket and key come from the first record.
    2. ``{"exec_arn": ...}`` -- the arguments are loaded from a JSON
       document in S3 whose name is the base64 text of the execution ARN.
    3. Anything else -- the event is passed to ``main`` as keyword
       arguments (request/response style).

    :param event: invocation payload in one of the shapes above.
    :param context: Lambda context object (unused).
    :return: whatever ``main`` returns for the resolved bucket and key.

    Errors raised by ``main`` or the S3 client always propagate; only a
    failure to *parse* the event as an S3 notification falls through to
    the next shape.
    """
    # Local import keeps this handler independent of the module's import block.
    from urllib.parse import unquote_plus

    print("----- event -----")
    pprint(event)

    # event triggered
    # Keep the try body limited to the shape probe: wrapping the main() call
    # as well (as a bare ``except`` used to) would hide real S3 failures
    # and surface them later as a misleading TypeError from main(**event).
    try:
        s3_info = event["Records"][0]["s3"]
        bucket = s3_info["bucket"]["name"]
        key = s3_info["object"]["key"]
    except (KeyError, IndexError, TypeError):
        pass  # not an S3 notification; try the other invocation styles
    else:
        print("event invocation")
        # S3 URL-encodes object keys inside notifications.
        return main(bucket, unquote_plus(key))

    # step function invocation
    # get input arg from s3
    if "exec_arn" in event:
        exec_arn = event["exec_arn"]
        bucket = "bmt-app-dev-us-east-1-data"

        filename = base64.b64encode(exec_arn.encode("utf-8")).decode("utf-8")
        key = f"projects/tmp/sfn-exec/{filename}.json"
        res = s3_client.get_object(Bucket=bucket, Key=key)
        data = json.loads(res["Body"].read().decode("utf-8"))
        print("----- data -----")
        pprint(data)
        params = data["task1"]
        return main(**params)

    # request, response styled invocation
    return main(**event)
lbd5_universal_test.py
1# -*- coding: utf-8 -*-
2
3import base64
4import uuid
5import json
6from boto_session_manager import BotoSesManager
7
8bsm = BotoSesManager(profile_name="bmt_app_dev_us_east_1")
9sfn_arn = f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:stateMachine:sfn-poc"
10lbd_arn = f"arn:aws:lambda:{bsm.aws_region}:{bsm.aws_account_id}:function:sfn-poc-lbd-1"
11bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
12key = "projects/tmp/fw2-1.pdf"
13
14
def test_lbd1_request_response():
    """Invoke the Lambda synchronously with bucket/key and print its reply."""
    payload = json.dumps({"bucket": bucket, "key": key})
    response = bsm.lambda_client.invoke(
        FunctionName=lbd_arn,
        InvocationType="RequestResponse",
        Payload=payload,
    )
    # The response payload is a stream; read and decode it for display.
    body = response["Payload"].read()
    print(body.decode("utf-8"))
22
23
def test_lbd3_sfn_input_object():
    """Start the state machine with the task arguments embedded in its input."""
    data_bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
    task1_args = {"bucket": data_bucket, "key": "projects/tmp/fw2-1.pdf"}

    # The arguments are nested under "task1" in the execution input.
    execution_input = json.dumps({"task1": task1_args})
    bsm.sfn_client.start_execution(stateMachineArn=sfn_arn, input=execution_input)
38
39
def test_lbd4_sfn_exec_arn():
    """Upload the execution data to S3, then start the matching execution.

    The execution name is generated up front so the execution ARN, and
    from it the S3 key of the data file, is known before the start call.
    """
    serialized = json.dumps({"task1": {"bucket": bucket, "key": key}})

    exec_name = str(uuid.uuid4())
    exec_arn = f"arn:aws:states:{bsm.aws_region}:{bsm.aws_account_id}:execution:sfn-poc:{exec_name}"
    encoded_arn = base64.b64encode(exec_arn.encode("utf-8")).decode("utf-8")

    # Write the data file first, then start the execution.
    bsm.s3_client.put_object(
        Bucket=bucket,
        Key=f"projects/tmp/sfn-exec/{encoded_arn}.json",
        Body=serialized,
    )
    bsm.sfn_client.start_execution(
        stateMachineArn=sfn_arn,
        input=serialized,
        name=exec_name,
    )
62
63
# Run the three scenarios only when executed as a script, so that merely
# importing this module (for example during test collection) does not fire
# live AWS calls.
if __name__ == "__main__":
    test_lbd1_request_response()
    test_lbd3_sfn_input_object()
    test_lbd4_sfn_exec_arn()