Analyzing log data with CloudWatch Logs Insights#
Keywords: AWS, Amazon, CloudWatch, CW, Log, Logs, Insight, Insights
How to learn CloudWatch Logs Insights#
The best way to learn is always by doing, so we have prepared a script that makes it easy to create a Log Group / Log Stream and write a custom log message to it every second. We also have a function to clean these resources up easily: once we call the delete_log_group API, the specified Log Group and the data in all Log Streams under it are deleted, so we can immediately start a fresh experiment.
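As a quick preview, the whole lifecycle looks roughly like this (a minimal sketch using the helpers from recipe.py in the Playbook section below; the group and stream names here are just examples):
import boto3

from recipe import create_log_group, create_log_stream, delete_log_group

logs_client = boto3.client("logs")
group_name = "learn_aws_cloudwatch/example"  # hypothetical group name

create_log_group(logs_client, group_name)  # no-op if it already exists
create_log_stream(logs_client, group_name, "container-1")  # no-op if it already exists
# ... put log events, run queries ...
delete_log_group(logs_client, group_name)  # deletes the group and all streams under it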
Query Language Quick Start#
The query language of CloudWatch Logs Insights (hereafter simply "Insights") is a SQL-like language. Its syntax consists of the following parts:
- Selection: define which fields you want to select. This includes the fields keyword for choosing fields (the analog of SELECT in SQL) and the stats keyword for building aggregations and charts.
- Filtering: define which records you want to keep. This is the filter keyword, the analog of WHERE in SQL.
- Extraction: log data is not strictly structured the way SQL tables are, so you sometimes need to extract structured data from a log message. The keyword here is parse (a quick example follows this list).
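For a quick taste of parse: suppose, hypothetically, that some of your messages look like "request handled in 123 ms"; you could extract the number into a field and filter on it:
fields @timestamp, @message
| parse @message "request handled in * ms" as processing_ms
| filter processing_ms > 100
| sort @timestamp desc
| limit 20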
Like SQL, Insights also has a limit keyword to cap the number of returned records, and a sort keyword to order results by a given field.
These are just the core features; Insights also offers more advanced, log-specific syntax, which we will get to gradually later on.
The query below is the most basic example, similar to SELECT * FROM table LIMIT 10 in SQL. Here @timestamp is the log timestamp and @message is the log content, while @logStream and @log identify the Log Stream and the Log Group, respectively. Fields prefixed with @ are special CloudWatch Logs fields:
fields @timestamp, @message, @logStream, @log
| sort @timestamp desc
| limit 20
If your log message is structured JSON like {"server_id": "container-1", "processing_time": 500}, you can select fields directly using JSON dot notation:
fields @timestamp, @message, server_id, processing_time
| sort @timestamp desc
| limit 20
Conditions in filter can use JSON dot notation as well:
fields @timestamp, @message, server_id, processing_time
| filter server_id = "container-1"
| sort @timestamp desc
| limit 20
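The stats keyword mentioned earlier works like SQL aggregation. For example, based on the sample message above, this illustrative query computes the average processing_time in one-minute buckets:
filter ispresent(processing_time)
| stats avg(processing_time) as avg_processing_time by bin(1m)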
A small gotcha: when filtering on @timestamp, human-readable formats are not supported; you have to convert the time to a millisecond epoch timestamp yourself. Be careful with time zones when converting, or your results may be off:
fields @timestamp, @message, server_id, processing_time
| filter @timestamp <= 1699797262424
| sort @timestamp desc
| limit 20
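To produce such a millisecond timestamp in Python, you can use the get_ts_in_millisecond helper from recipe.py below, or a one-liner like this (a minimal sketch; note the explicit UTC timezone, which avoids the offset pitfall mentioned above):
from datetime import datetime, timezone

dt = datetime(2023, 11, 12, 13, 54, 22, 424000, tzinfo=timezone.utc)
epoch_ms = round(dt.timestamp() * 1000)
print(epoch_ms)  # 1699797262424, the value used in the query above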
If you have multiple filter conditions, you can combine them with the logical operators and, or, and not, just like WHERE col1 = value1 AND col2 = value2 in SQL.
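For example, combining two conditions on the sample fields above (illustrative):
fields @timestamp, @message, server_id, processing_time
| filter server_id = "container-1" and processing_time > 1000
| sort @timestamp desc
| limit 20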
Pattern#
pattern is a powerful command: it samples your logs and automatically detects which patterns occur in them. For example, our test data contains JSON messages of two different shapes:
{"server_id": "container-1", "status": "succeeded"}
{"server_id": "container-1", "processing_time": 2000}
The pattern command can then automatically derive a regex for each of these two patterns:
fields @timestamp, @message
| pattern @message
Playbook#
Here we provide a few Python modules for conveniently creating fake data and testing different queries.
recipe.py
Helper functions that keep the rest of the code concise.
# -*- coding: utf-8 -*-

"""
A helper module to work with CloudWatch Logs Group, Stream and put log events.

Usage:

.. code-block:: python

    from recipe import (
        get_log_group,
        create_log_group,
        delete_log_group,
        get_log_stream,
        create_log_stream,
        delete_log_stream,
        Event,
        BaseJsonMessage,
        put_log_events,
        get_ts_in_second,
        get_ts_in_millisecond,
        QueryStatusEnum,
        wait_logs_insights_query_to_succeed,
        run_query,
        reformat_query_results,
    )
"""

import typing as T
import time
import json
import enum
import dataclasses
from datetime import datetime, timezone


def get_log_group(
    logs_client,
    group_name: str,
) -> T.Optional[dict]:
    """
    Get a log group's details by name; if it doesn't exist, return None.

    :return: A dict with the log group details, or None if it doesn't exist.
    """
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/describe_log_groups.html
    res = logs_client.describe_log_groups(
        logGroupNamePrefix=group_name,
    )
    groups = [
        dct
        for dct in res.get("logGroups", [])
        if dct.get("logGroupName", "unknown-log-group-name") == group_name
    ]
    if len(groups):
        return groups[0]
    else:
        return None


def create_log_group(
    logs_client,
    group_name: str,
) -> bool:
    """
    Create a log group; if it already exists, do nothing.

    :return: True if the log group was created, False if it already existed.
    """
    group = get_log_group(logs_client, group_name)
    if group is None:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/create_log_group.html
        logs_client.create_log_group(
            logGroupName=group_name,
        )
        return True
    else:
        return False


def delete_log_group(
    logs_client,
    group_name: str,
) -> bool:
    """
    Delete a log group; if it doesn't exist, do nothing.

    :return: True if the log group was deleted, False if it didn't exist.
    """
    group = get_log_group(logs_client, group_name)
    if group is None:
        return False
    else:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/delete_log_group.html
        logs_client.delete_log_group(
            logGroupName=group_name,
        )
        return True


def get_log_stream(
    logs_client,
    group_name: str,
    stream_name: str,
) -> T.Optional[dict]:
    """
    Get a log stream's details by name; if it doesn't exist, return None.

    :return: A dict with the log stream details, or None if it doesn't exist.
    """
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/describe_log_streams.html
    res = logs_client.describe_log_streams(
        logGroupName=group_name,
        logStreamNamePrefix=stream_name,
    )
    streams = [
        dct
        for dct in res.get("logStreams", [])
        if dct.get("logStreamName", "unknown-log-stream-name") == stream_name
    ]
    if len(streams):
        return streams[0]
    else:
        return None


def create_log_stream(
    logs_client,
    group_name: str,
    stream_name: str,
) -> bool:
    """
    Create a log stream; if it already exists, do nothing.

    :return: True if the log stream was created, False if it already existed.
    """
    stream = get_log_stream(logs_client, group_name, stream_name)
    if stream is None:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/create_log_stream.html
        logs_client.create_log_stream(
            logGroupName=group_name,
            logStreamName=stream_name,
        )
        return True
    else:
        return False


def delete_log_stream(
    logs_client,
    group_name: str,
    stream_name: str,
) -> bool:
    """
    Delete a log stream; if it doesn't exist, do nothing.

    :return: True if the log stream was deleted, False if it didn't exist.
    """
    stream = get_log_stream(logs_client, group_name, stream_name)
    if stream is None:
        return False
    else:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/delete_log_stream.html
        logs_client.delete_log_stream(
            logGroupName=group_name,
            logStreamName=stream_name,
        )
        return True


EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def get_now_ts() -> int:
    """
    The put log events API expects a timestamp in milliseconds since epoch.
    """
    return int((datetime.now(timezone.utc) - EPOCH).total_seconds() * 1000)


@dataclasses.dataclass
class Event:
    """
    Log event data model.
    """

    message: str = dataclasses.field()
    timestamp: int = dataclasses.field(default_factory=get_now_ts)


@dataclasses.dataclass
class BaseJsonMessage:
    """
    Base class for JSON-encoded log messages.
    """

    def to_json(self) -> str:
        """
        Convert the object to a JSON string.

        You can override this method to customize the JSON serialization.
        """
        return json.dumps(dataclasses.asdict(self))

    @classmethod
    def from_json(cls, json_str: str):
        """
        You can override this method to customize the JSON deserialization.
        """
        dct = json.loads(json_str)
        return cls(**dct)


def put_log_events(
    logs_client,
    group_name: str,
    stream_name: str,
    events: T.List[Event],
) -> T.Optional[dict]:
    """
    Put a list of events into a log stream.

    Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html

    :param logs_client: The boto3 logs client.
    :param group_name: The log group name.
    :param stream_name: The log stream name.
    :param events: A list of :class:`Event` objects.

    :return: A dict with the response from the put_log_events call.
    """
    if len(events) == 0:
        return None
    res = logs_client.put_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        logEvents=[dataclasses.asdict(event) for event in events],
    )
    return res


def get_ts(dt: datetime) -> float:
    """
    Convert a datetime object to a timestamp in seconds since epoch.

    It assumes the datetime object is in UTC if it doesn't have a timezone.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    else:
        dt = dt.astimezone(timezone.utc)
    return (dt - EPOCH).total_seconds()


def get_ts_in_second(dt: datetime) -> int:
    """
    Convert a datetime object to a timestamp in seconds since epoch.
    """
    return int(get_ts(dt))


def get_ts_in_millisecond(dt: datetime) -> int:
    """
    Convert a datetime object to a timestamp in milliseconds since epoch.
    """
    return int(get_ts(dt) * 1000)


class QueryStatusEnum(str, enum.Enum):
    """
    Enum for the query status.

    Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/get_query_results.html
    """

    Scheduled = "Scheduled"
    Running = "Running"
    Complete = "Complete"
    Failed = "Failed"
    Cancelled = "Cancelled"
    Timeout = "Timeout"
    Unknown = "Unknown"


def wait_logs_insights_query_to_succeed(
    logs_client,
    query_id: str,
    delta: int = 1,
    timeout: int = 30,
) -> dict:
    """
    Wait for a given Logs Insights query to reach ``Complete`` status. If it
    fails, raise ``RuntimeError`` immediately. If it times out, raise
    ``TimeoutError``.

    Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/get_query_results.html

    :param logs_client: The boto3 cloudwatch logs client.
    :param query_id: The query id from the response of the ``start_query`` API call.
    :param delta: The time interval in seconds between each query status check.
    :param timeout: The maximum time in seconds to wait for the query to succeed.
    """
    elapsed = 0
    while True:
        res = logs_client.get_query_results(queryId=query_id)
        status = res["status"]
        if status == QueryStatusEnum.Complete.value:
            return res
        elif status in [
            QueryStatusEnum.Failed.value,
            QueryStatusEnum.Cancelled.value,
            QueryStatusEnum.Timeout.value,
        ]:
            raise RuntimeError(f"query {query_id} reached status: {status}")
        else:
            time.sleep(delta)
            elapsed += delta
            if elapsed > timeout:
                raise TimeoutError(f"logs insights query timeout in {timeout} seconds!")


def strip_out_limit_clause(query: str) -> str:
    """
    Strip out the limit clause from a query string.
    """
    lines = query.splitlines()
    return "\n".join([line for line in lines if not line.startswith("| limit")])


def run_query(
    logs_client,
    start_datetime: datetime,
    end_datetime: datetime,
    query: str,
    log_group_name: T.Optional[str] = None,
    log_group_name_list: T.Optional[T.List[str]] = None,
    log_group_id_list: T.Optional[T.List[str]] = None,
    limit: int = 1000,
    delta: int = 1,
    timeout: int = 30,
) -> T.Tuple[str, dict]:
    """
    Run a logs insights query and wait for the query to succeed. It is a more
    human friendly wrapper of the ``start_query`` and ``get_query_results`` APIs.

    Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/start_query.html

    :param logs_client: The boto3 cloudwatch logs client.
    :param start_datetime: python datetime object for start time,
        if timezone is not set, it assumes UTC.
    :param end_datetime: python datetime object for end time,
        if timezone is not set, it assumes UTC.
    :param query: The query string. Don't put a ``| limit ...`` clause in your
        query; use the ``limit`` parameter instead.
    :param log_group_name: see ``start_query`` API.
    :param log_group_name_list: see ``start_query`` API.
    :param log_group_id_list: see ``start_query`` API.
    :param limit: see ``start_query`` API.
    :param delta: The time interval in seconds between each query status check.
    :param timeout: The maximum time in seconds to wait for the query to succeed.
    """
    start_ts = get_ts_in_second(start_datetime)
    end_ts = get_ts_in_second(end_datetime)
    kwargs = dict(
        startTime=start_ts,
        endTime=end_ts,
        queryString=query,
        limit=limit,
    )
    if log_group_name is not None:
        kwargs["logGroupName"] = log_group_name
    elif log_group_name_list:
        kwargs["logGroupNames"] = log_group_name_list
    elif log_group_id_list:
        kwargs["logGroupIds"] = log_group_id_list
    else:  # it will raise an error in the API call
        pass
    res = logs_client.start_query(**kwargs)
    query_id = res["queryId"]
    res = wait_logs_insights_query_to_succeed(logs_client, query_id, delta, timeout)
    return query_id, res


def reformat_query_results(response: dict) -> T.List[dict]:
    """
    Convert the response from a ``get_query_results`` API call to a more
    Pythonic format.

    :param response: the response from the ``get_query_results`` API call.
    """
    return [
        {dct["field"]: dct["value"] for dct in result}
        for result in response.get("results", [])
    ]
shared.py
Common variables shared by the scripts in this example. (BotoSesManager and AWSConsole come from the third-party boto_session_manager and aws_console_url packages.)
# -*- coding: utf-8 -*-

from boto_session_manager import BotoSesManager
from aws_console_url.api import AWSConsole

bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")
aws = AWSConsole(aws_account_id=bsm.aws_account_id, aws_region=bsm.aws_region, bsm=bsm)
logs_client = bsm.cloudwatchlogs_client

group_name = "learn_aws_cloudwatch/Analyzing-log-data-with-CloudWatch-Logs-Insights"
stream_name_1 = "container-1"
stream_name_2 = "container-2"
data_faker.py
A script that creates the test data.
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import dataclasses

from recipe import (
    create_log_group,
    delete_log_group,
    create_log_stream,
    Event,
    BaseJsonMessage,
    put_log_events,
)
from shared import bsm, logs_client, aws, group_name, stream_name_1, stream_name_2


def set_up():
    """
    Set up cloudwatch logs resources for this example.
    """
    create_log_group(logs_client, group_name)
    create_log_stream(logs_client, group_name, stream_name_1)
    create_log_stream(logs_client, group_name, stream_name_2)
    print(aws.cloudwatch.get_log_group(group_name))


@dataclasses.dataclass
class StatusMessage(BaseJsonMessage):
    server_id: str = dataclasses.field()
    status: str = dataclasses.field()


@dataclasses.dataclass
class ProcessingTimeMessage(BaseJsonMessage):
    server_id: str = dataclasses.field()
    processing_time: int = dataclasses.field()


server_id_list = [stream_name_1, stream_name_2]


def rand_event() -> T.List[T.Union[ProcessingTimeMessage, StatusMessage]]:
    """
    70% chance it succeeds, 30% chance it fails. When it succeeds, it generates
    two messages, one for status and one for processing time. When it fails, it
    generates one failed message for status.
    """
    server_id = random.choice(server_id_list)
    stream_name = server_id
    if random.randint(1, 100) <= 70:
        messages = [
            StatusMessage(
                server_id=server_id,
                status="succeeded",
            ),
            ProcessingTimeMessage(
                server_id=server_id,
                processing_time=random.randint(1000, 10000),
            ),
        ]
    else:
        messages = [
            StatusMessage(
                server_id=server_id,
                status="failed",
            )
        ]
    put_log_events(
        bsm.cloudwatchlogs_client,
        group_name,
        stream_name,
        events=[Event(message=message.to_json()) for message in messages],
    )
    return messages


def run_data_faker():
    """
    Run :func:`rand_event` every 1 second.

    Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html

    The maximum batch size of a PutLogEvents request is 1MB.

    **800** transactions per second per account per Region, except for the following Regions where the quota is 1500 transactions per second per account per Region: US East (N. Virginia), US West (Oregon), and Europe (Ireland). You can request an increase to the per-second throttling quota by using the Service Quotas service.
    """
    ith = 0
    while True:
        ith += 1
        print(f"ith: {ith} sec")
        time.sleep(1)
        messages = rand_event()
        for message in messages:
            print(f"  {message}")


def clean_up():
    """
    Clean up cloudwatch logs resources for this example.
    """
    delete_log_group(logs_client, group_name)


if __name__ == "__main__":
    set_up()
    run_data_faker()
    # clean_up()
run_query.py
A script for testing Logs Insights queries.
# -*- coding: utf-8 -*-

from datetime import datetime, timedelta, timezone

from rich import print as rprint

from recipe import run_query, reformat_query_results
from shared import logs_client, group_name

now = datetime.now(timezone.utc)
five_minutes_ago = now - timedelta(minutes=5)

query = """
fields @timestamp, @message, @logStream, @log
| sort @timestamp desc
""".strip()

query_id, res = run_query(
    logs_client,
    log_group_name=group_name,
    start_datetime=five_minutes_ago,
    end_datetime=now,
    query=query,
    limit=20,
)
print(f"query_id = {query_id}")
# res = logs_client.get_query_results(queryId="a1b2c3d4")  # use this for a known query id
records = reformat_query_results(res)
print("records =")
rprint(records)
print(f"total records = {len(records)}")