AWS CDK - StepFunctions Task

Keywords: Amazon, AWS, CDK, Step Function, Task

Overview

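The chain-style API that Airflow uses to define orchestration workload logic is very popular, and AWS Step Functions has long tried to catch up with Airflow on ease of use. AWS has an internal project called WDK (Workflow Development Kit), and by late 2023 its underlying building blocks were available in AWS CDK as the AWS StepFunctions Tasks module (aws_cdk.aws_stepfunctions_tasks). In essence, each Task is a class, much like a Task definition in Airflow. You then connect the Tasks defined in your code with chain syntax such as task1.next(task2).next(task3).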
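To make the chaining idea concrete without depending on CDK, here is a minimal pure-Python sketch of how a fluent .next() builder can work. The classes ToyState and ToyChain are hypothetical and are not the CDK API: they only link states together and render an Amazon States Language (ASL) style dictionary, the JSON format a Step Functions state machine definition is ultimately written in.

```python
"""Toy model of the fluent .next() chaining style (hypothetical, NOT the CDK API)."""
import json
from typing import Dict, Optional


class ToyState:
    """Hypothetical stand-in for a single state such as sfn.Pass."""

    def __init__(self, name: str, state_type: str = "Pass") -> None:
        self.name = name
        self.state_type = state_type
        self.next_state: Optional["ToyState"] = None

    def next(self, other: "ToyState") -> "ToyChain":
        # Start a one-element chain at this state, then extend it with `other`.
        return ToyChain(self, self).next(other)


class ToyChain:
    """Remembers where the chain starts and which state is currently last."""

    def __init__(self, start: ToyState, end: ToyState) -> None:
        self.start = start
        self.end = end

    def next(self, other: ToyState) -> "ToyChain":
        # Link the current tail to `other` and return a chain whose tail is `other`.
        self.end.next_state = other
        return ToyChain(self.start, other)

    def to_asl(self) -> Dict:
        # Walk the linked states and emit an ASL-style {"StartAt", "States"} document.
        states: Dict[str, Dict] = {}
        current: Optional[ToyState] = self.start
        while current is not None:
            state: Dict = {"Type": current.state_type}
            if current.next_state is None:
                state["End"] = True
            else:
                state["Next"] = current.next_state.name
            states[current.name] = state
            current = current.next_state
        return {"StartAt": self.start.name, "States": states}


task1, task2, task3 = ToyState("Task 1"), ToyState("Task 2"), ToyState("Task 3")
definition = task1.next(task2).next(task3)
print(json.dumps(definition.to_asl(), indent=2))
```

The key design point is that every .next() call returns a chain object that remembers both the start and the current tail, so further .next() calls keep appending while the rendered definition still starts at the first state.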

Examples

I created a few examples for getting started quickly with this new API style; see the source code below. Each class is a Stack, and each Stack contains exactly one State Machine.

learn_stepfunctions_task/stacks.py
# -*- coding: utf-8 -*-

import aws_cdk as cdk
import aws_cdk.aws_lambda as aws_lambda
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as sfn_tasks

from constructs import Construct


class MySfnTaskStack1(cdk.Stack):
    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        **kwargs,
    ) -> None:
        """
        Basic example: a state machine with a single Pass state.
        """
        super().__init__(scope, construct_id, **kwargs)

        self.state_machine = sfn.StateMachine(
            self,
            "StateMachine",
            state_machine_name="sfn-task-1",
            # ``definition=`` is deprecated; wrap the chain in a DefinitionBody instead.
            definition_body=sfn.DefinitionBody.from_chainable(
                sfn.Chain.start(sfn.Pass(self, "PassState"))
            ),
        )


class MySfnTaskStack2(cdk.Stack):
    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        **kwargs,
    ) -> None:
        """
        Invoke a Lambda function: Task 1 (Pass) -> Task 2 (LambdaInvoke).
        """
        super().__init__(scope, construct_id, **kwargs)

        task1 = sfn.Pass(self, "Task 1")
        task2 = sfn_tasks.LambdaInvoke(
            self,
            "Task 2",
            # Import an existing function by name. This stack does not create it,
            # so "sfn_tasks_test_1" must already exist in the target account/region.
            lambda_function=aws_lambda.Function.from_function_name(
                self,
                "SfnTaskTest1",
                function_name="sfn_tasks_test_1",
            ),
        )
        # next() returns a Chain, which can be extended with further next() calls.
        definition = task1.next(task2)

        self.state_machine = sfn.StateMachine(
            self,
            "StateMachine",
            state_machine_name="sfn-task-2",
            definition_body=sfn.DefinitionBody.from_chainable(definition),
        )


class MySfnTaskStack3(cdk.Stack):
    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        **kwargs,
    ) -> None:
        """
        Use chain syntax to write the state machine workflow:

        Task 1 -> Task 2 -> Job Complete?
            if: status == "succeeded" -> Job Succeeded Task -> Succeed
            elif: status == "failed" -> Job Failed Task -> Fail
            else: Unknown Status -> Fail
        """
        super().__init__(scope, construct_id, **kwargs)

        # task 1
        task1 = sfn.Pass(self, "Task 1")

        # task 2: emit a hard-coded status; switch the value to exercise other branches
        # ("running" matches neither condition and falls through to otherwise()).
        # With the default ResultPath ("$"), the Pass output is exactly this object.
        # status = "succeeded"
        status = "failed"
        # status = "running"
        task2 = sfn.Pass(self, "Task 2", result=sfn.Result.from_object({"status": status}))

        # choice
        job_complete_choice = sfn.Choice(self, "Job Complete?")

        succeeded_condition = sfn.Condition.string_equals("$.status", "succeeded")
        failed_condition = sfn.Condition.string_equals("$.status", "failed")

        job_succeeded_task = sfn.Pass(self, "Job Succeeded Task")
        job_failed_task = sfn.Pass(self, "Job Failed Task")

        # fmt: off
        definition = (
            task1
            .next(task2)
            .next(
                job_complete_choice
                .when(succeeded_condition, job_succeeded_task.next(sfn.Succeed(self, "Job Succeeded")))
                .when(failed_condition, job_failed_task.next(sfn.Fail(self, "Job Failed")))
                .otherwise(sfn.Fail(self, "Unknown Status"))
            )
        )
        # fmt: on

        self.state_machine = sfn.StateMachine(
            self,
            "StateMachine",
            state_machine_name="sfn-task-3",
            definition_body=sfn.DefinitionBody.from_chainable(definition),
        )
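For reference, the state machine built by MySfnTaskStack3 corresponds roughly to the ASL definition below. This is a hand-written approximation, not CDK's actual synthesized output (which may differ in details), and run() is a hypothetical local helper that models only Pass, Choice with StringEquals on top-level "$.key" paths, Succeed, and Fail. That is just enough to show which branch each status value takes.

```python
"""Hand-written approximation of MySfnTaskStack3 plus a tiny local walker (hypothetical)."""
from typing import Any, Dict, Tuple


def build_definition(status: str) -> Dict[str, Any]:
    # Mirrors Task 1 -> Task 2 -> "Job Complete?" from MySfnTaskStack3.
    return {
        "StartAt": "Task 1",
        "States": {
            "Task 1": {"Type": "Pass", "Next": "Task 2"},
            "Task 2": {"Type": "Pass", "Result": {"status": status}, "Next": "Job Complete?"},
            "Job Complete?": {
                "Type": "Choice",
                "Choices": [
                    {"Variable": "$.status", "StringEquals": "succeeded", "Next": "Job Succeeded Task"},
                    {"Variable": "$.status", "StringEquals": "failed", "Next": "Job Failed Task"},
                ],
                "Default": "Unknown Status",
            },
            "Job Succeeded Task": {"Type": "Pass", "Next": "Job Succeeded"},
            "Job Succeeded": {"Type": "Succeed"},
            "Job Failed Task": {"Type": "Pass", "Next": "Job Failed"},
            "Job Failed": {"Type": "Fail"},
            "Unknown Status": {"Type": "Fail"},
        },
    }


def run(definition: Dict[str, Any], data: Dict[str, Any]) -> Tuple[str, str]:
    """Follow transitions until a Succeed or Fail state; return (state name, state type)."""
    name = definition["StartAt"]
    while True:
        state = definition["States"][name]
        kind = state["Type"]
        if kind in ("Succeed", "Fail"):
            return name, kind
        if kind == "Pass":
            # A Pass state with Result replaces the whole input (default ResultPath "$").
            if "Result" in state:
                data = state["Result"]
            name = state["Next"]
        elif kind == "Choice":
            # Take the first matching rule, otherwise the Default branch.
            name = state["Default"]
            for rule in state["Choices"]:
                key = rule["Variable"][len("$."):]
                if data.get(key) == rule["StringEquals"]:
                    name = rule["Next"]
                    break
        else:
            raise ValueError(f"unsupported state type: {kind}")


for status in ("succeeded", "failed", "running"):
    print(status, "->", run(build_definition(status), {}))
```

Running it shows "succeeded" ending in the Job Succeeded state, "failed" ending in Job Failed, and "running" matching neither condition, so it falls through to the otherwise() branch, Unknown Status.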

Reference