AWS Step Functions - Work with Lambda and Glue#
Keywords: AWS Step Function, StepFunction, SFN, State Machine, StateMachine, Lambda, Glue
StepFunction 本身作为一款 Serverless 的编排工具, 用来管理同样是 Serverless 的 Lambda Function 和 Glue Job 是再合适不过的了. 本文将用一种比较简单的应用场景为例, 介绍如何使用 StepFunction 来编排 Lambda 和 Glue Job. 在这个简单的例子中, 所有的数据传递都是静态的, 不存在说进行中间态的数据处理的情况. 显然这个例子不适用于所有的情况, 但是静态的场景比动态场景要简单, 掌握了静态场景的情况有助于你学习如何处理动态场景.
理解我们的需求#
在我们的工作流中, 我们有多个 Lambda 和 Glue Job. 它们之间互相都是独立的, 并不需要互相之间传递数据. 但是可能会有先后顺序的依赖关系. 换言之, 我们只有先运行一个, 再运行一个的需求, 而没有说后面的那个需要前面的结果作为参数的情况.
Task 1 - Glue Job#
我们假设 task1 是一个 Glue Job, 它的 argument 是一个 key value pairs 的字典, 例如:
{
"parameter1": "value1",
"parameter2": "value2"
}
在写 Glue Job Script 的时候, 最重要的的是下面这段代码, 它能从 spark-submit 命令中读你的参数. 注意你的参数名最好都用下划线, 虽然 hyphen 也可以, 但是不推荐.
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(
sys.argv,
[
"JOB_NAME",
"parameter1",
"parameter2",
],
)
完整的 Glue Job 代码如下, 它的逻辑是仅仅打印输入参数:
1import sys
2from awsglue.transforms import *
3from awsglue.utils import getResolvedOptions
4from pyspark.context import SparkContext
5from awsglue.context import GlueContext
6from awsglue.job import Job
7
8sc = SparkContext.getOrCreate()
9glueContext = GlueContext(sc)
10spark = glueContext.spark_session
11job = Job(glueContext)
12args = getResolvedOptions(
13 sys.argv,
14 [
15 "JOB_NAME",
16 "parameter1",
17 "parameter2",
18 ],
19)
20print("------ Arguments ------")
21print(args)
22job.commit()
Task 2 - Lambda Function#
我们假设 task2 是一个 Lambda Function, 它的 argument 是任意的 key value pairs 的字典, 例如:
{"name": "alice"}
完整的 Lambda Function 代码如下, 它的逻辑是仅仅打印输入参数:
1import json
2
3def lambda_handler(event, context):
4 print("event:")
5 print(json.dumps(event))
6 return event
Step Function 的输入参数#
我们假设我们的 Workflow 是先 async 运行 Glue Job, 然后直接运行 Lambda Function. 那么 StepFunction 的输入参数应该是:
{
"task1": {
"--parameter1": "value1",
"--parameter2": "value2"
},
"task2": {"name": "task2"}
}
注意, Glue Job 的底层是 spark-submit 命令, 所有的入参都是命令行参数. 所以你要在你的参数前面加 --.
Step Function Definition#
Step Function 的源码中, 最重要的部分是如何把上一节中的输入参数传递给 Glue Job 和 Lambda Function.
其中给 Glue Job 传递参数时最关键的一步是:
"Task1": {
...
"Parameters": {
"JobName": "test",
"Arguments.$": "$$.Execution.Input.task1"
},
...
},
其中给 Lambda Function 传递参数时最关键的一步是:
"Task2": {
...
"Parameters": {
...
"Payload.$": "$$.Execution.Input.task2"
},
...
}
最终的 Step Function Definition 源码如下:
1{
2 "Comment": "A description of my state machine",
3 "StartAt": "Task1",
4 "States": {
5 "Task1": {
6 "Type": "Task",
7 "Resource": "arn:aws:states:::glue:startJobRun",
8 "Parameters": {
9 "JobName": "test",
10 "Arguments.$": "$$.Execution.Input.task1"
11 },
12 "Next": "Task2"
13 },
14 "Task2": {
15 "Type": "Task",
16 "Resource": "arn:aws:states:::lambda:invoke",
17 "OutputPath": "$.Payload",
18 "Parameters": {
19 "FunctionName": "arn:aws:lambda:us-east-1:111122223333:function:test:$LATEST",
20 "Payload.$": "$$.Execution.Input.task2"
21 },
22 "Retry": [
23 {
24 "ErrorEquals": [
25 "Lambda.ServiceException",
26 "Lambda.AWSLambdaException",
27 "Lambda.SdkClientException",
28 "Lambda.TooManyRequestsException"
29 ],
30 "IntervalSeconds": 2,
31 "MaxAttempts": 6,
32 "BackoffRate": 2
33 }
34 ],
35 "End": true
36 }
37 }
38}