Work with Lambda and Glue#
Keywords: AWS Step Function, StepFunction, SFN, State Machine, StateMachine, Lambda, Glue
StepFunction 本身作为一款 Serverless 的编排工具, 用来管理同样是 Serverless 的 Lambda Function 和 Glue Job 是再合适不过的了. 本文将用一种比较简单的应用场景为例, 介绍如何使用 StepFunction 来编排 Lambda 和 Glue Job. 在这个简单的例子中, 所有的数据传递都是静态的, 不存在说进行中间态的数据处理的情况. 显然这个例子不适用于所有的情况, 但是静态的场景比动态场景要简单, 掌握了静态场景的情况有助于你学习如何处理动态场景.
理解我们的需求#
在我们的工作流中, 我们有多个 Lambda 和 Glue Job. 它们之间互相都是独立的, 并不需要互相之间传递数据. 但是可能会有先后顺序的依赖关系. 换言之, 我们只有先运行一个, 再运行一个的需求, 而没有说后面的那个需要前面的结果作为参数的情况.
Task 1 - Glue Job#
我们假设 task1 是一个 Glue Job, 它的 argument 是一个 key value pairs 的字典, 例如:
{
"parameter1": "value1",
"parameter2": "value2"
}
在写 Glue Job Script 的时候, 最重要的的是下面这段代码, 它能从 spark-submit
命令中读你的参数. 注意你的参数名最好都用下划线, 虽然 hyphen 也可以, 但是不推荐.
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(
sys.argv,
[
"JOB_NAME",
"parameter1",
"parameter2",
],
)
完整的 Glue Job 代码如下, 它的逻辑是仅仅打印输入参数:
1import sys
2from awsglue.transforms import *
3from awsglue.utils import getResolvedOptions
4from pyspark.context import SparkContext
5from awsglue.context import GlueContext
6from awsglue.job import Job
7
8sc = SparkContext.getOrCreate()
9glueContext = GlueContext(sc)
10spark = glueContext.spark_session
11job = Job(glueContext)
12args = getResolvedOptions(
13 sys.argv,
14 [
15 "JOB_NAME",
16 "parameter1",
17 "parameter2",
18 ],
19)
20print("------ Arguments ------")
21print(args)
22job.commit()
Task 2 - Lambda Function#
我们假设 task2 是一个 Lambda Function, 它的 argument 是任意的 key value pairs 的字典, 例如:
{"name": "alice"}
完整的 Lambda Function 代码如下, 它的逻辑是仅仅打印输入参数:
1import json
2
3def lambda_handler(event, context):
4 print("event:")
5 print(json.dumps(event))
6 return event
Step Function 的输入参数#
我们假设我们的 Workflow 是先 async 运行 Glue Job, 然后直接运行 Lambda Function. 那么 StepFunction 的输入参数应该是:
{
"task1": {
"--parameter1": "value1",
"--parameter2": "value2"
},
"task2": {"name": "task2"}
}
注意, Glue Job 的底层是 spark-submit
命令, 所有的入参都是命令行参数. 所以你要在你的参数前面加 --
.
Step Function Definition#
Step Function 的源码中, 最重要的部分是如何把上一节中的输入参数传递给 Glue Job 和 Lambda Function.
其中给 Glue Job 传递参数时最关键的一步是:
"Task1": {
...
"Parameters": {
"JobName": "test",
"Arguments.$": "$$.Execution.Input.task1"
},
...
},
其中给 Lambda Function 传递参数时最关键的一步是:
"Task2": {
...
"Parameters": {
...
"Payload.$": "$$.Execution.Input.task2"
},
...
}
最终的 Step Function Definition 源码如下: