Use Amazon CloudWatch metrics#
Keywords: AWS, Amazon, CloudWatch, CW, Metric, Metrics
What are Metrics#
Metrics 本质上是对某个特定的指标的 measurement 的时间序列. 你可以理解为对 Logs 时间序列数据计算后的结果. 它本质上也是一个离散的时间序列. 通常一个 Metrics 是一个单一指标. 而 metrics 的 metadata 被称为 dimension (非常重要). 例如 EC2 的 CPU usage 是 metrics, 而 EC2 instance id 就是 dimension. 这是时间序列数据建模的关键技术之一.
Reference:
Dimension#
Metrics Insights Query#
Metrics Insights Query 是一个对 Metrics 时间序列用类 SQL 语言来查询的工具. 它跟 CloudWatch Logs Insights 是完全不同的两个东西, 请不要将其混淆.
它的主要功能是, SELECT namespace/metrics, 限定在一定的时间区间内, 用 dimension 对 metrics 进行过滤, 然后用数学函数对其进行计算或者按照 time interval 进行聚合.
下面我们来看一个例子. 我们的日志数据是记录两个服务器 server 1, 2 上的响应时间. 下面这个脚本可以生成日志数据.
s1_data_faker.py
1# -*- coding: utf-8 -*-
2
3import typing as T
4import time
5import random
6import dataclasses
7
8from recipe import (
9 create_log_group,
10 delete_log_group,
11 create_log_stream,
12 Event,
13 BaseJsonMessage,
14 put_log_events,
15)
16from config import bsm, logs_client, aws, group_name, stream_name_1, stream_name_2
17
18
def set_up():
    """
    Provision the CloudWatch Logs resources this example writes to:
    one log group and one log stream per fake server.
    """
    create_log_group(logs_client, group_name)
    for stream_name in (stream_name_1, stream_name_2):
        create_log_stream(logs_client, group_name, stream_name)
    # Print the console URL / details of the freshly created log group.
    print(aws.cloudwatch.get_log_group(group_name))
27
28
@dataclasses.dataclass
class StatusMessage(BaseJsonMessage):
    """
    A log message recording whether one request succeeded or failed.
    """

    # NOTE: ``dataclasses.field()`` with no arguments is a no-op; a plain
    # annotation declares the same required field with less ceremony.
    server_id: str  # which fake server emitted this message
    status: str  # "succeeded" or "failed" (see rand_event)
33
34
@dataclasses.dataclass
class ProcessingTimeMessage(BaseJsonMessage):
    """
    A log message recording how long one request took to process.
    """

    # NOTE: ``dataclasses.field()`` with no arguments is a no-op; a plain
    # annotation declares the same required field with less ceremony.
    server_id: str  # which fake server emitted this message
    processing_time: int  # milliseconds, randomly drawn in rand_event
39
40
# Each log stream doubles as a server id in the fake data, so the stream
# names are reused directly as the pool of server ids.
server_id_list = [stream_name_1, stream_name_2]
42
43
def rand_event() -> T.List[T.Union[ProcessingTimeMessage, StatusMessage]]:
    """
    Simulate one request landing on a random server and log the outcome.

    With 70% probability the request succeeds and two messages are emitted
    (a status message plus a processing-time measurement); with 30%
    probability it fails and only a failed status message is emitted.
    The messages are written to the log stream named after the chosen server.

    :return: the list of message objects that were put to CloudWatch Logs.
    """
    server_id = random.choice(server_id_list)
    stream_name = server_id  # stream name == server id by construction
    succeeded = random.randint(1, 100) <= 70
    if succeeded:
        messages = [
            StatusMessage(server_id=server_id, status="succeeded"),
            ProcessingTimeMessage(
                server_id=server_id,
                processing_time=random.randint(1000, 10000),
            ),
        ]
    else:
        messages = [
            StatusMessage(server_id=server_id, status="failed"),
        ]
    put_log_events(
        bsm.cloudwatchlogs_client,
        group_name,
        stream_name,
        events=[Event(message=msg.to_json()) for msg in messages],
    )
    return messages
77
78
def run_data_faker():
    """
    Emit one batch of fake log events per second, forever (Ctrl+C to stop).

    Quota notes, see
    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html:

    - The maximum batch size of a PutLogEvents request is 1MB.
    - **800** transactions per second per account per Region, except for the
      following Regions where the quota is 1500 transactions per second per
      account per Region: US East (N. Virginia), US West (Oregon), and Europe
      (Ireland). You can request an increase to the per-second throttling
      quota by using the Service Quotas service.
    """
    ith = 0
    while True:
        ith += 1
        print(f"ith: {ith} sec")
        time.sleep(1)
        # One batch per tick; echo every message that was written.
        for message in rand_event():
            print(f"    {message}")
97
98
def clean_up():
    """
    Tear down the CloudWatch Logs resources created by :func:`set_up`.
    Deleting the log group also removes its streams and stored events.
    """
    delete_log_group(logs_client, group_name)
104
105
if __name__ == "__main__":
    # Create the log group/streams, then generate fake events forever.
    set_up()
    run_data_faker()
    # clean_up()  # uncomment to delete the log group when finished
下面这个脚本使用了 metrics filter 来对日志进行过滤, 并生成了一个 metrics. 这个 metrics 是带 dimension 的, dimension 数据是从 server_id 中提取出来的.
s2_create_metrics.py
1# -*- coding: utf-8 -*-
2
3"""
4This script creates a custom metrics based on the log event data.
5"""
6
7from config import logs_client, group_name, metric_namespace, metric_name
8
# ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_metric_filter.html
# Match every JSON log event that carries a ``processing_time`` field and
# publish that value as a custom metric, attaching the event's ``server_id``
# as a dimension so the metric can be filtered per server.
transformation = {
    "metricNamespace": metric_namespace,
    "metricName": metric_name,
    "metricValue": "$.processing_time",
    "dimensions": {
        "server_id": "$.server_id",
    },
}
logs_client.put_metric_filter(
    logGroupName=group_name,
    filterName="ProcessingTime",
    filterPattern='{ $.processing_time = "*" }',
    metricTransformations=[transformation],
)
下面这个脚本使用了 metrics insights query 来进行分析. 它有两种 API 风格. 一种是用结构化的 JSON 来 build 这个 query. 还有一种是用 SQL 语法来描述这个 query. SQL 语法更简洁好学, 但不适合参数化 (小心 SQL 注入). 下面这个例子两种方法都展示了.
s3_metrics_insight.py
1# -*- coding: utf-8 -*-
2
3"""
4Learn how to use get_metric_data API to query metrics
5
6Ref:
7
8- Metrics Insights query components and syntax: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch-metrics-insights-querylanguage.html
9- cloudwatch_client.get_metric_data: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudwatch/client/get_metric_data.html
10"""
11
12from textwrap import dedent
13from datetime import datetime, timezone, timedelta
14from config import (
15 cw_client,
16 metric_namespace,
17 metric_name,
18 stream_name_1,
19 stream_name_2,
20)
21
22
# Query window: the last five minutes, as timezone-aware UTC datetimes.
# ``datetime.now(timezone.utc)`` replaces the original
# ``datetime.utcnow().replace(tzinfo=timezone.utc)`` — ``utcnow`` is
# deprecated since Python 3.12 and both forms yield the same aware timestamp.
now = datetime.now(timezone.utc)
five_minutes_ago = now - timedelta(minutes=5)
25
26
def print_get_metric_data_response(res: dict):
    """
    Print the timestamps and values of the first (and only) metric data
    result in a ``get_metric_data`` response.
    """
    first = res["MetricDataResults"][0]
    print({"Timestamps": first["Timestamps"], "Values": first["Values"]})
33
34
def _use_metric_stat(dimensions: list) -> dict:
    """
    Query the example metric via the structured ``MetricStat`` API:
    60-second average, over the last five minutes, oldest point first.

    :param dimensions: list of ``{"Name": ..., "Value": ...}`` dimension
        filters passed straight through to ``get_metric_data``.
    :return: the raw ``get_metric_data`` response dict.
    """
    query = {
        "Id": "id1",
        "MetricStat": {
            "Metric": {
                "Namespace": metric_namespace,
                "MetricName": metric_name,
                "Dimensions": dimensions,
            },
            "Period": 60,
            "Stat": "Average",
        },
        "ReturnData": True,
    }
    return cw_client.get_metric_data(
        MetricDataQueries=[query],
        StartTime=five_minutes_ago,
        EndTime=now,
        ScanBy="TimestampAscending",
    )
56
57
def use_metric_stat():
    """
    Demonstrate dimension filtering with the structured MetricStat API.

    Here's a tricky part: you cannot use multiple dimensions with the same
    name — only the last one takes effect. It is NOT a logical OR, and
    Metrics Insights doesn't support logical OR.
    """
    dimension_sets = [
        # both servers listed -> only the LAST entry (server 2) is used
        [
            dict(Name="server_id", Value=stream_name_1),
            dict(Name="server_id", Value=stream_name_2),
        ],
        # server 2 only
        [
            dict(Name="server_id", Value=stream_name_2),
        ],
        # reversed order -> only the LAST entry (server 1) is used
        [
            dict(Name="server_id", Value=stream_name_2),
            dict(Name="server_id", Value=stream_name_1),
        ],
        # server 1 only
        [
            dict(Name="server_id", Value=stream_name_1),
        ],
    ]
    for dimensions in dimension_sets:
        res = _use_metric_stat(dimensions=dimensions)
        print(dimensions)
        print_get_metric_data_response(res)

    # No dimension filter at all.
    res = _use_metric_stat(dimensions=[])
    print_get_metric_data_response(res)
103
104
def _use_expression(sql: str) -> dict:
    """
    Run a Metrics Insights SQL expression via ``get_metric_data``:
    60-second period, over the last five minutes, oldest point first.

    :param sql: a Metrics Insights query string, possibly an indented
        triple-quoted literal; it is dedented, then stripped.
    :return: the raw ``get_metric_data`` response dict.
    """
    # Bug fix: dedent must run BEFORE strip. The original
    # ``dedent(sql.strip())`` stripped the first line's leading whitespace
    # first, so ``dedent`` saw no common indent and was a no-op for
    # multi-line queries. Identical result for single-line queries.
    return cw_client.get_metric_data(
        MetricDataQueries=[
            dict(
                Id="id1",
                Expression=dedent(sql).strip(),
                Period=60,
                ReturnData=True,
            ),
        ],
        StartTime=five_minutes_ago,
        EndTime=now,
        ScanBy="TimestampAscending",
    )
119
120
def use_expression():
    """
    Demonstrate the SQL-flavored Metrics Insights expression API.

    Metrics Insights doesn't support logical OR, so ``!=`` is used instead
    to match every server.

    List of queries:

    SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id = '{stream_name_1}'
    SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id = '{stream_name_2}'
    SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id != 'xyz'
    SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace})
    """
    queries = [
        f"SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id = '{stream_name_1}'",
        f"SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id = '{stream_name_2}'",
        f"SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace}, server_id) WHERE server_id != 'xyz'",
        f"SELECT AVG({metric_name}) FROM SCHEMA({metric_namespace})",
    ]
    for sql in queries:
        res = _use_expression(sql)
        print(sql)
        print_get_metric_data_response(res)
151
152
if __name__ == "__main__":
    # Run both query styles against the metric produced by s1/s2.
    use_metric_stat()
    use_expression()
Reference: