Run Remote Command on Ec2 Ultimate Solution#
Keywords: AWS, EC2, System, Systems, Manager, SSM, Python, Remote, Command
我们想要做什么#
出于多种原因, 往往是网络相关的原因, 很多时候代码必须要在 EC2 环境内执行. 而作为开发者, 如何像在本地电脑上执行 Python 自动化脚本一样在 EC2 环境内执行命令呢? 如果能做到这一点, 想象空间可以是无限的. 下面我们详细的拆解一下需求:
从具体执行的命令复杂度来看, 可以分为两类:
单条在 Terminal 内的命令. 例如
aws s3 ls
.以 Python 脚本形式存在的命令, 具体的命令的逻辑在 Python 脚本中被定义的. 这个脚本并不是事先准备好的, 换言之, 在执行脚本前我们要现将脚本上传到 EC2 环境内.
从对反馈的要求来看, 可以分为三类:
我只需要执行, 不需要反馈.
我需要知道执行的返回码是成功 (0) 还是失败 (非 0).
我不仅需要知道执行状态, 这个命令可能还会返回一些数据, 我还需要知道这个数据.
从命令的发起者来看,
只需要我的开发电脑能发起命令即可.
这个命令需要能被任何有权限的地方发起, 例如另一台 EC2, 一个 Lambda.
可以看出, 以上需求可以排列组合, 从而出现 2 * 3 * 2 = 12 种情况. 有没有一种解决方案能够同时满足这 12 种情况呢? 答案是肯定的, 我们将在下面的章节中详细的介绍.
探索可能的解决方案#
我们对上面的需求来一条一条的分析, 看看这些需求后面的本质.
单条在 Terminal 内的命令. 例如
aws s3 ls
.这个没什么说的, 就是一条远程命令.
以 Python 脚本形式存在的命令, 具体的命令的逻辑在 Python 脚本中被定义的. 这个脚本并不是事先准备好的, 换言之, 在执行脚本前我们要现将脚本上传到 EC2 环境内.
这就意味着我们总得有一个简单, 可重复, 安全的方法将任意脚本上传到 EC2 环境内.
我只需要执行, 不需要反馈
这个没什么说的, 简单执行即可.
我需要知道执行的返回码是成功 (0) 还是失败 (非 0)
这就需要我们能捕获错误码 (return code)
我不仅需要知道执行状态, 这个命令可能还会返回一些数据, 我还需要知道这个数据
要么这个命令本身的设计就是会把返回数据写到 stdout, 那么我们只要能捕获 stdout 即可. 要么在运行时将数据上传到一个中间媒介, 例如 S3, 然后我们再从 S3 读取数据.
只需要我的开发电脑能发起命令即可
要么我的电脑能 SSH 到 EC2 上去. 要么我的电脑有一些相关的 AWS 权限. 这里的权限主要指的是 AWS System Manager Run Command 的权限. 这是一个 AWS 托管的服务器, 可以利用 SSM Agent 在 EC2 上执行任何命令.
这个命令需要能被任何有权限的地方发起, 例如另一台 EC2, 一个 Lambda.
这个发起方只要有上面说的 AWS System Manager Run Command 权限即可. 当然开发电脑也可以有这个权限.
好了, 我们现在对解决每一条需求都有个大概的概念了, 下一步我们来将这些方案组合成一个完整的解决方案. 但在这之前, 我们先来了解一下这里的核心技术 AWS SSM Run Command.
AWS SSM Run Command#
AWS System Manager 是一个历史悠久的 AWS 服务, 主要用于批量管理 EC2 instance 虚拟机. 你可以将其理解为 AWS 版本的 Ansible. 而它的核心组件就是 System Manager Agent (SSM Agent), 本质上是一个服务端软件, 安装在 EC2 机器上, 属于系统服务的一部分. 而 AWS 内部对 EC2 的管理工作很多都是通过 SSM Agent 来进行的. 而”Run Command” 则是 SSM 的一项功能, 可以通过 SSM Agent 执行远程命令.
简单来说我们选择 SSM Run Command 作为我们解决方案的核心技术是出于以下几点考量:
SSM Run Command 是受 IAM Role 权限保护的, 非常安全且灵活, 兼容于各种 AWS 服务, 使得我们可以在任何 AWS 服务内发起 SSM Run Command.
SSM Run Command 功能 免费, 且支持非常高的并发量.
SSM Run Command 可以捕获 Return Code, Stdout, Stderr, 使得我们可以满足上面的所有需求.
SSM Run Command 本身有一些限制.
通过 API 发送的 Run Command 也是有限制的, 不能超过 100KB. 如果你需要发送大量数据, 那么你需要修改你的远程命令程序, 让它接受 S3 uri 为参数, 然后到 S3 uri 去读输入数据.
Stdout 是有大小限制的, API 最多显示 24000 个字符. 如果需要捕获大量数据, 那么你需要修改你的远程命令程序, 将结果保存在 S3 上.
这里我不详细展开说 SSM Run Command 这个功能, 建议先看看一下 Run Remote Command on EC2 via SSM 这边博文, 对其有个简单的了解
最终解决方案#
对于运行单条 Terminal Command, 就直接用 SSM Run Command 即可.
对于运行复杂的 Python 脚本呢, 我们可以将在本地的 Python 脚本先上传到 S3, 然后用 Run Command 运行第一条命令
aws s3 cp s3://... /tmp/...script.py
将其下载到 EC2 上, 然后再指定 Python 解释器来执行该脚本. 如果该脚本是个命令行工具, 我们还能带上参数. 注意, 我们要确保这个 EC2 上预装了 aws cli.如果我们需要捕获命令返回的结果, 那么我们要么自己能保证这条命令能在 Stdout 中返回一个结构化的数据 (注意, logging 可能会干扰到返回值), 例如 JSON, 要么能运行过程中的数据上传到 S3. 然后我们再从 S3 读取数据.
实际案例#
script.py
这是我们想要在 EC2 上执行的命令. 我们会在后面的脚本中将其上传到 S3, 然后在 EC2 上下载并执行.
1# -*- coding: utf-8 -*-
2
3"""
4一个需要在 EC2 上运行的脚本, 它会打印一些包含特殊符号的字符串的 JSON 到 stdout.
5"""
6
7import sys
8import json
9
10
11def run() -> dict:
12 print("start")
13 print("done")
14 return {
15 "python": sys.executable,
16 "weird_string": "\\a\nb\tc\"d'e@f#g:h/i"
17 }
18
19
20if __name__ == "__main__":
21 print(json.dumps(run()))
ssm_remote_command_helpers.py
这是一个库, 能让我们方便的调用 run command 命令
1# -*- coding: utf-8 -*-
2
3"""
4This module allow you to run remote command on EC2 instance via SSM in 'sync' mode.
5The original ssm_client.send_command() is 'async' call, which means you have to
6poll the status of the command execution via ssm_client.get_command_invocation().
7This module hides the complexity of polling and provide a simple interface.
8
9Requirements:
10
11 func_args>=0.1.1,<1.0.0
12
13.. _send_command: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/send_command.html
14.. _get_command_invocation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/get_command_invocation.html
15"""
16
17import typing as T
18import sys
19import enum
20import time
21import itertools
22import dataclasses
23
24from func_args import resolve_kwargs, NOTHING
25
26if T.TYPE_CHECKING:
27 from mypy_boto3_ssm.client import SSMClient # pip install "boto3_stubs[ssm]"
28
29
30class Waiter:
31 """
32 Simple retry / polling with progressing status. Usage, it is common to check
33 if a long-running job is done every X seconds and timeout in Y seconds.
34 This class allow you to customize the polling interval and timeout,.
35
36 Example:
37
38 .. code-block:: python
39
40 print("before waiter")
41
42 for attempt, elapse in Waiter(
43 delays=1,
44 timeout=10,
45 verbose=True,
46 ):
47 # check if should jump out of the polling loop
48 if elapse >= 5:
49 print("")
50 break
51
52 print("after waiter")
53 """
54
55 def __init__(
56 self,
57 delays: T.Union[int, float],
58 timeout: T.Union[int, float],
59 indent: int = 0,
60 verbose: bool = True,
61 ):
62 self._delays = delays
63 self.delays = itertools.repeat(delays)
64 self.timeout = timeout
65 self.tab = " " * indent
66 self.verbose = verbose
67
68 def __iter__(self):
69 if self.verbose: # pragma: no cover
70 sys.stdout.write(
71 f"start waiter, polling every {self._delays} seconds, "
72 f"timeout in {self.timeout} seconds.\n"
73 )
74 sys.stdout.flush()
75 sys.stdout.write(
76 f"\r{self.tab}on 0 th attempt, "
77 f"elapsed 0 seconds, "
78 f"remain {self.timeout} seconds ..."
79 )
80 sys.stdout.flush()
81 start = time.time()
82 end = start + self.timeout
83 yield 0, 0
84 for attempt, delay in enumerate(self.delays, 1):
85 now = time.time()
86 remaining = end - now
87 if remaining < 0:
88 raise TimeoutError(f"timed out in {self.timeout} seconds!")
89 else:
90 time.sleep(min(delay, remaining))
91 elapsed = int(now - start + delay)
92 if self.verbose: # pragma: no cover
93 sys.stdout.write(
94 f"\r{self.tab}on {attempt} th attempt, "
95 f"elapsed {elapsed} seconds, "
96 f"remain {self.timeout - elapsed} seconds ..."
97 )
98 sys.stdout.flush()
99 yield attempt, int(elapsed)
100
101
102def send_command(
103 ssm_client: "SSMClient",
104 instance_id: str,
105 commands: T.List[str],
106 comment: str = NOTHING,
107 output_s3_bucket_name: str = NOTHING,
108 output_s3_key_prefix: str = NOTHING,
109) -> str:
110 """
111 A simple wrapper of ``ssm_client.send_command``, execute sequence of commands
112 to one EC2 instance.
113
114 Reference:
115
116 - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/send_command.html
117 """
118 res = ssm_client.send_command(
119 **resolve_kwargs(
120 InstanceIds=[
121 instance_id,
122 ],
123 DocumentName="AWS-RunShellScript",
124 DocumentVersion="1",
125 Parameters={"commands": commands},
126 Comment=comment,
127 OutputS3BucketName=output_s3_bucket_name,
128 OutputS3KeyPrefix=output_s3_key_prefix,
129 )
130 )
131 command_id = res["Command"]["CommandId"]
132 return command_id
133
134
135class CommandInvocationStatusEnum(str, enum.Enum):
136 """
137 Reference:
138
139 - get_command_invocation_
140 """
141
142 Pending = "Pending"
143 InProgress = "InProgress"
144 Delayed = "Delayed"
145 Success = "Success"
146 Cancelled = "Cancelled"
147 TimedOut = "TimedOut"
148 Failed = "Failed"
149 Cancelling = "Cancelling"
150
151
152@dataclasses.dataclass
153class CommandInvocation:
154 """
155 Represents a Command Invocation details returned from a
156 get_command_invocation_ API call.
157 """
158
159 CommandId: T.Optional[str] = dataclasses.field(default=None)
160 InstanceId: T.Optional[str] = dataclasses.field(default=None)
161 Comment: T.Optional[str] = dataclasses.field(default=None)
162 DocumentName: T.Optional[str] = dataclasses.field(default=None)
163 DocumentVersion: T.Optional[str] = dataclasses.field(default=None)
164 PluginName: T.Optional[str] = dataclasses.field(default=None)
165 ResponseCode: T.Optional[int] = dataclasses.field(default=None)
166 ExecutionStartDateTime: T.Optional[str] = dataclasses.field(default=None)
167 ExecutionElapsedTime: T.Optional[str] = dataclasses.field(default=None)
168 ExecutionEndDateTime: T.Optional[str] = dataclasses.field(default=None)
169 Status: T.Optional[str] = dataclasses.field(default=None)
170 StatusDetails: T.Optional[str] = dataclasses.field(default=None)
171 StandardOutputContent: T.Optional[str] = dataclasses.field(default=None)
172 StandardOutputUrl: T.Optional[str] = dataclasses.field(default=None)
173 StandardErrorContent: T.Optional[str] = dataclasses.field(default=None)
174 StandardErrorUrl: T.Optional[str] = dataclasses.field(default=None)
175 CloudWatchOutputConfig: T.Optional[dict] = dataclasses.field(default=None)
176
177 @classmethod
178 def from_get_command_invocation_response(
179 cls,
180 response: dict,
181 ) -> "CommandInvocation":
182 """
183 Reference:
184
185 - get_command_invocation_
186 """
187 kwargs = {
188 field.name: response.get(field.name) for field in dataclasses.fields(cls)
189 }
190 return cls(**kwargs)
191
192 @classmethod
193 def get(
194 cls,
195 ssm_client: "SSMClient",
196 command_id: str,
197 instance_id: str,
198 ) -> "CommandInvocation":
199 """
200 A wrapper around get_command_invocation_ API call.
201
202 Reference:
203
204 - get_command_invocation_
205 """
206 response = ssm_client.get_command_invocation(
207 CommandId=command_id,
208 InstanceId=instance_id,
209 )
210 return cls.from_get_command_invocation_response(response)
211
212 def to_dict(self) -> dict:
213 return dataclasses.asdict(self)
214
215
216def wait_until_command_succeeded(
217 ssm_client: "SSMClient",
218 command_id: str,
219 instance_id: str,
220 delays: int = 3,
221 timeout: int = 60,
222 verbose: bool = True,
223) -> CommandInvocation:
224 """
225 After you call send_command_ API, you can use this function to wait until
226 it succeeds. If it fails, it will raise an exception.
227
228 Reference:
229
230 - get_command_invocation_
231 """
232 for _ in Waiter(delays=delays, timeout=timeout, verbose=verbose):
233 command_invocation = CommandInvocation.get(
234 ssm_client=ssm_client,
235 command_id=command_id,
236 instance_id=instance_id,
237 )
238 if command_invocation.Status == CommandInvocationStatusEnum.Success.value:
239 sys.stdout.write("\n")
240 return command_invocation
241 elif command_invocation.Status in [
242 CommandInvocationStatusEnum.Cancelled.value,
243 CommandInvocationStatusEnum.TimedOut.value,
244 CommandInvocationStatusEnum.Failed.value,
245 CommandInvocationStatusEnum.Cancelling.value,
246 ]:
247 raise Exception(f"Command failed, status: {command_invocation.Status}")
248 else:
249 pass
example.py
这是我们的最终代码, 实现了我们的解决方案.
1# -*- coding: utf-8 -*-
2
3"""
4Requirements::
5
6 pathlib_mate>=1.2.1,<2.0.0
7 s3pathlib>=2.0.1,<3.0.0
8 boto_session_manager>=1.5.1,<2.0.0
9"""
10
11import typing as T
12import time
13import json
14import uuid
15
16from pathlib_mate import Path
17from s3pathlib import S3Path
18from boto_session_manager import BotoSesManager
19from rich import print as rprint
20
21# 从 ssm_remote_command_helpers.py 中导入我们需要的函数
22from ssm_remote_command_helpers import (
23 send_command,
24 wait_until_command_succeeded,
25)
26
27
28def run(
29 bsm: BotoSesManager,
30 instance_id: str,
31 path_python: Path,
32 code: str,
33 s3_path: S3Path,
34 args: T.List[str],
35):
36 """
37 这是我们解决方案的主函数, 对 ssm_remote_command_helpers.py 中的函数进行二次封装,
38 它能自动将脚本通过 S3 上传到 EC2 上执行.
39
40 :param bsm: boto session manager 对象
41 :param instance_id: EC2 instance id
42 :param path_python: 位于 EC2 上的 Python 解释器路径, 你可以选择用哪个 Python 解释器来运行这个命令
43 :param code: 你要在 EC2 上执行的脚本的源代码的字符串
44 :param s3_path: 你要将这个源代码上传到 S3 的哪里
45 :param args: 这个 Python 脚本有没有额外的参数, 如果有, 请用列表的形式列出来, 就像你
46 写 subprocess.run([...]) 一样.
47 """
48 s3path.write_text(code)
49
50 # 生成一个随机的路径, 用于存放代码
51 path_code = f"/tmp/{uuid.uuid4().hex}.py"
52 # 用 aws cli 将代码下载到本地, 并且过滤掉日志
53 command1 = f"/home/ubuntu/.pyenv/shims/aws s3 cp {s3_path.uri} {path_code} 2>&1 > /dev/null"
54 # 组装最终命令
55 args_ = [
56 f"{path_python}",
57 f"{path_code}",
58 ]
59 args_.extend(args)
60 command2 = " ".join(args_)
61 print(command1)
62 print(command2)
63 # 用 SSM 远程执行该命令
64 command_id = send_command(
65 ssm_client=bsm.ssm_client,
66 instance_id=instance_id,
67 commands=[
68 command1,
69 command2,
70 ],
71 )
72 # 然后等待命令执行完毕
73 time.sleep(1) # 一定要等待 1 秒, 不然你立刻 get 是 get 不到的
74 command_invocation = wait_until_command_succeeded(
75 ssm_client=bsm.ssm_client,
76 command_id=command_id,
77 instance_id=instance_id,
78 )
79 rprint(command_invocation)
80 # 解析 return code 和 standard output, parse 我们脚本输出的 JSON
81 print(command_invocation.ResponseCode)
82 lines = command_invocation.StandardOutputContent.splitlines()
83 output_data = json.loads(lines[-1])
84 rprint(output_data)
85
86
87if __name__ == "__main__":
88 bsm = BotoSesManager(profile_name="bmt_app_dev_us_east_1")
89 instance_id = "i-00f591fc972902fc5"
90 path_python = Path("/home/ubuntu/.pyenv/shims/python")
91 code = Path("script.py").read_text()
92 s3path = S3Path(
93 f"s3://{bsm.aws_account_id}-{bsm.aws_region}-data/projects/dev-exp-share/script.py"
94 )
95 args = []
96 run(
97 bsm=bsm,
98 instance_id=instance_id,
99 path_python=path_python,
100 code=code,
101 s3_path=s3path,
102 args=[],
103 )