Run Remote Command on Ec2 Ultimate Solution#

Keywords: AWS, EC2, System, Systems, Manager, SSM, Python, Remote, Command

我们想要做什么#

出于多种原因, 往往是网络相关的原因, 很多时候代码必须要在 EC2 环境内执行. 而作为开发者, 如何像在本地电脑上执行 Python 自动化脚本一样在 EC2 环境内执行命令呢? 如果能做到这一点, 想象空间可以是无限的. 下面我们详细的拆解一下需求:

从具体执行的命令复杂度来看, 可以分为两类:

  1. 单条在 Terminal 内的命令. 例如 aws s3 ls.

  2. 以 Python 脚本形式存在的命令, 具体的命令的逻辑在 Python 脚本中被定义的. 这个脚本并不是事先准备好的, 换言之, 在执行脚本前我们要现将脚本上传到 EC2 环境内.

从对反馈的要求来看, 可以分为三类:

  1. 我只需要执行, 不需要反馈.

  2. 我需要知道执行的返回码是成功 (0) 还是失败 (非 0).

  3. 我不仅需要知道执行状态, 这个命令可能还会返回一些数据, 我还需要知道这个数据.

从命令的发起者来看,

  1. 只需要我的开发电脑能发起命令即可.

  2. 这个命令需要能被任何有权限的地方发起, 例如另一台 EC2, 一个 Lambda.

可以看出, 以上需求可以排列组合, 从而出现 2 * 3 * 2 = 12 种情况. 有没有一种解决方案能够同时满足这 12 种情况呢? 答案是肯定的, 我们将在下面的章节中详细的介绍.

探索可能的解决方案#

我们对上面的需求来一条一条的分析, 看看这些需求后面的本质.

  • 单条在 Terminal 内的命令. 例如 aws s3 ls.

    这个没什么说的, 就是一条远程命令.

  • 以 Python 脚本形式存在的命令, 具体的命令的逻辑在 Python 脚本中被定义的. 这个脚本并不是事先准备好的, 换言之, 在执行脚本前我们要现将脚本上传到 EC2 环境内.

    这就意味着我们总得有一个简单, 可重复, 安全的方法将任意脚本上传到 EC2 环境内.

  • 我只需要执行, 不需要反馈

    这个没什么说的, 简单执行即可.

  • 我需要知道执行的返回码是成功 (0) 还是失败 (非 0)

    这就需要我们能捕获错误码 (return code)

  • 我不仅需要知道执行状态, 这个命令可能还会返回一些数据, 我还需要知道这个数据

    要么这个命令本身的设计就是会把返回数据写到 stdout, 那么我们只要能捕获 stdout 即可. 要么在运行时将数据上传到一个中间媒介, 例如 S3, 然后我们再从 S3 读取数据.

  • 只需要我的开发电脑能发起命令即可

    要么我的电脑能 SSH 到 EC2 上去. 要么我的电脑有一些相关的 AWS 权限. 这里的权限主要指的是 AWS System Manager Run Command 的权限. 这是一个 AWS 托管的服务器, 可以利用 SSM Agent 在 EC2 上执行任何命令.

  • 这个命令需要能被任何有权限的地方发起, 例如另一台 EC2, 一个 Lambda.

    这个发起方只要有上面说的 AWS System Manager Run Command 权限即可. 当然开发电脑也可以有这个权限.

好了, 我们现在对解决每一条需求都有个大概的概念了, 下一步我们来将这些方案组合成一个完整的解决方案. 但在这之前, 我们先来了解一下这里的核心技术 AWS SSM Run Command.

AWS SSM Run Command#

AWS System Manager 是一个历史悠久的 AWS 服务, 主要用于批量管理 EC2 instance 虚拟机. 你可以将其理解为 AWS 版本的 Ansible. 而它的核心组件就是 System Manager Agent (SSM Agent), 本质上是一个服务端软件, 安装在 EC2 机器上, 属于系统服务的一部分. 而 AWS 内部对 EC2 的管理工作很多都是通过 SSM Agent 来进行的. 而”Run Command” 则是 SSM 的一项功能, 可以通过 SSM Agent 执行远程命令.

简单来说我们选择 SSM Run Command 作为我们解决方案的核心技术是出于以下几点考量:

  • SSM Run Command 是受 IAM Role 权限保护的, 非常安全且灵活, 兼容于各种 AWS 服务, 使得我们可以在任何 AWS 服务内发起 SSM Run Command.

  • SSM Run Command 功能 免费, 且支持非常高的并发量.

  • SSM Run Command 可以捕获 Return Code, Stdout, Stderr, 使得我们可以满足上面的所有需求.

SSM Run Command 本身有一些限制.

  • 通过 API 发送的 Run Command 也是有限制的, 不能超过 100KB. 如果你需要发送大量数据, 那么你需要修改你的远程命令程序, 让它接受 S3 uri 为参数, 然后到 S3 uri 去读输入数据.

  • Stdout 是有大小限制的, API 最多显示 24000 个字符. 如果需要捕获大量数据, 那么你需要修改你的远程命令程序, 将结果保存在 S3 上.

这里我不详细展开说 SSM Run Command 这个功能, 建议先看看一下 Run Remote Command on EC2 via SSM 这边博文, 对其有个简单的了解

最终解决方案#

  1. 对于运行单条 Terminal Command, 就直接用 SSM Run Command 即可.

  2. 对于运行复杂的 Python 脚本呢, 我们可以将在本地的 Python 脚本先上传到 S3, 然后用 Run Command 运行第一条命令 aws s3 cp s3://... /tmp/...script.py 将其下载到 EC2 上, 然后再指定 Python 解释器来执行该脚本. 如果该脚本是个命令行工具, 我们还能带上参数. 注意, 我们要确保这个 EC2 上预装了 aws cli.

  3. 如果我们需要捕获命令返回的结果, 那么我们要么自己能保证这条命令能在 Stdout 中返回一个结构化的数据 (注意, logging 可能会干扰到返回值), 例如 JSON, 要么能运行过程中的数据上传到 S3. 然后我们再从 S3 读取数据.

实际案例#

script.py 这是我们想要在 EC2 上执行的命令. 我们会在后面的脚本中将其上传到 S3, 然后在 EC2 上下载并执行.

 1# -*- coding: utf-8 -*-
 2
 3"""
 4一个需要在 EC2 上运行的脚本, 它会打印一些包含特殊符号的字符串的 JSON 到 stdout.
 5"""
 6
 7import sys
 8import json
 9
10
11def run() -> dict:
12    print("start")
13    print("done")
14    return {
15        "python": sys.executable,
16        "weird_string": "\\a\nb\tc\"d'e@f#g:h/i"
17    }
18
19
20if __name__ == "__main__":
21    print(json.dumps(run()))

ssm_remote_command_helpers.py 这是一个库, 能让我们方便的调用 run command 命令

  1# -*- coding: utf-8 -*-
  2
  3"""
  4This module allow you to run remote command on EC2 instance via SSM in 'sync' mode.
  5The original ssm_client.send_command() is 'async' call, which means you have to
  6poll the status of the command execution via ssm_client.get_command_invocation().
  7This module hides the complexity of polling and provide a simple interface.
  8
  9Requirements:
 10
 11    func_args>=0.1.1,<1.0.0
 12
 13.. _send_command: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/send_command.html
 14.. _get_command_invocation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/get_command_invocation.html
 15"""
 16
 17import typing as T
 18import sys
 19import enum
 20import time
 21import itertools
 22import dataclasses
 23
 24from func_args import resolve_kwargs, NOTHING
 25
 26if T.TYPE_CHECKING:
 27    from mypy_boto3_ssm.client import SSMClient  # pip install "boto3_stubs[ssm]"
 28
 29
 30class Waiter:
 31    """
 32    Simple retry / polling with progressing status. Usage, it is common to check
 33    if a long-running job is done every X seconds and timeout in Y seconds.
 34    This class allow you to customize the polling interval and timeout,.
 35
 36    Example:
 37
 38    .. code-block:: python
 39
 40        print("before waiter")
 41
 42        for attempt, elapse in Waiter(
 43            delays=1,
 44            timeout=10,
 45            verbose=True,
 46        ):
 47            # check if should jump out of the polling loop
 48            if elapse >= 5:
 49                print("")
 50                break
 51
 52        print("after waiter")
 53    """
 54
 55    def __init__(
 56        self,
 57        delays: T.Union[int, float],
 58        timeout: T.Union[int, float],
 59        indent: int = 0,
 60        verbose: bool = True,
 61    ):
 62        self._delays = delays
 63        self.delays = itertools.repeat(delays)
 64        self.timeout = timeout
 65        self.tab = " " * indent
 66        self.verbose = verbose
 67
 68    def __iter__(self):
 69        if self.verbose:  # pragma: no cover
 70            sys.stdout.write(
 71                f"start waiter, polling every {self._delays} seconds, "
 72                f"timeout in {self.timeout} seconds.\n"
 73            )
 74            sys.stdout.flush()
 75            sys.stdout.write(
 76                f"\r{self.tab}on 0 th attempt, "
 77                f"elapsed 0 seconds, "
 78                f"remain {self.timeout} seconds ..."
 79            )
 80            sys.stdout.flush()
 81        start = time.time()
 82        end = start + self.timeout
 83        yield 0, 0
 84        for attempt, delay in enumerate(self.delays, 1):
 85            now = time.time()
 86            remaining = end - now
 87            if remaining < 0:
 88                raise TimeoutError(f"timed out in {self.timeout} seconds!")
 89            else:
 90                time.sleep(min(delay, remaining))
 91                elapsed = int(now - start + delay)
 92                if self.verbose:  # pragma: no cover
 93                    sys.stdout.write(
 94                        f"\r{self.tab}on {attempt} th attempt, "
 95                        f"elapsed {elapsed} seconds, "
 96                        f"remain {self.timeout - elapsed} seconds ..."
 97                    )
 98                    sys.stdout.flush()
 99                yield attempt, int(elapsed)
100
101
102def send_command(
103    ssm_client: "SSMClient",
104    instance_id: str,
105    commands: T.List[str],
106    comment: str = NOTHING,
107    output_s3_bucket_name: str = NOTHING,
108    output_s3_key_prefix: str = NOTHING,
109) -> str:
110    """
111    A simple wrapper of ``ssm_client.send_command``, execute sequence of commands
112    to one EC2 instance.
113
114    Reference:
115
116    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/send_command.html
117    """
118    res = ssm_client.send_command(
119        **resolve_kwargs(
120            InstanceIds=[
121                instance_id,
122            ],
123            DocumentName="AWS-RunShellScript",
124            DocumentVersion="1",
125            Parameters={"commands": commands},
126            Comment=comment,
127            OutputS3BucketName=output_s3_bucket_name,
128            OutputS3KeyPrefix=output_s3_key_prefix,
129        )
130    )
131    command_id = res["Command"]["CommandId"]
132    return command_id
133
134
135class CommandInvocationStatusEnum(str, enum.Enum):
136    """
137    Reference:
138
139    - get_command_invocation_
140    """
141
142    Pending = "Pending"
143    InProgress = "InProgress"
144    Delayed = "Delayed"
145    Success = "Success"
146    Cancelled = "Cancelled"
147    TimedOut = "TimedOut"
148    Failed = "Failed"
149    Cancelling = "Cancelling"
150
151
152@dataclasses.dataclass
153class CommandInvocation:
154    """
155    Represents a Command Invocation details returned from a
156    get_command_invocation_ API call.
157    """
158
159    CommandId: T.Optional[str] = dataclasses.field(default=None)
160    InstanceId: T.Optional[str] = dataclasses.field(default=None)
161    Comment: T.Optional[str] = dataclasses.field(default=None)
162    DocumentName: T.Optional[str] = dataclasses.field(default=None)
163    DocumentVersion: T.Optional[str] = dataclasses.field(default=None)
164    PluginName: T.Optional[str] = dataclasses.field(default=None)
165    ResponseCode: T.Optional[int] = dataclasses.field(default=None)
166    ExecutionStartDateTime: T.Optional[str] = dataclasses.field(default=None)
167    ExecutionElapsedTime: T.Optional[str] = dataclasses.field(default=None)
168    ExecutionEndDateTime: T.Optional[str] = dataclasses.field(default=None)
169    Status: T.Optional[str] = dataclasses.field(default=None)
170    StatusDetails: T.Optional[str] = dataclasses.field(default=None)
171    StandardOutputContent: T.Optional[str] = dataclasses.field(default=None)
172    StandardOutputUrl: T.Optional[str] = dataclasses.field(default=None)
173    StandardErrorContent: T.Optional[str] = dataclasses.field(default=None)
174    StandardErrorUrl: T.Optional[str] = dataclasses.field(default=None)
175    CloudWatchOutputConfig: T.Optional[dict] = dataclasses.field(default=None)
176
177    @classmethod
178    def from_get_command_invocation_response(
179        cls,
180        response: dict,
181    ) -> "CommandInvocation":
182        """
183        Reference:
184
185        - get_command_invocation_
186        """
187        kwargs = {
188            field.name: response.get(field.name) for field in dataclasses.fields(cls)
189        }
190        return cls(**kwargs)
191
192    @classmethod
193    def get(
194        cls,
195        ssm_client: "SSMClient",
196        command_id: str,
197        instance_id: str,
198    ) -> "CommandInvocation":
199        """
200        A wrapper around get_command_invocation_ API call.
201
202        Reference:
203
204        - get_command_invocation_
205        """
206        response = ssm_client.get_command_invocation(
207            CommandId=command_id,
208            InstanceId=instance_id,
209        )
210        return cls.from_get_command_invocation_response(response)
211
212    def to_dict(self) -> dict:
213        return dataclasses.asdict(self)
214
215
216def wait_until_command_succeeded(
217    ssm_client: "SSMClient",
218    command_id: str,
219    instance_id: str,
220    delays: int = 3,
221    timeout: int = 60,
222    verbose: bool = True,
223) -> CommandInvocation:
224    """
225    After you call send_command_ API, you can use this function to wait until
226    it succeeds. If it fails, it will raise an exception.
227
228    Reference:
229
230    - get_command_invocation_
231    """
232    for _ in Waiter(delays=delays, timeout=timeout, verbose=verbose):
233        command_invocation = CommandInvocation.get(
234            ssm_client=ssm_client,
235            command_id=command_id,
236            instance_id=instance_id,
237        )
238        if command_invocation.Status == CommandInvocationStatusEnum.Success.value:
239            sys.stdout.write("\n")
240            return command_invocation
241        elif command_invocation.Status in [
242            CommandInvocationStatusEnum.Cancelled.value,
243            CommandInvocationStatusEnum.TimedOut.value,
244            CommandInvocationStatusEnum.Failed.value,
245            CommandInvocationStatusEnum.Cancelling.value,
246        ]:
247            raise Exception(f"Command failed, status: {command_invocation.Status}")
248        else:
249            pass

example.py 这是我们的最终代码, 实现了我们的解决方案.

  1# -*- coding: utf-8 -*-
  2
  3"""
  4Requirements::
  5
  6    pathlib_mate>=1.2.1,<2.0.0
  7    s3pathlib>=2.0.1,<3.0.0
  8    boto_session_manager>=1.5.1,<2.0.0
  9"""
 10
 11import typing as T
 12import time
 13import json
 14import uuid
 15
 16from pathlib_mate import Path
 17from s3pathlib import S3Path
 18from boto_session_manager import BotoSesManager
 19from rich import print as rprint
 20
 21# 从 ssm_remote_command_helpers.py 中导入我们需要的函数
 22from ssm_remote_command_helpers import (
 23    send_command,
 24    wait_until_command_succeeded,
 25)
 26
 27
 28def run(
 29    bsm: BotoSesManager,
 30    instance_id: str,
 31    path_python: Path,
 32    code: str,
 33    s3_path: S3Path,
 34    args: T.List[str],
 35):
 36    """
 37    这是我们解决方案的主函数, 对 ssm_remote_command_helpers.py 中的函数进行二次封装,
 38    它能自动将脚本通过 S3 上传到 EC2 上执行.
 39
 40    :param bsm: boto session manager 对象
 41    :param instance_id: EC2 instance id
 42    :param path_python: 位于 EC2 上的 Python 解释器路径, 你可以选择用哪个 Python 解释器来运行这个命令
 43    :param code: 你要在 EC2 上执行的脚本的源代码的字符串
 44    :param s3_path: 你要将这个源代码上传到 S3 的哪里
 45    :param args: 这个 Python 脚本有没有额外的参数, 如果有, 请用列表的形式列出来, 就像你
 46        写 subprocess.run([...]) 一样.
 47    """
 48    s3path.write_text(code)
 49
 50    # 生成一个随机的路径, 用于存放代码
 51    path_code = f"/tmp/{uuid.uuid4().hex}.py"
 52    # 用 aws cli 将代码下载到本地, 并且过滤掉日志
 53    command1 = f"/home/ubuntu/.pyenv/shims/aws s3 cp {s3_path.uri} {path_code} 2>&1 > /dev/null"
 54    # 组装最终命令
 55    args_ = [
 56        f"{path_python}",
 57        f"{path_code}",
 58    ]
 59    args_.extend(args)
 60    command2 = " ".join(args_)
 61    print(command1)
 62    print(command2)
 63    # 用 SSM 远程执行该命令
 64    command_id = send_command(
 65        ssm_client=bsm.ssm_client,
 66        instance_id=instance_id,
 67        commands=[
 68            command1,
 69            command2,
 70        ],
 71    )
 72    # 然后等待命令执行完毕
 73    time.sleep(1)  # 一定要等待 1 秒, 不然你立刻 get 是 get 不到的
 74    command_invocation = wait_until_command_succeeded(
 75        ssm_client=bsm.ssm_client,
 76        command_id=command_id,
 77        instance_id=instance_id,
 78    )
 79    rprint(command_invocation)
 80    # 解析 return code 和 standard output, parse 我们脚本输出的 JSON
 81    print(command_invocation.ResponseCode)
 82    lines = command_invocation.StandardOutputContent.splitlines()
 83    output_data = json.loads(lines[-1])
 84    rprint(output_data)
 85
 86
 87if __name__ == "__main__":
 88    bsm = BotoSesManager(profile_name="bmt_app_dev_us_east_1")
 89    instance_id = "i-00f591fc972902fc5"
 90    path_python = Path("/home/ubuntu/.pyenv/shims/python")
 91    code = Path("script.py").read_text()
 92    s3path = S3Path(
 93        f"s3://{bsm.aws_account_id}-{bsm.aws_region}-data/projects/dev-exp-share/script.py"
 94    )
 95    args = []
 96    run(
 97        bsm=bsm,
 98        instance_id=instance_id,
 99        path_python=path_python,
100        code=code,
101        s3_path=s3path,
102        args=[],
103    )