Use Amazon CloudWatch metrics#

Keywords: AWS, Amazon, CloudWatch, CW, Metric, Metrics

What are Metrics#

A metric is essentially a time series of measurements of one particular indicator. You can think of it as the result of computing over time-series log data; it is itself a discrete time series. A metric usually tracks a single indicator, and its metadata is called a dimension (this is very important). For example, an EC2 instance's CPU usage is a metric, while the EC2 instance id is a dimension. This is one of the key techniques in time-series data modeling.
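To make this concrete, here is a minimal pure-Python sketch (the metric name and instance ids are made up for illustration) showing that a metric name plus a concrete set of dimension values identifies one independent time series:

```python
from collections import defaultdict

# Hypothetical raw measurements: (timestamp, metric_name, dimensions, value).
datapoints = [
    (1700000000, "CPUUtilization", {"InstanceId": "i-aaa"}, 31.5),
    (1700000060, "CPUUtilization", {"InstanceId": "i-aaa"}, 40.0),
    (1700000000, "CPUUtilization", {"InstanceId": "i-bbb"}, 75.0),
]

# A metric name plus a concrete set of dimension values identifies one
# independent time series; grouping by that key recovers each series.
series = defaultdict(list)
for ts, name, dims, value in datapoints:
    key = (name, tuple(sorted(dims.items())))
    series[key].append((ts, value))

for key, points in sorted(series.items()):
    print(key, points)
```

Here the two instance ids produce two separate series under the same metric name, which is exactly how CloudWatch keys metric data.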

Reference:

Dimension#

Metrics Insights Query#

Metrics Insights Query is a tool for querying metric time series with a SQL-like language. It is completely different from CloudWatch Logs Insights; do not confuse the two.

Its main capabilities are: SELECT a namespace/metric, restrict the query to a time range, filter the metric by dimension, then apply math functions to it or aggregate it by time interval.
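The hypothetical helper below sketches how those pieces fit together into a query string: a statistic, a metric name, a namespace-plus-dimensions schema, and optional WHERE and GROUP BY clauses (`build_insights_query` is not a real AWS API, just an illustration of the query shape):

```python
def build_insights_query(
    stat: str,
    metric_name: str,
    namespace: str,
    dimension_keys: list,
    where: str = "",
    group_by: str = "",
) -> str:
    """Assemble a Metrics Insights query string from its components.

    Purely illustrative; no input validation or escaping is performed.
    """
    # SCHEMA(...) names the namespace and the dimension keys of the metric.
    schema = ", ".join([namespace] + dimension_keys)
    query = f"SELECT {stat}({metric_name}) FROM SCHEMA({schema})"
    if where:
        query += f" WHERE {where}"
    if group_by:
        query += f" GROUP BY {group_by}"
    return query


print(
    build_insights_query(
        stat="AVG",
        metric_name="ProcessingTime",
        namespace="MyApp",
        dimension_keys=["server_id"],
        where="server_id = 'server-1'",
    )
)
```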

Let's look at an example. Our log data records the response time on two servers, server 1 and server 2. The script below generates the log data.

s1_data_faker.py

# -*- coding: utf-8 -*-

import typing as T
import time
import random
import dataclasses

from recipe import (
    create_log_group,
    delete_log_group,
    create_log_stream,
    Event,
    BaseJsonMessage,
    put_log_events,
)
from config import bsm, logs_client, aws, group_name, stream_name_1, stream_name_2


def set_up():
    """
    Set up cloudwatch logs resource for this example.
    """
    create_log_group(logs_client, group_name)
    create_log_stream(logs_client, group_name, stream_name_1)
    create_log_stream(logs_client, group_name, stream_name_2)
    print(aws.cloudwatch.get_log_group(group_name))


@dataclasses.dataclass
class StatusMessage(BaseJsonMessage):
    server_id: str = dataclasses.field()
    status: str = dataclasses.field()


@dataclasses.dataclass
class ProcessingTimeMessage(BaseJsonMessage):
    server_id: str = dataclasses.field()
    processing_time: int = dataclasses.field()


server_id_list = [stream_name_1, stream_name_2]


def rand_event() -> T.List[T.Union[ProcessingTimeMessage, StatusMessage]]:
    """
    70% chance it succeeds, 30% chance it fails. When it succeeds, it generates
    two messages, one for status and one for processing time. When it fails, it
    generates one failed message for status.
    """
    server_id = random.choice(server_id_list)
    stream_name = server_id
    if random.randint(1, 100) <= 70:
        messages = [
            StatusMessage(
                server_id=server_id,
                status="succeeded",
            ),
            ProcessingTimeMessage(
                server_id=server_id,
                processing_time=random.randint(1000, 10000),
            ),
        ]
    else:
        messages = [
            StatusMessage(
                server_id=server_id,
                status="failed",
            )
        ]
    put_log_events(
        bsm.cloudwatchlogs_client,
        group_name,
        stream_name,
        events=[Event(message=message.to_json()) for message in messages],
    )
    return messages


def run_data_faker():
    """
    Run :func:`rand_event` every 1 second.

    Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html

    The maximum batch size of a PutLogEvents request is 1MB.

    **800** transactions per second per account per Region, except for the following Regions where the quota is 1500 transactions per second per account per Region: US East (N. Virginia), US West (Oregon), and Europe (Ireland). You can request an increase to the per-second throttling quota by using the Service Quotas service.
    """
    ith = 0
    while True:
        ith += 1
        print(f"ith: {ith} sec")
        time.sleep(1)
        messages = rand_event()
        for message in messages:
            print(f"  {message}")


def clean_up():
    """
    Clean up cloudwatch logs resource for this example.
    """
    delete_log_group(logs_client, group_name)


if __name__ == "__main__":
    set_up()
    run_data_faker()
    # clean_up()

The script below uses a metric filter to filter the logs and generate a metric. The metric carries a dimension, whose value is extracted from the server_id field.

s2_create_metrics.py

# -*- coding: utf-8 -*-

"""
This script creates a custom metric based on the log event data.
"""

from config import logs_client, group_name, metric_namespace, metric_name

# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_metric_filter.html
logs_client.put_metric_filter(
    logGroupName=group_name,
    filterName="ProcessingTime",
    filterPattern='{ $.processing_time = "*" }',
    metricTransformations=[
        {
            "metricNamespace": metric_namespace,
            "metricName": metric_name,
            "metricValue": "$.processing_time",
            "dimensions": {
                "server_id": "$.server_id",
            },
        },
    ],
)
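To build intuition for what this filter does, here is a rough pure-Python simulation (CloudWatch does not actually run this code): the pattern `{ $.processing_time = "*" }` matches only events that carry a `processing_time` field, and each match emits one datapoint whose value comes from `$.processing_time` and whose `server_id` dimension comes from `$.server_id`.

```python
import json


def simulate_metric_filter(raw_events: list) -> list:
    """Rough simulation of the metric filter above, for intuition only."""
    datapoints = []
    for raw in raw_events:
        doc = json.loads(raw)
        # The filterPattern only matches events where processing_time exists.
        if "processing_time" not in doc:
            continue
        datapoints.append(
            {
                # metricValue comes from $.processing_time
                "value": doc["processing_time"],
                # the dimension value comes from $.server_id
                "dimensions": {"server_id": doc["server_id"]},
            }
        )
    return datapoints


events = [
    '{"server_id": "server-1", "status": "succeeded"}',
    '{"server_id": "server-1", "processing_time": 3500}',
    '{"server_id": "server-2", "status": "failed"}',
]
print(simulate_metric_filter(events))
# -> [{'value': 3500, 'dimensions': {'server_id': 'server-1'}}]
```

Only the second event matches, so only one datapoint is emitted; the two status-only events contribute nothing to this metric.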

The script below uses a Metrics Insights query for analysis. There are two API styles: one builds the query with structured JSON; the other describes the query with SQL syntax. The SQL syntax is more concise and easier to learn, but it is not well suited to parameterization (beware of SQL injection). The example below demonstrates both approaches.

s3_metrics_insight.py

# -*- coding: utf-8 -*-

"""
Learn how to use the get_metric_data API to query metrics.

Ref:

- Metrics Insights query components and syntax: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch-metrics-insights-querylanguage.html
- cloudwatch_client.get_metric_data: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudwatch/client/get_metric_data.html
"""

from textwrap import dedent
from datetime import datetime, timezone, timedelta
from config import (
    cw_client,
    metric_namespace,
    metric_name,
    stream_name_1,
    stream_name_2,
)


now = datetime.now(timezone.utc)
five_minutes_ago = now - timedelta(minutes=5)


def print_get_metric_data_response(res: dict):
    dct = {
        "Timestamps": res["MetricDataResults"][0]["Timestamps"],
        "Values": res["MetricDataResults"][0]["Values"],
    }
    print(dct)


def _use_metric_stat(dimensions: list) -> dict:
    return cw_client.get_metric_data(
        MetricDataQueries=[
            dict(
                Id="id1",
                MetricStat=dict(
                    Metric=dict(
                        Namespace=metric_namespace,
                        MetricName=metric_name,
                        Dimensions=dimensions,
                    ),
                    Period=60,
                    Stat="Average",
                ),
                ReturnData=True,
            ),
        ],
        StartTime=five_minutes_ago,
        EndTime=now,
        ScanBy="TimestampAscending",
    )


def use_metric_stat():
    """
    Here's a tricky part: you cannot use multiple dimensions with the same name;
    only the last one will be used.
    It is NOT a logical OR, and Metrics Insights doesn't support logical OR.
    """
    dimensions = [
        dict(Name="server_id", Value=stream_name_1),
        dict(Name="server_id", Value=stream_name_2),
    ]
    res = _use_metric_stat(dimensions=dimensions)
    print(dimensions)
    print_get_metric_data_response(res)

    dimensions = [
        # dict(Name="server_id", Value=stream_name_1),
        dict(Name="server_id", Value=stream_name_2),
    ]
    res = _use_metric_stat(dimensions=dimensions)
    print(dimensions)
    print_get_metric_data_response(res)

    dimensions = [
        dict(Name="server_id", Value=stream_name_2),
        dict(Name="server_id", Value=stream_name_1),
    ]
    res = _use_metric_stat(dimensions=dimensions)
    print(dimensions)
    print_get_metric_data_response(res)

    dimensions = [
        dict(Name="server_id", Value=stream_name_1),
        # dict(Name="server_id", Value=stream_name_2),
    ]
    res = _use_metric_stat(dimensions=dimensions)
    print(dimensions)
    print_get_metric_data_response(res)

    res = _use_metric_stat(
        dimensions=[
            # dict(Name="server_id", Value=stream_name_1),
            # dict(Name="server_id", Value=stream_name_2),
        ]
    )
    print_get_metric_data_response(res)


def _use_expression(sql: str) -> dict:
    return cw_client.get_metric_data(
        MetricDataQueries=[
            dict(
                Id="id1",
                Expression=dedent(sql.strip()),
                Period=60,
                ReturnData=True,
            ),
        ],
        StartTime=five_minutes_ago,
        EndTime=now,
        ScanBy="TimestampAscending",
    )


def use_expression():
    """
    Metrics Insights doesn't support logical OR, so we use != instead.

    List of queries:

    SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id = '{stream_name_1}'
    SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id = '{stream_name_2}'
    SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id != 'xyz'
    SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace})
    """
    sql = f"SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id = '{stream_name_1}'"
    res = _use_expression(sql)
    print(sql)
    print_get_metric_data_response(res)

    sql = f"SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id = '{stream_name_2}'"
    res = _use_expression(sql)
    print(sql)
    print_get_metric_data_response(res)

    sql = f"SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id != 'xyz'"
    res = _use_expression(sql)
    print(sql)
    print_get_metric_data_response(res)

    sql = f"SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace})"
    res = _use_expression(sql)
    print(sql)
    print_get_metric_data_response(res)


if __name__ == "__main__":
    use_metric_stat()
    use_expression()
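Because the SQL style interpolates values directly into the query string, a defensive sketch like the one below can reduce injection risk (`safe_literal` is a hypothetical helper, not part of any AWS SDK; when values come from untrusted input, prefer the structured MetricStat form, which needs no string interpolation at all):

```python
import re

# Conservative whitelist for dimension values we are willing to interpolate.
_SAFE_VALUE = re.compile(r"^[A-Za-z0-9._/\-]+$")


def safe_literal(value: str) -> str:
    """Return value as a single-quoted literal, or refuse if it looks unsafe."""
    if not _SAFE_VALUE.match(value):
        raise ValueError(f"refusing to interpolate suspicious value: {value!r}")
    return f"'{value}'"


sql = f"SELECT AVG(ProcessingTime) FROM SCHEMA(MyApp, server_id) WHERE server_id = {safe_literal('server-1')}"
print(sql)
# -> SELECT AVG(ProcessingTime) FROM SCHEMA(MyApp, server_id) WHERE server_id = 'server-1'
```

A value such as `x' OR '1'='1` fails the whitelist and raises instead of silently widening the query.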

Reference: