Analyzing log data with CloudWatch Logs Insights#

Keywords: AWS, Amazon, CloudWatch, CW, Log, Logs, Insight, Insights

How do we learn CloudWatch Logs Insights#

The best way to learn is always by doing, so we have prepared a script that makes it easy to create a Log Group / Log Stream and write a custom log event into it every second.

We also have a function that makes it easy to clean these resources up. Once we call the delete_log_group API, the specified Log Group and the data in all Log Streams under it are deleted, and we can start a fresh experiment right away.
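
If you want to see what this involves under the hood, here is a minimal sketch using boto3 directly; the group and stream names are made up for illustration, and the full, reusable version lives in recipe.py in the Playbook section below:

import json
import time

import boto3

logs_client = boto3.client("logs")

group_name = "my-test-log-group"  # hypothetical names, for illustration only
stream_name = "container-1"

# create the log group and one log stream (these calls fail if the resources already exist)
logs_client.create_log_group(logGroupName=group_name)
logs_client.create_log_stream(logGroupName=group_name, logStreamName=stream_name)

# put one JSON log event; the timestamp must be epoch milliseconds
logs_client.put_log_events(
    logGroupName=group_name,
    logStreamName=stream_name,
    logEvents=[
        {
            "timestamp": int(time.time() * 1000),
            "message": json.dumps({"server_id": "container-1", "processing_time": 500}),
        }
    ],
)

# clean up: deleting the log group also deletes all its log streams and their data
logs_client.delete_log_group(logGroupName=group_name)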

Query Language Quick Start#

The query language of CloudWatch Logs Insights (Insights for short below) is a SQL-like language. Its syntax consists of the following parts:

  1. Selection: define which fields you want in the output. This includes the fields keyword, which picks fields much like SELECT in SQL, and the stats keyword, which computes aggregations that can be turned into charts.

  2. Filtering: define which log events you want to keep. This is done with the filter keyword, which works like WHERE in SQL.

  3. Extraction: unlike rows in SQL, log data is not always strictly structured, so you sometimes need to extract structured fields from the raw log message. The keyword here is parse.

  4. Just like SQL, Insights also has a limit keyword to cap the number of returned rows and a sort keyword to order the results by a given field.

These are only the core features; Insights also has some more advanced, log-specific syntax. We won't expand on it here, we will get to it step by step later. As a quick preview, the sketch below shows how these pieces fit together.
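
This is a hedged sketch, not part of the playbook: the log group name is a placeholder, and the query previews filter, stats, sort and limit against the test data used later. It also shows how a query is started programmatically with the boto3 start_query API:

from datetime import datetime, timedelta, timezone

import boto3

logs_client = boto3.client("logs")

# keep only events that have a processing_time field, then chart the average per minute
query = """
filter ispresent(processing_time)
| stats avg(processing_time) as avg_time by bin(1m)
| sort avg_time desc
| limit 20
"""

now = datetime.now(timezone.utc)
res = logs_client.start_query(
    logGroupName="my-test-log-group",  # hypothetical log group name
    startTime=int((now - timedelta(minutes=15)).timestamp()),  # epoch seconds
    endTime=int(now.timestamp()),
    queryString=query.strip(),
)
query_id = res["queryId"]
# poll logs_client.get_query_results(queryId=query_id) until its status is "Complete"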

The query below is the most basic example, similar to SELECT * FROM table LIMIT 10 in SQL. Here @timestamp is the timestamp of the log event, @message is the log content, and @logStream and @log identify the Log Stream and the Log Group respectively. Fields starting with @ are special fields generated by CloudWatch Logs:

fields @timestamp, @message, @logStream, @log
| sort @timestamp desc
| limit 20

If your log message is a JSON document such as {"server_id": "container-1", "processing_time": 500}, you can select its fields directly using JSON dot notation:

fields @timestamp, @message, server_id, processing_time
| sort @timestamp desc
| limit 20

Naturally, the conditions in filter can use JSON dot notation as well:

fields @timestamp, @message, server_id, processing_time
| filter server_id = "container-1"
| sort @timestamp desc
| limit 20

A small gotcha here: when you filter on @timestamp, a human-readable format is not supported; you have to convert the time to a millisecond epoch timestamp yourself. Be careful about the time zone when converting, otherwise your results may be skewed (a conversion sketch follows the query below):

fields @timestamp, @message, server_id, processing_time
| filter @timestamp <= 1699797262424
| sort @timestamp desc
| limit 20
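
For example, one way to produce such a millisecond timestamp from a timezone-aware datetime in Python (the get_ts_in_millisecond helper in recipe.py below does essentially the same thing):

from datetime import datetime, timezone

# build the cut-off time explicitly in UTC, then convert it to epoch milliseconds
cutoff = datetime(2023, 11, 12, 13, 54, 22, tzinfo=timezone.utc)
cutoff_ms = int(cutoff.timestamp() * 1000)

# use this number in the query, e.g. "| filter @timestamp <= 1699797262000"
print(cutoff_ms)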

If you have multiple filter conditions, you can combine them with the logical operators and, or, and not, just like WHERE col1 = value1 AND col2 = value2 in SQL, for example filter server_id = "container-1" and processing_time >= 1000.

Pattern#

pattern is a very powerful command: it samples your logs and works out which patterns occur in them. For example, our test data contains two different shapes of JSON messages:

{"server_id": "container-1", "status": "succeeded"}
{"server_id": "container-1", "processing_time": 2000}

The pattern command can then automatically derive the regex for each of these two patterns:

fields @timestamp, @message
| pattern @message
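
Like any other query, the pattern query can also be run programmatically. Here is a minimal sketch using the run_query and reformat_query_results helpers from the Playbook section below, assuming the fake data from data_faker.py is already flowing:

from datetime import datetime, timedelta, timezone

from recipe import run_query, reformat_query_results
from shared import logs_client, group_name

now = datetime.now(timezone.utc)
query = "fields @timestamp, @message\n| pattern @message"

query_id, res = run_query(
    logs_client,
    log_group_name=group_name,
    start_datetime=now - timedelta(minutes=15),
    end_datetime=now,
    query=query,
)
for row in reformat_query_results(res):
    print(row)  # each row describes one detected pattern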

Playbook#

Here we provide a few Python modules that make it easy to create fake data and to test different queries.

recipe.py: a helper module that keeps the rest of the code concise.

  1# -*- coding: utf-8 -*-
  2
  3"""
  4A helper module to work with CloudWatch Logs Group, Stream and put log events.
  5
  6Usage:
  7
  8.. code-block:: python
  9
 10    from recipe import (
 11        get_log_group,
 12        create_log_group,
 13        delete_log_group,
 14        get_log_stream,
 15        create_log_stream,
 16        delete_log_stream,
 17        Event,
 18        BaseJsonMessage,
 19        put_log_events,
 20        get_ts_in_second,
 21        get_ts_in_millisecond,
 22        QueryStatusEnum,
 23        wait_logs_insights_query_to_succeed,
 24        run_query,
 25        reformat_query_results,
 26    )
 27"""
 28
 29import typing as T
 30import time
 31import json
 32import enum
 33import dataclasses
 34from datetime import datetime, timezone
 35
 36
 37def get_log_group(
 38    logs_client,
 39    group_name: str,
 40) -> T.Optional[dict]:
 41    """
 42    Get a log group details by name, if it doesn't exist, return None.
 43
 44    :return: A dict with the log group details, or None if it doesn't exist.
 45    """
 46    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/describe_log_groups.html
 47    res = logs_client.describe_log_groups(
 48        logGroupNamePrefix=group_name,
 49    )
 50    groups = [
 51        dct
 52        for dct in res.get("logGroups", [])
 53        if dct.get("logGroupName", "unknown-log-group-name") == group_name
 54    ]
 55    if len(groups):
 56        return groups[0]
 57    else:
 58        return None
 59
 60
 61def create_log_group(
 62    logs_client,
 63    group_name: str,
 64) -> bool:
 65    """
 66    Create a log group, if it already exists, do nothing.
 67
 68    :return: True if the log group was created, False if it already existed.
 69    """
 70    group = get_log_group(logs_client, group_name)
 71    if group is None:
 72        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/create_log_group.html
 73        logs_client.create_log_group(
 74            logGroupName=group_name,
 75        )
 76        return True
 77    else:
 78        return False
 79
 80
 81def delete_log_group(
 82    logs_client,
 83    group_name: str,
 84) -> bool:
 85    """
 86    Delete a log group, if it doesn't exist, do nothing.
 87
 88    :return: True if the log group was deleted, False if it didn't exist.
 89    """
 90    group = get_log_group(logs_client, group_name)
 91    if group is None:
 92        return False
 93    else:
 94        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/delete_log_group.html
 95        logs_client.delete_log_group(
 96            logGroupName=group_name,
 97        )
 98        return True
 99
100
101def get_log_stream(
102    logs_client,
103    group_name: str,
104    stream_name: str,
105) -> T.Optional[dict]:
106    """
107    Get a log stream details by name, if it doesn't exist, return None.
108
109    :return: A dict with the log stream details, or None if it doesn't exist.
110    """
111    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/describe_log_streams.html
112    res = logs_client.describe_log_streams(
113        logGroupName=group_name,
114        logStreamNamePrefix=stream_name,
115    )
116    streams = [
117        dct
118        for dct in res.get("logStreams", [])
119        if dct.get("logStreamName", "unknown-log-stream-name") == stream_name
120    ]
121    if len(streams):
122        return streams[0]
123    else:
124        return None
125
126
127def create_log_stream(
128    logs_client,
129    group_name: str,
130    stream_name: str,
131) -> bool:
132    """
133    Create a log stream, if it already exists, do nothing.
134
135    :return: True if the log stream was created, False if it already existed.
136    """
137    stream = get_log_stream(logs_client, group_name, stream_name)
138    if stream is None:
139        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/create_log_stream.html
140        logs_client.create_log_stream(
141            logGroupName=group_name,
142            logStreamName=stream_name,
143        )
144        return True
145    else:
146        return False
147
148
149def delete_log_stream(
150    logs_client,
151    group_name: str,
152    stream_name: str,
153) -> bool:
154    """
155    Delete a log stream, if it doesn't exist, do nothing.
156
157    :return: True if the log stream was deleted, False if it didn't exist.
158    """
159    stream = get_log_stream(logs_client, group_name, stream_name)
160    if stream is None:
161        return False
162    else:
163        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/delete_log_stream.html
164        logs_client.delete_log_stream(
165            logGroupName=group_name,
166            logStreamName=stream_name,
167        )
168        return True
169
170
171EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
172
173
174def get_now_ts() -> int:
175    """
176    The put log events API expects a timestamp in milliseconds since epoch.
177    """
178    return int(
179        (datetime.utcnow().replace(tzinfo=timezone.utc) - EPOCH).total_seconds() * 1000
180    )
181
182
183@dataclasses.dataclass
184class Event:
185    """
186    Log event data model.
187    """
188
189    message: str = dataclasses.field()
190    timestamp: int = dataclasses.field(default_factory=get_now_ts)
191
192
193@dataclasses.dataclass
194class BaseJsonMessage:
195    """
196    Base class for json encoded log message.
197    """
198
199    def to_json(self) -> str:
200        """
201        Convert the object to a json string.
202
203        You can override this method to customize the json serialization.
204        """
205        return json.dumps(dataclasses.asdict(self))
206
207    @classmethod
208    def from_json(cls, json_str: str):
209        """
210        You can override this module to customize the json deserialization.
211        """
212        dct = json.loads(json_str)
213        return cls(**dct)
214
215
216def put_log_events(
217    logs_client,
218    group_name: str,
219    stream_name: str,
220    events: T.List[Event],
221) -> T.Optional[dict]:
222    """
223    Put a list of events into a log stream.
224
225    Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html
226
227    :param logs_client: The boto3 logs client.
228    :param group_name: The log group name.
229    :param stream_name: The log stream name.
230    :param events: A list of :class:`Event` objects.
231
232    :return: A dict with the response from the put_log_events call.
233    """
234    if len(events) == 0:
235        return None
236    res = logs_client.put_log_events(
237        logGroupName=group_name,
238        logStreamName=stream_name,
239        logEvents=[dataclasses.asdict(event) for event in events],
240    )
241    return res
242
243
244def get_ts(dt: datetime) -> float:
245    """
246    Convert a datetime object to a timestamp in seconds since epoch.
247
248    It assumes the datetime object is in UTC if it doesn't have a timezone.
249    """
250    if dt.tzinfo is None:
251        dt = dt.replace(tzinfo=timezone.utc)
252    else:
253        dt = dt.astimezone(timezone.utc)
254    return (dt - EPOCH).total_seconds()
255
256
257def get_ts_in_second(dt: datetime) -> int:
258    """
259    Convert a datetime object to a timestamp in seconds since epoch.
260    """
261    return int(get_ts(dt))
262
263
264def get_ts_in_millisecond(dt: datetime) -> int:
265    """
266    Convert a datetime object to a timestamp in milliseconds since epoch.
267    """
268    return int(get_ts(dt) * 1000)
269
270
271class QueryStatusEnum(str, enum.Enum):
272    """
273    Enum for the query status.
274
275    Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/get_query_results.html
276    """
277
278    Scheduled = "Scheduled"
279    Running = "Running"
280    Complete = "Complete"
281    Failed = "Failed"
282    Cancelled = "Cancelled"
283    Timeout = "Timeout"
284    Unknown = "Unknown"
285
286
287def wait_logs_insights_query_to_succeed(
288    logs_client,
289    query_id: str,
290    delta: int = 1,
291    timeout: int = 30,
292) -> dict:
293    """
294    Wait a given Logs Insights query to reach ``Complete`` status. If failed,
295    raise ``RuntimeError`` immediately. If timeout, raise ``TimeoutError``.
296
297    Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/get_query_results.html
298
299    :param logs_client: The boto3 cloudwatch logs client.
300    :param query_id: The query id from the response of ``start_query`` API call.
301    :param delta: The time interval in seconds between each query status check.
302    :param timeout: The maximum time in seconds to wait for the query to succeed.
303    """
304    elapsed = 0
305    for _ in range(999999):
306        res = logs_client.get_query_results(queryId=query_id)
307        status = res["status"]
308        if status == QueryStatusEnum.Complete.value:
309            return res
310        elif status in [
311            QueryStatusEnum.Failed.value,
312            QueryStatusEnum.Cancelled.value,
313            QueryStatusEnum.Timeout.value,
314        ]:
315            raise RuntimeError(f"query {query_id} reached status: {status}")
316        else:
317            time.sleep(delta)
318        elapsed += delta
319        if elapsed > timeout:
320            raise TimeoutError(f"logs insights query timeout in {timeout} seconds!")
321
322
323def strip_out_limit_clause(query: str) -> str:
324    """
325    Strip out the limit clause from a query string.
326    """
327    lines = query.splitlines()
328    return "\n".join([line for line in lines if not line.startswith("| limit")])
329
330
331def run_query(
332    logs_client,
333    start_datetime: datetime,
334    end_datetime: datetime,
335    query: str,
336    log_group_name: T.Optional[str] = None,
337    log_group_name_list: T.Optional[T.List[str]] = None,
338    log_group_id_list: T.Optional[T.List[str]] = None,
339    limit: int = 1000,
340    delta: int = 1,
341    timeout: int = 30,
342) -> T.Tuple[str, dict]:
343    """
344    Run a logs insights query and wait for the query to succeed. It is a more
345    human friendly wrapper of the ``start_query`` and ``get_query_results`` API.
346
347    Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/start_query.html
348
349    :param logs_client: The boto3 cloudwatch logs client.
350    :param start_datetime: python datetime object for start time,
351        if timezone is not set, it assumes UTC.
352    :param end_datetime: python datetime object for end time,
353        if timezone is not set, it assumes UTC.
354    :param query: The query string. don't use ``| limit abc`` in your query,
355        use the ``limit`` parameter instead.
356    :param log_group_name: see ``start_query`` API.
357    :param log_group_name_list: see ``start_query`` API.
358    :param log_group_id_list: see ``start_query`` API.
359    :param limit: see ``start_query`` API.
360    :param delta: The time interval in seconds between each query status check.
361    :param timeout: The maximum time in seconds to wait for the query to succeed.
362    """
363    start_ts = get_ts_in_second(start_datetime)
364    end_ts = get_ts_in_second(end_datetime)
365    kwargs = dict(
366        startTime=start_ts,
367        endTime=end_ts,
368        queryString=query,
369        limit=limit,
370    )
371    if log_group_name is not None:
372        kwargs["logGroupName"] = log_group_name
373    elif log_group_name_list:
374        kwargs["logGroupNames"] = log_group_name_list
375    elif log_group_id_list:
376        kwargs["logGroupIds"] = log_group_id_list
377    else:  # it will raise error in API call
378        pass
379    res = logs_client.start_query(**kwargs)
380    query_id = res["queryId"]
381    res = wait_logs_insights_query_to_succeed(logs_client, query_id, delta, timeout)
382    return query_id, res
383
384
385def reformat_query_results(response: dict) -> T.List[dict]:
386    """
387    Convert the response from ``get_query_results`` API call to a more Pythonic
388    format.
389
390    :param response: the response from ``get_query_results`` API call.
391    """
392    return [
393        {dct["field"]: dct["value"] for dct in result}
394        for result in response.get("results", [])
395    ]

shared.py: common values shared by the scripts in this example.

 1# -*- coding: utf-8 -*-
 2
 3from boto_session_manager import BotoSesManager
 4from aws_console_url.api import AWSConsole
 5
 6bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")
 7aws = AWSConsole(aws_account_id=bsm.aws_account_id, aws_region=bsm.aws_region, bsm=bsm)
 8logs_client = bsm.cloudwatchlogs_client
 9
10group_name = "learn_aws_cloudwatch/Analyzing-log-data-with-CloudWatch-Logs-Insights"
11stream_name_1 = "container-1"
12stream_name_2 = "container-2"

data_faker.py: the script used to generate test data.

  1# -*- coding: utf-8 -*-
  2
  3import typing as T
  4import time
  5import random
  6import dataclasses
  7
  8from recipe import (
  9    create_log_group,
 10    delete_log_group,
 11    create_log_stream,
 12    Event,
 13    BaseJsonMessage,
 14    put_log_events,
 15)
 16from shared import bsm, logs_client, aws, group_name, stream_name_1, stream_name_2
 17
 18
 19def set_up():
 20    """
 21    Set up cloudwatch logs resource for this example.
 22    """
 23    create_log_group(logs_client, group_name)
 24    create_log_stream(logs_client, group_name, stream_name_1)
 25    create_log_stream(logs_client, group_name, stream_name_2)
 26    print(aws.cloudwatch.get_log_group(group_name))
 27
 28
 29@dataclasses.dataclass
 30class StatusMessage(BaseJsonMessage):
 31    server_id: str = dataclasses.field()
 32    status: str = dataclasses.field()
 33
 34
 35@dataclasses.dataclass
 36class ProcessingTimeMessage(BaseJsonMessage):
 37    server_id: str = dataclasses.field()
 38    processing_time: int = dataclasses.field()
 39
 40
 41server_id_list = [stream_name_1, stream_name_2]
 42
 43
 44def rand_event() -> T.List[T.Union[ProcessingTimeMessage, StatusMessage]]:
 45    """
 46    70% chance it succeeds, 30% chance it fails. When succeeded, it will generate
 47    two messages, one for status and one for processing time. When failed, it will
 48    generate one failed message for status.
 49    """
 50    server_id = random.choice(server_id_list)
 51    stream_name = server_id
 52    if random.randint(1, 100) <= 70:
 53        messages = [
 54            StatusMessage(
 55                server_id=server_id,
 56                status="succeeded",
 57            ),
 58            ProcessingTimeMessage(
 59                server_id=server_id,
 60                processing_time=random.randint(1000, 10000),
 61            ),
 62        ]
 63    else:
 64        messages = [
 65            StatusMessage(
 66                server_id=server_id,
 67                status="failed",
 68            )
 69        ]
 70    put_log_events(
 71        bsm.cloudwatchlogs_client,
 72        group_name,
 73        stream_name,
 74        events=[Event(message=message.to_json()) for message in messages],
 75    )
 76    return messages
 77
 78
 79def run_data_faker():
 80    """
 81    Run :func:`rand_event` every 1 second.
 82
 83    Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
 84
 85    The maximum batch size of a PutLogEvents request is 1MB.
 86
 87    **800** transactions per second per account per Region, except for the following Regions where the quota is 1500 transactions per second per account per Region: US East (N. Virginia), US West (Oregon), and Europe (Ireland). You can request an increase to the per-second throttling quota by using the Service Quotas service.
 88    """
 89    ith = 0
 90    while True:
 91        ith += 1
 92        print(f"ith: {ith} sec")
 93        time.sleep(1)
 94        messages = rand_event()
 95        for message in messages:
 96            print(f"  {message}")
 97
 98
 99def clean_up():
100    """
101    Clean up cloudwatch logs resource for this example.
102    """
103    delete_log_group(logs_client, group_name)
104
105
106if __name__ == "__main__":
107    set_up()
108    run_data_faker()
109    # clean_up()

run_query.py: the script used to test Logs Insights queries.

 1# -*- coding: utf-8 -*-
 2
 3from datetime import datetime, timedelta, timezone
 4
 5from rich import print as rprint
 6
 7from recipe import run_query, reformat_query_results
 8from shared import logs_client, group_name
 9
10now = datetime.utcnow().replace(tzinfo=timezone.utc)
11five_minutes_ago = now - timedelta(minutes=5)
12
13query = """
14fields @timestamp, @message, @logStream, @log
15| sort @timestamp desc
16""".strip()
17
18query_id, res = run_query(
19    logs_client,
20    log_group_name=group_name,
21    start_datetime=five_minutes_ago,
22    end_datetime=now,
23    query=query,
24    limit=20,
25)
26print(f"query_id = {query_id}")
27# res = logs_client.get_query_results(queryId="a1b2c3d4") # use this for a known query id
28records = reformat_query_results(res)
29print("records =")
30rprint(records)
31print(f"total records = {len(records)}")