Run Remote Command on EC2 via SSM#

Keywords: AWS, Amazon, System, Systems, SSM

Overview#

在服务器上执行命令是一个非常普遍的需求. 通常我们有这么几种方法:

  • SSH 登录服务器, 然后在终端里敲命令.

  • 用远程执行工具, 例如 paramiko. 你需要管理好 SSH.

  • 用 Ansible 一类的自动化工具.

AWS 原生的 System Manager 服务可以用来来执行远程命令. 这种方法的好处有很多:

  • 无需管理 SSH.

  • 使用 IAM Role 权限管理, 非常安全.

  • 自动化程度高, 可以被嵌入或者编排成各种复杂的脚本.

  • 可以和 AWS 的其他服务联动.

本文我们就来看看如何用 AWS 的 System Manager 来执行远程命令.

How it Work#

AWS 有一个历史悠久的服务 SSM (System Manager), 该服务对标的是 Ansible 之类的服务器运维工具, 用于批量管理虚拟机. 和 Ansible 用 SSH 来执行远程命令的方式不同, SSM 是通过在机器上安装 SSM Agent (一个由 AWS 维护的系统服务软件), 然后让 SSM Agent 将自己自动注册到 SSM Fleet Manager, 然后通过 IAM 鉴权, 然后用 AWS 内部的 API 与 SSM Agent 通信从而执行远程命令.

我们来看一看在启动一台由 SSM 管理的 EC2 的过程中, 到底发生了什么:

  • 启动机器, 启动操作系统以及系统服务, 其中系统服务就包括 SSM agent.

  • SSM gent 启动后就会调用 IAM 的权限, 尝试将自己注册到 SSM Fleet Manager 上.

  • 一旦注册成功, 你就可以用 SSM 来远程操纵 EC2 了.

从以上内容我们可以看出来, 安装 SSM Agent 至关重要. 所幸的事 AWS 官方提供的一些 AMI (主要是 Amazon Linux) 上会预装 SSM Agent. 包括 AWS 认证过的第三方软件提供商例如 RedHat, Ubuntu 等公司提供的 AMI 也会预装 SSM Agent 并开机自动启动. 但是你用的是你自己或是 Market place 上的 AMI, 里面没有预装 SSM Agent, 你就需要自己安装了. 我们这个项目用的是 Ubuntu Server 20.04, 里面已经预装了 SSM Agent, 所以我们无需做任何额外工作.

在你启动 EC2 的时候 (包括启动新的 EC2, 或是 Stop 之后再 Start, 或是 Reboot 都可以, 因为只要启动系统服务就可以了), 只要你的 IAM Role 里有这个 由 AWS 管理的 IAM Policy arn:aws:iam::aws:policy/service-role/AmazonSSMManagedInstanceCore, 或是你创建一个自己的 Policy 有同样的权限, 那么 SSM Agent 就会自动将自己注册到 SSM Fleet Manager. 虽然 Reference 中的官方文档用的 IAM Role 有特定的名字, 但其实什么名字都可以, 只要有对应的权限就可以.

Reference:

Manually Install SSM Agent on EC2#

下面这些文档介绍了如何手动在 EC2 上安装 SSM Agent, 我并没有动手试过, 仅供参考.

一些有用的命令#

你可以用 AWS CLI 来查看哪些 EC2 被注册到了 SSM 管理清单上, 你到 SSM Fleet Manager Console 中看也是一样的:

aws ssm describe-instance-information --output text --profile bmt_app_dev_us_east_1

你也可以 SSH 到 EC2 上运行如下命令来检查 SSM Agent 是否已经启用 (该项目基于 ubuntu server 20.04, 其他系统请参考 官方文档):

sudo systemctl status snap.amazon-ssm-agent.amazon-ssm-agent.service

用 SSM Agent 执行远程命令#

下面这段代码展示了如何用 boto3 SDK 通过 SSM 运行远程命令.

 1# -*- coding: utf-8 -*-
 2
 3import boto3
 4
 5ssm_client = boto3.client("ssm")
 6
 7
 8def send_command(
 9    instance_id: str,
10    cmd: str,
11):
12    ssm_client.send_command(
13        InstanceIds=[
14            instance_id,
15        ],
16        DocumentName="AWS-RunShellScript",
17        DocumentVersion="1",
18        Parameters={
19            "commands": [
20                cmd,
21            ]
22        },
23    )
24
25
26send_command(
27    instance_id="i-1a2b3c4d",
28    cmd="echo 1a2b3c4d > ~/chore",
29)

有了概念之后, 我们来看一个更高级的模块, 适用于生产环境的代码. 与前面的例子不同的是, 它加入了 waiter, 能用 sync 的方式等待 command 完成, 并且 command 的 output 会被写入到 S3 中持久化:

  1# -*- coding: utf-8 -*-
  2
  3"""
  4This module allow you to run remote command on EC2 instance via SSM in 'sync' mode.
  5The original ssm_client.send_command() is 'async' call, which means you have to
  6poll the status of the command execution via ssm_client.get_command_invocation().
  7This module hides the complexity of polling and provide a simple interface.
  8
  9Requirements:
 10
 11    func_args>=0.1.1,<1.0.0
 12
 13Example:
 14
 15.. code-block:: python
 16
 17    import boto3
 18    from s3pathlib import S3Path
 19
 20    instance_id = "i-1a2b3c"
 21    commands = [
 22        "echo hello"
 23    ]
 24    ssm_client = boto3.client("ssm")
 25
 26    # make sure your EC2 has the IAM permission to write to this location
 27    s3dir_command_output = S3Path(f"s3://my-bucket/ssm-command-output/").to_dir()
 28
 29    res = ssm_client.send_command(
 30        InstanceIds=[instance_id],
 31        DocumentName="AWS-RunShellScript",
 32        DocumentVersion="1",
 33        Parameters={
 34            "commands": commands
 35        },
 36        OutputS3BucketName=s3dir_command_output.bucket,
 37        OutputS3KeyPrefix=s3dir_command_output.key,
 38    )
 39    command_id = res["Command"]["CommandId"]
 40
 41    wait_until_command_succeeded(
 42        ssm_client=ssm_client,
 43        command_id=command_id,
 44        instance_id=instance_id,
 45        delays=3,
 46        timeout=60,
 47        verbose=True,
 48    )
 49
 50    for s3path in (
 51        s3dir_command_output.joinpath(
 52            command_id,
 53            instance_id,
 54            "awsrunShellScript",
 55        )
 56        .to_dir()
 57        .iter_objects()
 58    ):
 59        print(f"--- {s3path.uri} ---")
 60        print(f"{s3path.read_text()}")
 61
 62.. _send_command: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/send_command.html
 63.. _get_command_invocation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/get_command_invocation.html
 64"""
 65
 66import typing as T
 67import sys
 68import enum
 69import time
 70import itertools
 71import dataclasses
 72
 73from func_args import resolve_kwargs, NOTHING
 74
 75if T.TYPE_CHECKING:
 76    from mypy_boto3_ssm.client import SSMClient  # pip install "boto3_stubs[ssm]"
 77
 78
 79class Waiter:
 80    """
 81    Simple retry / poll with progress.
 82    """
 83
 84    def __init__(
 85        self,
 86        delays: T.Union[int, float],
 87        timeout: T.Union[int, float],
 88        indent: int = 0,
 89        verbose: bool = True,
 90    ):
 91        self.delays = itertools.repeat(delays)
 92        self.timeout = timeout
 93        self.tab = " " * indent
 94        self.verbose = verbose
 95
 96    def __iter__(self):
 97        start = time.time()
 98        end = start + self.timeout
 99        for attempt, delay in enumerate(self.delays, 1):
100            now = time.time()
101            remaining = end - now
102            if remaining < 0:
103                raise TimeoutError(f"timed out in {self.timeout} seconds!")
104            else:
105                time.sleep(min(delay, remaining))
106                elapsed = int(now - start + delay)
107                if self.verbose:
108                    sys.stdout.write(
109                        f"\r{self.tab}on {attempt} th attempt, "
110                        f"elapsed {elapsed} seconds, "
111                        f"remain {self.timeout - elapsed} seconds ..."
112                    )
113                    sys.stdout.flush()
114                yield attempt, int(elapsed)
115
116
117class CommandInvocationStatusEnum(str, enum.Enum):
118    Pending = "Pending"
119    InProgress = "InProgress"
120    Delayed = "Delayed"
121    Success = "Success"
122    Cancelled = "Cancelled"
123    TimedOut = "TimedOut"
124    Failed = "Failed"
125    Cancelling = "Cancelling"
126
127
128@dataclasses.dataclass
129class CommandInvocation:
130    """
131    Reference:
132
133    - get_command_invocation_
134    """
135
136    CommandId: T.Optional[str] = dataclasses.field(default=None)
137    InstanceId: T.Optional[str] = dataclasses.field(default=None)
138    Comment: T.Optional[str] = dataclasses.field(default=None)
139    DocumentName: T.Optional[str] = dataclasses.field(default=None)
140    DocumentVersion: T.Optional[str] = dataclasses.field(default=None)
141    PluginName: T.Optional[str] = dataclasses.field(default=None)
142    ResponseCode: T.Optional[int] = dataclasses.field(default=None)
143    ExecutionStartDateTime: T.Optional[str] = dataclasses.field(default=None)
144    ExecutionElapsedTime: T.Optional[str] = dataclasses.field(default=None)
145    ExecutionEndDateTime: T.Optional[str] = dataclasses.field(default=None)
146    Status: T.Optional[str] = dataclasses.field(default=None)
147    StatusDetails: T.Optional[str] = dataclasses.field(default=None)
148    StandardOutputContent: T.Optional[str] = dataclasses.field(default=None)
149    StandardOutputUrl: T.Optional[str] = dataclasses.field(default=None)
150    StandardErrorContent: T.Optional[str] = dataclasses.field(default=None)
151    StandardErrorUrl: T.Optional[str] = dataclasses.field(default=None)
152    CloudWatchOutputConfig: T.Optional[dict] = dataclasses.field(default=None)
153
154    @classmethod
155    def from_get_command_invocation_response(
156        cls, response: dict
157    ) -> "CommandInvocation":
158        """
159        Reference:
160
161        - get_command_invocation_
162        """
163        kwargs = {
164            field.name: response.get(field.name) for field in dataclasses.fields(cls)
165        }
166        return cls(**kwargs)
167
168    @classmethod
169    def get(
170        cls,
171        ssm_client: "SSMClient",
172        command_id: str,
173        instance_id: str,
174        plugin_name: T.Optional[str] = NOTHING,
175    ) -> "CommandInvocation":
176        """
177        Reference:
178
179        - get_command_invocation_
180        """
181        response = ssm_client.get_command_invocation(
182            **resolve_kwargs(
183                CommandId=command_id,
184                InstanceId=instance_id,
185                PluginName=plugin_name,
186            )
187        )
188        return cls.from_get_command_invocation_response(response)
189
190
191def wait_until_command_succeeded(
192    ssm_client: "SSMClient",
193    command_id: str,
194    instance_id: str,
195    plugin_name: T.Optional[str] = NOTHING,
196    delays: int = 3,
197    timeout: int = 60,
198    verbose: bool = True,
199):
200    """
201    Reference:
202
203    - get_command_invocation_
204    """
205    for _ in Waiter(delays=delays, timeout=timeout, verbose=verbose):
206        command_invocation = CommandInvocation.get(
207            ssm_client=ssm_client,
208            command_id=command_id,
209            instance_id=instance_id,
210            plugin_name=plugin_name,
211        )
212        if command_invocation.Status == CommandInvocationStatusEnum.Success.value:
213            if verbose:
214                print("")
215            break
216        elif command_invocation.Status in [
217            CommandInvocationStatusEnum.Cancelled.value,
218            CommandInvocationStatusEnum.TimedOut.value,
219            CommandInvocationStatusEnum.Failed.value,
220            CommandInvocationStatusEnum.Cancelling.value,
221        ]:
222            raise Exception(f"Command failed, status: {command_invocation.Status}")
223        else:
224            pass
225
226
227if __name__ == "__main__":
228    import boto3
229    from s3pathlib import S3Path
230
231    instance_id = "i-1a2b3c"
232    commands = ["echo hello"]
233    ssm_client = boto3.client("ssm")
234
235    # make sure your EC2 has the IAM permission to write to this location
236    s3dir_command_output = S3Path(f"s3://my-bucket/ssm-command-output/").to_dir()
237
238    # run remote command, this is an async operation
239    res = ssm_client.send_command(
240        InstanceIds=[instance_id],
241        DocumentName="AWS-RunShellScript",
242        DocumentVersion="1",
243        Parameters={"commands": commands},
244        # store the command output to S3
245        OutputS3BucketName=s3dir_command_output.bucket,
246        OutputS3KeyPrefix=s3dir_command_output.key,
247    )
248    command_id = res["Command"]["CommandId"]
249
250    # wait until the command succeeds
251    wait_until_command_succeeded(
252        ssm_client=ssm_client,
253        command_id=command_id,
254        instance_id=instance_id,
255        delays=3,
256        timeout=60,
257        verbose=True,
258    )
259
260    # print the command output
261    for s3path in (
262        s3dir_command_output.joinpath(
263            command_id,
264            instance_id,
265            "awsrunShellScript",
266        )
267        .to_dir()
268        .iter_objects()
269    ):
270        print(f"--- {s3path.uri} ---")
271        print(f"{s3path.read_text()}")

Reference:

总结#

  1. 在创建 EC2 之前就要配置好你的 IAM Role.

  2. 确保你给 EC2 的 IAM Role 有这个 AmazonSSMManagedInstanceCore IAM Policy.

  3. 启动 EC2 的时候使用这个 IAM Role. 如果启动的时候忘记给 IAM Role, 那么你可以启动后指定 IAM Role 然后重启即可.

  4. 然后就可以用 SSM 的 API 来远程执行命令了.

Remote Command 还能用来干什么#

很多自动化脚本由于网络连接的缘故是必须要在 EC2 上运行的. 所以我们可以在世界的任意地点用 SSM agent 来执行远程命令. 而而关于传输数据, 我建议通过 S3 做媒介, 让 EC2 将命令执行后的数据写入到 S3 上. 这样你就可以在任意地点读取这些数据了.