Working with log groups and log streams#

Keywords: AWS, Amazon, CloudWatch, CW, Log Group Stream

Create and Update Log Group and Stream#

Managing a log group is very simple: there are only two operations, create_log_group and put_retention_policy (which configures the retention policy).

Managing a log stream is even simpler: apart from the group and stream names, there are no other parameters at all.
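For example, this is roughly the entire native API surface. Below is a minimal sketch assuming a boto3 logs client; the group/stream names and the 30-day retention value are hypothetical:

import boto3

logs_client = boto3.client("logs")

# create a group and give it a retention policy
logs_client.create_log_group(logGroupName="my-app/example")
logs_client.put_retention_policy(
    logGroupName="my-app/example",
    retentionInDays=30,  # expire events after 30 days
)

# a stream needs nothing but its group name and its own name
logs_client.create_log_stream(
    logGroupName="my-app/example",
    logStreamName="worker-1",
)

Note that calling create_log_group a second time raises ResourceAlreadyExistsException, which is exactly what the module below smooths over.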

Below is a module that helps you manage groups and streams. Its improvement over the native API is that it handles the cases where the group already exists on create, or does not exist on delete. It also provides a data-model abstraction for log events.

# -*- coding: utf-8 -*-

"""
A helper module to work with CloudWatch Logs Group, Stream and put log events.

Usage:

.. code-block:: python

    from recipe import (
        get_log_group,
        create_log_group,
        delete_log_group,
        get_log_stream,
        create_log_stream,
        delete_log_stream,
        Event,
        BaseJsonMessage,
        put_log_events,
    )
"""

import typing as T
import json
import dataclasses
from datetime import datetime, timezone


def get_log_group(
    logs_client,
    group_name: str,
) -> T.Optional[dict]:
    """
    Get the details of a log group by name; if it doesn't exist, return None.

    :return: A dict with the log group details, or None if it doesn't exist.
    """
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/describe_log_groups.html
    res = logs_client.describe_log_groups(
        logGroupNamePrefix=group_name,
    )
    # describe_log_groups matches by prefix, so filter for an exact name match
    groups = [
        dct
        for dct in res.get("logGroups", [])
        if dct.get("logGroupName", "unknown-log-group-name") == group_name
    ]
    if len(groups):
        return groups[0]
    else:
        return None


def create_log_group(
    logs_client,
    group_name: str,
) -> bool:
    """
    Create a log group; if it already exists, do nothing.

    :return: True if the log group was created, False if it already existed.
    """
    group = get_log_group(logs_client, group_name)
    if group is None:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/create_log_group.html
        logs_client.create_log_group(
            logGroupName=group_name,
        )
        return True
    else:
        return False


def delete_log_group(
    logs_client,
    group_name: str,
) -> bool:
    """
    Delete a log group; if it doesn't exist, do nothing.

    :return: True if the log group was deleted, False if it didn't exist.
    """
    group = get_log_group(logs_client, group_name)
    if group is None:
        return False
    else:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/delete_log_group.html
        logs_client.delete_log_group(
            logGroupName=group_name,
        )
        return True


def get_log_stream(
    logs_client,
    group_name: str,
    stream_name: str,
) -> T.Optional[dict]:
    """
    Get the details of a log stream by name; if it doesn't exist, return None.

    :return: A dict with the log stream details, or None if it doesn't exist.
    """
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/describe_log_streams.html
    res = logs_client.describe_log_streams(
        logGroupName=group_name,
        logStreamNamePrefix=stream_name,
    )
    # describe_log_streams also matches by prefix, so filter for an exact match
    streams = [
        dct
        for dct in res.get("logStreams", [])
        if dct.get("logStreamName", "unknown-log-stream-name") == stream_name
    ]
    if len(streams):
        return streams[0]
    else:
        return None


def create_log_stream(
    logs_client,
    group_name: str,
    stream_name: str,
) -> bool:
    """
    Create a log stream; if it already exists, do nothing.

    :return: True if the log stream was created, False if it already existed.
    """
    stream = get_log_stream(logs_client, group_name, stream_name)
    if stream is None:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/create_log_stream.html
        logs_client.create_log_stream(
            logGroupName=group_name,
            logStreamName=stream_name,
        )
        return True
    else:
        return False


def delete_log_stream(
    logs_client,
    group_name: str,
    stream_name: str,
) -> bool:
    """
    Delete a log stream; if it doesn't exist, do nothing.

    :return: True if the log stream was deleted, False if it didn't exist.
    """
    stream = get_log_stream(logs_client, group_name, stream_name)
    if stream is None:
        return False
    else:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/delete_log_stream.html
        logs_client.delete_log_stream(
            logGroupName=group_name,
            logStreamName=stream_name,
        )
        return True


EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def get_now_ts() -> int:
    """
    The put log events API expects a timestamp in milliseconds since epoch.
    """
    # datetime.now(timezone.utc) is the timezone-aware equivalent of the
    # deprecated datetime.utcnow()
    return int((datetime.now(timezone.utc) - EPOCH).total_seconds() * 1000)


@dataclasses.dataclass
class Event:
    """
    Log event data model.
    """

    message: str = dataclasses.field()
    timestamp: int = dataclasses.field(default_factory=get_now_ts)


@dataclasses.dataclass
class BaseJsonMessage:
    """
    Base class for json encoded log message.
    """

    def to_json(self) -> str:
        """
        Convert the object to a json string.

        You can override this method to customize the json serialization.
        """
        return json.dumps(dataclasses.asdict(self))

    @classmethod
    def from_json(cls, json_str: str):
        """
        You can override this method to customize the json deserialization.
        """
        dct = json.loads(json_str)
        return cls(**dct)


def put_log_events(
    logs_client,
    group_name: str,
    stream_name: str,
    events: T.List[Event],
) -> T.Optional[dict]:
    """
    Put a list of events into a log stream.

    :param logs_client: The boto3 logs client.
    :param group_name: The log group name.
    :param stream_name: The log stream name.
    :param events: A list of :class:`Event` objects.

    :return: A dict with the response from the put_log_events call.
    """
    if len(events) == 0:
        return None
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html
    res = logs_client.put_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        logEvents=[dataclasses.asdict(event) for event in events],
    )
    return res
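Here is a quick usage sketch of the module (the module name recipe follows the docstring above; the group and stream names are hypothetical):

import boto3

from recipe import (
    create_log_group,
    create_log_stream,
    delete_log_group,
    Event,
    put_log_events,
)

logs_client = boto3.client("logs")
group_name = "my-app/example"
stream_name = "worker-1"

create_log_group(logs_client, group_name)  # True on first call, False after
create_log_stream(logs_client, group_name, stream_name)
put_log_events(
    logs_client,
    group_name,
    stream_name,
    events=[Event(message="hello cloudwatch")],  # timestamp defaults to now
)
delete_log_group(logs_client, group_name)  # deleting the group also removes its streams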

Put Log Events#

We can use the put_log_events API to ship logs to CloudWatch. By default this API has two limits:

  1. The payload of each batch (one API call) must not exceed 1 MB (see the batching sketch after this list).

  2. For US East (N. Virginia), US West (Oregon), and Europe (Ireland), the quota is 1500 transactions per second per account per Region; for all other Regions it is 800 per second.
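To stay under the batch size limit you can chunk events on the client side before calling put_log_events. Below is a minimal sketch: AWS counts each event as its UTF-8 message size plus 26 bytes of overhead, against a cap of 1,048,576 bytes (and 10,000 events) per request. The chunk_events helper here is our own illustration, not part of any library:

import typing as T

MAX_BATCH_SIZE = 1_048_576  # bytes per PutLogEvents request
MAX_BATCH_EVENTS = 10_000   # events per PutLogEvents request
PER_EVENT_OVERHEAD = 26     # bytes AWS adds per event when sizing the batch


def chunk_events(events: T.List[dict]) -> T.Iterator[T.List[dict]]:
    """
    Split {"message": ..., "timestamp": ...} dicts into batches that each
    fit into a single PutLogEvents request.
    """
    batch: T.List[dict] = []
    batch_size = 0
    for event in events:
        event_size = len(event["message"].encode("utf-8")) + PER_EVENT_OVERHEAD
        if batch and (
            batch_size + event_size > MAX_BATCH_SIZE
            or len(batch) >= MAX_BATCH_EVENTS
        ):
            yield batch
            batch, batch_size = [], 0
        batch.append(event)
        batch_size += event_size
    if batch:
        yield batch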

Below is a script, built on the module above, that writes logs into a log stream.

# -*- coding: utf-8 -*-

import typing as T
import time
import random
import dataclasses

from boto_session_manager import BotoSesManager
from aws_console_url.api import AWSConsole

from recipe import (
    create_log_group,
    delete_log_group,
    create_log_stream,
    Event,
    BaseJsonMessage,
    put_log_events,
)

bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")
aws = AWSConsole(aws_account_id=bsm.aws_account_id, aws_region=bsm.aws_region, bsm=bsm)
log_client = bsm.cloudwatchlogs_client

group_name = "learn_aws_cloudwatch/working_with_log_groups_and_log_streams"
stream_name_1 = "container-1"
stream_name_2 = "container-2"


def set_up():
    """
    Set up cloudwatch logs resource for this example.
    """
    create_log_group(log_client, group_name)
    create_log_stream(log_client, group_name, stream_name_1)
    create_log_stream(log_client, group_name, stream_name_2)
    print(aws.cloudwatch.get_log_group(group_name))


@dataclasses.dataclass
class StatusMessage(BaseJsonMessage):
    server_id: str = dataclasses.field()
    status: str = dataclasses.field()


@dataclasses.dataclass
class ProcessingTimeMessage(BaseJsonMessage):
    server_id: str = dataclasses.field()
    processing_time: int = dataclasses.field()


server_id_list = [stream_name_1, stream_name_2]


def rand_event() -> T.List[T.Union[ProcessingTimeMessage, StatusMessage]]:
    """
    70% chance it succeeds, 30% chance it fails. On success it generates
    two messages, one for status and one for processing time. On failure it
    generates a single failed status message.
    """
    server_id = random.choice(server_id_list)
    stream_name = server_id  # one stream per server in this example
    if random.randint(1, 100) <= 70:
        messages = [
            StatusMessage(
                server_id=server_id,
                status="succeeded",
            ),
            ProcessingTimeMessage(
                server_id=server_id,
                processing_time=random.randint(100, 1000),
            ),
        ]
    else:
        messages = [
            StatusMessage(
                server_id=server_id,
                status="failed",
            )
        ]
    put_log_events(
        log_client,
        group_name,
        stream_name,
        events=[Event(message=message.to_json()) for message in messages],
    )
    return messages


def run_data_faker():
    """
    Run :func:`rand_event` every 1 second.

    Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html

    The maximum batch size of a PutLogEvents request is 1MB.

    **800** transactions per second per account per Region, except for the following Regions where the quota is 1500 transactions per second per account per Region: US East (N. Virginia), US West (Oregon), and Europe (Ireland). You can request an increase to the per-second throttling quota by using the Service Quotas service.
    """
    ith = 0
    while True:
        ith += 1
        print(f"ith: {ith} sec")
        time.sleep(1)
        messages = rand_event()
        for message in messages:
            print(f"  {message}")


def clean_up():
    """
    Clean up cloudwatch logs resource for this example.
    """
    delete_log_group(log_client, group_name)


if __name__ == "__main__":
    set_up()
    run_data_faker()
    # clean_up()