Working with log groups and log streams#
Keywords: AWS, Amazon, CloudWatch, CW, Log Group Stream
Create and Update Log Group and Stream#
对 Log group 的管理非常简单, 它只有 create_log_group 和配置 Retention policy put_retention_policy 两个操作.
对 Log stream 的管理更简单了, 压根没有其他参数.
下面是一个帮助你管理 Group 和 Stream 的模块. 比原生 API 的提升是加入了处理在 Create 时 Group 已经存在, 或是 Delete 时 Group 不存在的情况. 并且提供了一个对 Log event 的数据模型抽象.
1# -*- coding: utf-8 -*-
2
3"""
4A helper module to work with CloudWatch Logs Group, Stream and put log events.
5
6Usage:
7
8.. code-block:: python
9
10 from recipe import (
11 get_log_group,
12 create_log_group,
13 delete_log_group,
14 get_log_stream,
15 create_log_stream,
16 delete_log_stream,
17 Event,
18 BaseJsonMessage,
19 put_log_events,
20 )
21"""
22
23import typing as T
24import json
25import dataclasses
26from datetime import datetime, timezone
27
28
def get_log_group(
    logs_client,
    group_name: str,
) -> T.Optional[dict]:
    """
    Get a log group's details by exact name; return None if it doesn't exist.

    ``describe_log_groups`` matches by *prefix* and returns at most 50 groups
    per page, so the exact match may not be on the first page. This pages
    through results with ``nextToken`` until the group is found or the
    results are exhausted.

    :param logs_client: The boto3 logs client.
    :param group_name: The exact log group name.

    :return: A dict with the log group details, or None if it doesn't exist.
    """
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/describe_log_groups.html
    kwargs = {"logGroupNamePrefix": group_name}
    while True:
        res = logs_client.describe_log_groups(**kwargs)
        for dct in res.get("logGroups", []):
            if dct.get("logGroupName") == group_name:
                return dct
        next_token = res.get("nextToken")
        if next_token is None:
            return None
        kwargs["nextToken"] = next_token
51
52
def create_log_group(
    logs_client,
    group_name: str,
) -> bool:
    """
    Create a log group; do nothing if one with the same name already exists.

    :param logs_client: The boto3 logs client.
    :param group_name: The log group name.

    :return: True if the log group was created, False if it already existed.
    """
    # Existence check first: guard clause avoids the API error on duplicates.
    if get_log_group(logs_client, group_name) is not None:
        return False
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/create_log_group.html
    logs_client.create_log_group(logGroupName=group_name)
    return True
71
72
def delete_log_group(
    logs_client,
    group_name: str,
) -> bool:
    """
    Delete a log group; do nothing if it doesn't exist.

    :param logs_client: The boto3 logs client.
    :param group_name: The log group name.

    :return: True if the log group was deleted, False if it didn't exist.
    """
    # Guard clause: nothing to delete, report no-op to the caller.
    if get_log_group(logs_client, group_name) is None:
        return False
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/delete_log_group.html
    logs_client.delete_log_group(logGroupName=group_name)
    return True
91
92
def get_log_stream(
    logs_client,
    group_name: str,
    stream_name: str,
) -> T.Optional[dict]:
    """
    Get a log stream's details by exact name; return None if it doesn't exist.

    ``describe_log_streams`` matches by *prefix* and returns at most 50
    streams per page, so the exact match may not be on the first page. This
    pages through results with ``nextToken`` until the stream is found or
    the results are exhausted.

    :param logs_client: The boto3 logs client.
    :param group_name: The log group name.
    :param stream_name: The exact log stream name.

    :return: A dict with the log stream details, or None if it doesn't exist.
    """
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/describe_log_streams.html
    kwargs = {
        "logGroupName": group_name,
        "logStreamNamePrefix": stream_name,
    }
    while True:
        res = logs_client.describe_log_streams(**kwargs)
        for dct in res.get("logStreams", []):
            if dct.get("logStreamName") == stream_name:
                return dct
        next_token = res.get("nextToken")
        if next_token is None:
            return None
        kwargs["nextToken"] = next_token
117
118
def create_log_stream(
    logs_client,
    group_name: str,
    stream_name: str,
) -> bool:
    """
    Create a log stream; do nothing if one with the same name already exists.

    :param logs_client: The boto3 logs client.
    :param group_name: The log group name.
    :param stream_name: The log stream name.

    :return: True if the log stream was created, False if it already existed.
    """
    # Guard clause: skip the create call when the stream is already there.
    if get_log_stream(logs_client, group_name, stream_name) is not None:
        return False
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/create_log_stream.html
    logs_client.create_log_stream(
        logGroupName=group_name,
        logStreamName=stream_name,
    )
    return True
139
140
def delete_log_stream(
    logs_client,
    group_name: str,
    stream_name: str,
) -> bool:
    """
    Delete a log stream; do nothing if it doesn't exist.

    :param logs_client: The boto3 logs client.
    :param group_name: The log group name.
    :param stream_name: The log stream name.

    :return: True if the log stream was deleted, False if it didn't exist.
    """
    # Guard clause: nothing to delete, report no-op to the caller.
    if get_log_stream(logs_client, group_name, stream_name) is None:
        return False
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/delete_log_stream.html
    logs_client.delete_log_stream(
        logGroupName=group_name,
        logStreamName=stream_name,
    )
    return True
161
162
# Unix epoch as a timezone-aware datetime, the reference point for timestamps.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def get_now_ts() -> int:
    """
    Return the current UTC time as milliseconds since the Unix epoch.

    The put_log_events API expects a timestamp in milliseconds since epoch.

    :return: Milliseconds since 1970-01-01 00:00:00 UTC.
    """
    # datetime.utcnow() is deprecated (Python 3.12) and returns a naive
    # datetime; datetime.now(timezone.utc) is aware from the start.
    return int((datetime.now(timezone.utc) - EPOCH).total_seconds() * 1000)
171
172
@dataclasses.dataclass
class Event:
    """
    A single CloudWatch log event.
    """

    # The log message payload.
    message: str
    # Milliseconds since epoch; defaults to "now" when not supplied.
    timestamp: int = dataclasses.field(default_factory=get_now_ts)
181
182
@dataclasses.dataclass
class BaseJsonMessage:
    """
    Base class for JSON-encoded log messages.

    Subclass this as a dataclass; the round-trip methods below serialize
    the fields to / from a JSON string.
    """

    def to_json(self) -> str:
        """
        Serialize this dataclass to a JSON string.

        Override this method to customize the JSON serialization.
        """
        return json.dumps(dataclasses.asdict(self))

    @classmethod
    def from_json(cls, json_str: str):
        """
        Construct an instance from a JSON string.

        Override this method to customize the JSON deserialization.
        """
        return cls(**json.loads(json_str))
204
205
def put_log_events(
    logs_client,
    group_name: str,
    stream_name: str,
    events: "T.List[Event]",
) -> T.Optional[dict]:
    """
    Put a list of events into a log stream.

    The events are sent sorted by timestamp ascending, since the
    PutLogEvents API requires the batch to be in chronological order.

    NOTE(review): this does not enforce the API batch limits (max 1 MB /
    10,000 events per request) -- callers must chunk large batches.

    :param logs_client: The boto3 logs client.
    :param group_name: The log group name.
    :param stream_name: The log stream name.
    :param events: A list of :class:`Event` objects.

    :return: A dict with the response from the put_log_events call,
        or None if ``events`` is empty.
    """
    if not events:
        return None
    # Sort by timestamp: the API requires chronological order.
    log_events = sorted(
        (dataclasses.asdict(event) for event in events),
        key=lambda dct: dct["timestamp"],
    )
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html
    return logs_client.put_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        logEvents=log_events,
    )
Put Log Events#
我们可以用 put_log_events 的 API 来将 Log 打到 CloudWatch. 默认情况下这个 API 有两个限制:
每个 batch (一次 API) 的数据量不得超过 1MB.
对于 US East (N. Virginia), US West (Oregon), and Europe (Ireland), 一个 account / region 每秒最多 1500 Transaction, 对于其他 region 则是每秒 800 次.
下面给出了一段利用前面的模块写的往 Log Stream 里打 Log 的代码.
1# -*- coding: utf-8 -*-
2
3import typing as T
4import time
5import random
6import dataclasses
7
8from boto_session_manager import BotoSesManager
9from aws_console_url.api import AWSConsole
10
11from recipe import (
12 create_log_group,
13 delete_log_group,
14 create_log_stream,
15 Event,
16 BaseJsonMessage,
17 put_log_events,
18)
19
# Boto session and AWS console URL helpers (third-party wrappers around boto3).
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")
aws = AWSConsole(aws_account_id=bsm.aws_account_id, aws_region=bsm.aws_region, bsm=bsm)
log_client = bsm.cloudwatchlogs_client

# One log group for the whole example; one stream per (simulated) container.
group_name = "learn_aws_cloudwatch/working_with_log_groups_and_log_streams"
stream_name_1 = "container-1"
stream_name_2 = "container-2"
27
28
def set_up():
    """
    Set up the CloudWatch Logs resources for this example: the log group,
    its two log streams, then print the log group's console URL.
    """
    create_log_group(log_client, group_name)
    for stream_name in (stream_name_1, stream_name_2):
        create_log_stream(log_client, group_name, stream_name)
    print(aws.cloudwatch.get_log_group(group_name))
37
38
@dataclasses.dataclass
class StatusMessage(BaseJsonMessage):
    """
    JSON log message reporting a server's run status
    ("succeeded" / "failed" -- see :func:`rand_event`).
    """

    server_id: str
    status: str
43
44
@dataclasses.dataclass
class ProcessingTimeMessage(BaseJsonMessage):
    """
    JSON log message reporting how long a server took to process a task.
    """

    server_id: str
    processing_time: int
49
50
# Each simulated server writes to the stream named after it, so the server
# ids double as stream names (see rand_event below).
server_id_list = [stream_name_1, stream_name_2]
52
53
def rand_event() -> T.List[T.Union[ProcessingTimeMessage, StatusMessage]]:
    """
    Simulate one server event and send it to CloudWatch Logs.

    70% chance the run succeeds, 30% chance it fails. A success produces
    two messages (one status, one processing time); a failure produces a
    single failed-status message.

    :return: the list of messages that were sent.
    """
    server_id = random.choice(server_id_list)
    # Stream name is the server id by convention (see server_id_list).
    succeeded = random.randint(1, 100) <= 70
    if succeeded:
        messages = [
            StatusMessage(server_id=server_id, status="succeeded"),
            ProcessingTimeMessage(
                server_id=server_id,
                processing_time=random.randint(100, 1000),
            ),
        ]
    else:
        messages = [StatusMessage(server_id=server_id, status="failed")]
    put_log_events(
        bsm.cloudwatchlogs_client,
        group_name,
        server_id,
        events=[Event(message=msg.to_json()) for msg in messages],
    )
    return messages
87
88
def run_data_faker():
    """
    Run :func:`rand_event` once per second, forever (stop with Ctrl-C).

    Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html

    The maximum batch size of a PutLogEvents request is 1MB.

    The default quota is **800** transactions per second per account per
    Region, except in US East (N. Virginia), US West (Oregon), and Europe
    (Ireland), where it is 1500. The per-second throttling quota can be
    raised via the Service Quotas service.
    """
    ith = 0
    while True:
        ith += 1
        print(f"ith: {ith} sec")
        # One batch per second keeps this example far below the TPS quota.
        time.sleep(1)
        for message in rand_event():
            print(f" {message}")
107
108
def clean_up():
    """
    Remove the CloudWatch Logs resources created by this example.
    """
    delete_log_group(log_client, group_name)
114
115
if __name__ == "__main__":
    set_up()
    run_data_faker()  # NOTE: loops forever; stop with Ctrl-C
    # clean_up()  # uncomment to tear down the example's resources