简介
AWS VPC Flow Logs 允许您捕获有关进出 VPC 中网络接口的 IP 流量的详细信息。它包含源 IP 和目标 IP、源端口和目标端口、开始时间和结束时间、使用的协议、发送的字节数以及其他一些指标。此数据可用于调试安全组问题、监控入口和出口流量以及检查跨可用区流量,这有助于减少您的云账单。
ClickHouse 是一个开源的面向列的 DBMS,用于在线分析处理,允许用户使用 SQL 查询实时生成分析报告。在这篇博文中,我们将使用 ClickHouse 展示分析 Flow Logs 有多容易。
高层概述
在为我们感兴趣的 VPC 启用 AWS Flow Logs 后,我通常会收集 24 小时的数据以覆盖典型的工作日。为了便于将数据导入 ClickHouse,我们将以 parquet 格式将其存储在 S3 中。
下图显示了一个典型的 VPC 设置,其中包含三个公有子网和三个私有子网。由于 AWS 对跨可用区(红色箭头)的流量收取额外费用,因此我们的目标是分析 flow logs 以识别这些成本高昂的组件工作负载。一旦我们确定了这些组件,我们就可以将工程工作重点放在首先改进这些组件上。
最后一句“每个方向每 GB 0.01 美元”具有误导性。实际上,AWS 中的跨 AZ 数据传输成本为每千兆字节 2 美分,并且每个传输的千兆字节在账单上计为 2GB:一次用于发送,一次用于接收。” https://www.lastweekinaws.com/blog/aws-cross-az-data-transfer-costs-more-than-aws-says/
步骤 1. 创建 S3 存储桶
说够了。让我们开始吧。第一步,我们需要为我们的 VPC 启用 Flow Logs。首先,您需要创建一个 S3 存储桶,用于存储 parquet 文件。确保该存储桶不是公开可访问的。
步骤 2. 启用 Flow Logs
转到您的 VPC 设置,然后在“操作”下启用 Flow Logs。
对于我们的用例,我们希望收集“全部”数据,并将数据存储在我们在步骤 1 中创建的 S3 存储桶中。
请选择“Parquet”作为日志文件格式,因为这使得导入数据更加容易并缩短了加载时间。
现在我们必须等待 24 小时才能收集数据。您可以在您的 S3 存储桶中检查是否已创建 parquet 文件。
步骤 3. 将数据导入 ClickHouse
为了继续学习,您有三个选项可以开始使用 ClickHouse
- ClickHouse Cloud:官方 ClickHouse 即服务 - 由 ClickHouse 的创建者构建、维护和支持
- 自托管 ClickHouse:ClickHouse 可以在任何 Linux、FreeBSD 或 macOS 上运行,CPU 架构为 x86-64、ARM 或 PowerPC64LE
- Docker 镜像:阅读 Docker Hub 中包含官方镜像的指南
所有三个选项都适用于这篇博文。
步骤 3.1:定义 flow logs 的初始表结构
在导入数据之前,我们应该检查数据格式并创建表结构。ClickHouse 可以自动确定几乎所有支持的输入格式中输入数据的结构。以下命令显示了我们的一个 parquet 文件的表结构。
> clickhouse local --query "DESCRIBE TABLE file('4XXXXXXXXXXX_vpcflowlogs_us-east-2_fl-0dfd338b697dcd99d_20230124T1540Z_c83147c7.log.parquet')" --format=Pretty ┌─name─────────┬─type─────────────┬ │ version │ Nullable(Int32) │ │ account_id │ Nullable(String) │ │ interface_id │ Nullable(String) │ │ srcaddr │ Nullable(String) │ │ dstaddr │ Nullable(String) │ │ srcport │ Nullable(Int32) │ │ dstport │ Nullable(Int32) │ │ protocol │ Nullable(Int32) │ │ packets │ Nullable(Int64) │ │ bytes │ Nullable(Int64) │ │ start │ Nullable(Int64) │ │ end │ Nullable(Int64) │ │ action │ Nullable(String) │ │ log_status │ Nullable(String) │ └──────────────┴──────────────────┴
基于 DESCRIBE TABLE
输出,我们可以为我们的 Flow Logs 创建一个初始表结构。
CREATE TABLE IF NOT EXISTS flowlogs_us_east_2 ( `version` Int32 NULL, `account_id` String NULL, `interface_id` String NULL, `srcaddr` String NULL, `dstaddr` String NULL, `srcport` Int32 NULL, `dstport` Int32 NULL, `protocol` Int32 NULL, `packets` Int64 NULL, `bytes` Int64 NULL, `start` Int64 NULL, `end` Int64 NULL, `action` String NULL, `log_status` String NULL ) ENGINE = MergeTree ORDER BY tuple()
步骤 3.2:调整表结构
让我们改进这个表结构,以便获得最佳查询性能。经过一些调整,我们最终得到了以下 CREATE TABLE 语句。
CREATE TABLE default.flowlogs_us_east_2_v4 ( `version` Nullable(Int32), `account_id` LowCardinality(Nullable(String)), # LowCardinality `interface_id` LowCardinality(Nullable(String)), # LowCardinality `srcaddr` Nullable(IPv4), # IPv4 datatype `dstaddr` Nullable(IPv4), # IPv4 datatype `srcport` Nullable(Int32), `dstport` Nullable(Int32), `protocol` Nullable(Int32), `packets` Nullable(Int64), `bytes` Nullable(Int64), `start` Nullable(DateTime('UTC')), # DateTime datatype `end` Nullable(DateTime('UTC')), # DateTime datatype `action` Enum('ACCEPT', 'REJECT', '-'), # Enumerated type `log_status` LowCardinality(Nullable(String)) # LowCardinality ) ENGINE = MergeTree ORDER BY (action, srcaddr, dstaddr, protocol, start, end) SETTINGS allow_nullable_key = 1 # SETTINGS
类型 | 描述 |
LowCardinality | 使用 LowCardinality 数据类型的效率取决于数据多样性。如果字典包含少于 10,000 个不同的值,那么 ClickHouse 大多显示更高的数据读取和存储效率。如果字典包含超过 100,000 个不同的值,那么与使用普通数据类型相比,ClickHouse 的性能可能会更差。 |
IPv4 | IPv4 是基于 UInt32 类型的域,用作存储 IPv4 值的类型化替代品。它提供紧凑的存储,具有人性化的输入输出格式和检查时的列类型信息。 |
DateTime | DateTime 允许存储可以表示为日历日期和一天中某个时刻的时间瞬间。 |
Enum('ACCEPT', 'REJECT', '-') | 由命名值组成的枚举类型。ClickHouse 仅存储数字,但支持通过其名称对值进行操作。 |
SETTINGS allow_nullable_key = 1 | 此设置允许在 MergeTree 表的排序和主键中使用Nullable 类型的值。 |
此查询有助于查找 action 列的所有枚举值。
SELECT action, count() FROM flowlogs_us_east_2 GROUP BY action ┌─action─┬───count()─┐ │ - │ 794510 │ │ REJECT │ 3164999 │ │ ACCEPT │ 510006128 │ └────────┴───────────┘
步骤 3.3.1:直接从 S3 导入数据
您有不同的选项可以将数据导入到您的 ClickHouse 实例。您可以使用以下命令直接从 S3 导入文件。这是最方便的方式。
INSERT INTO flowlogs_us_east_2 SELECT * FROM s3( 'https://s3.us-east-2.amazonaws.com//AWSLogs//vpcflowlogs//2023/01/**/*.log.parquet', 'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', Parquet ) 0 rows in set. Elapsed: 744.802 sec. Processed 517.07 million rows, 32.55 GB (694.24 thousand rows/s., 43.70 MB/s.)
步骤 3.3.2:从本地计算机导入数据
您可以从 S3 下载所有 parquet 文件,然后使用以下步骤直接将它们导入到您的 ClickHouse 实例。此导入将花费更长的时间,因为它取决于您的互联网连接,但是如果您可以本地访问数据,那么这是您可以使用的另一个选项。
aws s3 sync s3://<prefix>-us-east-2-flow-logs.clickhouse.cloud .
for f in **/*.log.parquet
do
echo "Importing $f"
cat $f | clickhouse client --query "INSERT INTO flowlogs_us_east_2_v4 FORMAT Parquet" --host <instance>.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <password>
done
步骤 3.4:导入数据集统计信息
我在这篇文章中使用的 Flow Log 数据集包含大约 5 亿行。
SELECT concat(database, '.', table) AS table, formatReadableSize(sum(bytes)) AS size, sum(rows) AS rows, max(modification_time) AS latest_modification, sum(bytes) AS bytes_size, formatReadableSize(sum(primary_key_bytes_in_memory)) AS primary_keys_size FROM system.parts WHERE active AND (table = 'default.flowlogs_us_east_2_v4') GROUP BY database, table ORDER BY bytes_size DESC
┌─table─────────────────────────┬─size─────┬──────rows─┬─latest_modification─┬─bytes_size─┬─primary_keys_size─┐ │ default.flowlogs_us_east_2_v4 │ 2.30 GiB │ 517069187 │ 2023-01-30 13:03:51 │ 2465625288 │ 3.63 MiB │ └───────────────────────────────┴──────────┴───────────┴─────────────────────┴────────────┴───────────────────┘
相同的表统计信息可以直接从系统表中收集。
SELECT name, primary_key, total_rows, total_bytes FROM system.tables WHERE name = 'flowlogs_us_east_2_v4'
┌─name──────────────────┬─primary_key────────────────────────────────────┬─total_rows─┬─total_bytes─┐ │ flowlogs_us_east_2_v4 │ action, srcaddr, dstaddr, protocol, start, end │ 517069187 │ 2465625288 │ └───────────────────────┴────────────────────────────────────────────────┴────────────┴─────────────┘
步骤 4:分析数据
现在我们已经在 ClickHouse 中加载了数据,我们可以对其进行分析。以下是一些您可以运行以分析 flow logs 的示例查询。
步骤 4.1:查找流量被拒绝的 IP 地址
SELECT srcaddr, dstaddr, count(*) AS count FROM flowlogs_us_east_2_v4 WHERE action = 'REJECT' GROUP BY srcaddr, dstaddr ORDER BY count DESC LIMIT 10 ┌─srcaddr───────┬─dstaddr───────┬─count─┐ │ 52.219.93.41 │ 10.xx.148.26 │ 5068 │ │ 10.xx.34.239 │ 10.xx.158.94 │ 4575 │ │ 10.xx.34.239 │ 10.xx.18.221 │ 4569 │ │ 10.xx.34.239 │ 10.xx.165.205 │ 4569 │ │ 10.xx.61.214 │ 10.xx.124.154 │ 4567 │ │ 10.xx.34.239 │ 10.xx.57.228 │ 4567 │ │ 10.xx.61.214 │ 10.xx.57.150 │ 4566 │ │ 10.xx.61.214 │ 10.xx.42.227 │ 4565 │ │ 10.xx.134.164 │ 10.xx.42.227 │ 4565 │ │ 10.xx.34.239 │ 10.xx.7.57 │ 4565 │ └───────────────┴───────────────┴───────┘ 10 rows in set. Elapsed: 0.631 sec. Processed 3.22 million rows, 145.81 MB (5.10 million rows/s., 230.90 MB/s.)
步骤 4.2:查找流量最多的 IP 地址
SELECT srcaddr, dstaddr, sum(bytes) AS sum_bytes, sum(packets) AS sum_packets, count(*) AS num_connects FROM flowlogs_us_east_2_v4 GROUP BY srcaddr, dstaddr ORDER BY sum_bytes DESC LIMIT 10 ┌─srcaddr───────┬─dstaddr───────┬────sum_bytes─┬─sum_packets─┬─num_connects─┐ │ 52.219.98.217 │ 10.xx.4.152 │ 408892749105 │ 288418578 │ 16720 │ │ 52.219.101.9 │ 10.xx.148.26 │ 113090806589 │ 79170936 │ 2354 │ │ 52.219.92.65 │ 10.xx.129.150 │ 104062457099 │ 72194254 │ 2787 │ │ 10.xx.151.54 │ 162.xxx.yyy.2 │ 90002563685 │ 62017417 │ 2739 │ │ 10.xx.151.54 │ 10.xx.232.160 │ 85990237301 │ 60482186 │ 37800 │ │ 10.xx.232.160 │ 162.xxx.yyy.2 │ 83703023903 │ 63673370 │ 9048 │ │ 162.xxx.yyy.2 │ 10.xx.143.254 │ 76876274499 │ 51932321 │ 7026 │ │ 162.xxx.yyy.2 │ 10.xx.232.160 │ 71774911712 │ 58531508 │ 9069 │ │ 10.xx.232.160 │ 10.xx.143.254 │ 71636349482 │ 49617103 │ 41563 │ │ 10.xx.72.138 │ 162.xxx.yyy.2 │ 68960063436 │ 46908157 │ 4038 │ └───────────────┴───────────────┴──────────────┴─────────────┴──────────────┘ 10 rows in set. Elapsed: 30.346 sec. Processed 517.07 million rows, 32.23 GB (17.04 million rows/s., 1.06 GB/s.)
步骤 4.3:查找来自 VPC 外部流量最多的 IP 地址
WITH IPv4CIDRToRange(toIPv4('10.XX.0.0'), 16) AS mask SELECT srcaddr, dstaddr, sum(bytes) AS sum_bytes, sum(packets) AS sum_packets, count(*) AS num_connects FROM flowlogs_us_east_2_v4 WHERE (srcaddr < (mask.1)) OR (srcaddr > (mask.2)) GROUP BY srcaddr, dstaddr ORDER BY sum_bytes DESC LIMIT 10 ┌─srcaddr────────┬─dstaddr───────┬────sum_bytes─┬─sum_packets─┬─num_connects─┐ │ 52.219.98.217 │ 10.XX.4.152 │ 408892749105 │ 288418578 │ 16720 │ │ 52.219.101.9 │ 10.XX.148.26 │ 113090806589 │ 79170936 │ 2354 │ │ 52.219.92.65 │ 10.XX.129.150 │ 104062457099 │ 72194254 │ 2787 │ │ 162.243.189.2 │ 10.XX.143.254 │ 76876274499 │ 51932321 │ 7026 │ │ 162.243.189.2 │ 10.XX.232.160 │ 71774911712 │ 58531508 │ 9069 │ │ 52.219.176.33 │ 10.XX.4.152 │ 64240559865 │ 44917125 │ 2682 │ │ 52.219.109.137 │ 10.XX.129.150 │ 39752096707 │ 27800978 │ 823 │ │ 52.219.98.145 │ 10.XX.123.186 │ 39421406790 │ 28161428 │ 2426 │ │ 52.219.109.153 │ 10.XX.123.186 │ 32397795186 │ 23754825 │ 4861 │ │ 52.219.142.65 │ 10.XX.148.26 │ 32010932847 │ 22743875 │ 3889 │ └────────────────┴───────────────┴──────────────┴─────────────┴──────────────┘ 10 rows in set. Elapsed: 4.327 sec. Processed 105.19 million rows, 2.95 GB (24.31 million rows/s., 680.69 MB/s.)
步骤 4.4:查找去往公有 IP 地址流量最多的 IP 地址
WITH IPv4CIDRToRange(toIPv4('10.XX.0.0'), 16) AS mask SELECT srcaddr, dstaddr, sum(bytes) AS sum_bytes, sum(packets) AS sum_packets, count(*) AS num_connects FROM flowlogs_us_east_2_v4 WHERE (dstaddr < (mask.1)) OR (dstaddr > (mask.2)) GROUP BY srcaddr, dstaddr ORDER BY sum_bytes DESC LIMIT 10 ┌─srcaddr───────┬─dstaddr────────┬───sum_bytes─┬─sum_packets─┬─num_connects─┐ │ 10.XX.151.54 │ 162.243.189.2 │ 90002563685 │ 62017417 │ 2739 │ │ 10.XX.232.160 │ 162.243.189.2 │ 83703023903 │ 63673370 │ 9048 │ │ 10.XX.72.138 │ 162.243.189.2 │ 68960063436 │ 46908157 │ 4038 │ │ 10.XX.212.81 │ 162.243.189.2 │ 61244530980 │ 41655380 │ 3613 │ │ 10.XX.123.186 │ 52.219.108.201 │ 18577571671 │ 13228030 │ 13384 │ │ 10.XX.123.186 │ 52.219.94.153 │ 16666940461 │ 11551738 │ 2477 │ │ 10.XX.151.54 │ 52.219.110.185 │ 14360554536 │ 10297054 │ 8184 │ │ 10.XX.72.138 │ 52.219.143.81 │ 14306330457 │ 10432147 │ 18176 │ │ 10.XX.123.186 │ 52.219.99.57 │ 14168694748 │ 10038959 │ 7574 │ │ 10.XX.123.186 │ 52.219.143.73 │ 14158734985 │ 9845027 │ 2867 │ └───────────────┴────────────────┴─────────────┴─────────────┴──────────────┘ 10 rows in set. Elapsed: 4.361 sec. Processed 160.77 million rows, 3.46 GB (36.87 million rows/s., 792.99 MB/s.)
对以 `52.219.x.x` 开头的目标 IP 地址进行 Web 搜索显示,这些 IP 地址属于 AWS S3 服务。
来源:https://www.netify.ai/resources/ips/52.219.108.201
步骤 5:丰富 Flow Logs
回到我们分析跨可用区 (AZ) 流量的初始计划,我们需要加载 EC2 元数据,我们可以将其用于我们的分析。不幸的是,AWS flow logs 不包含任何关于 IP 地址以及 EC2 实例在哪个 AZ 中运行的数据。因此,我们将从 AWS API 检索此数据并将数据存储在 ClickHouse 中。
步骤 5.1:获取 AWS IP 的元数据
获取 VPC 中 IP 地址的可用区 (AZ) 的一种方法是使用 aws ec2 describe-instances
CLI 命令。我们所有的 EC2 实例都已标记,因此我们知道哪些组件在它们上面运行。以下是我们的输出的简化示例。通过运行此命令,我们正在创建一个制表符分隔值文件,可以轻松地将其导入到 ClickHouse 中。您可以更改标签以匹配您在环境中使用的名称。
aws ec2 describe-instances --output text --query 'Reservations[*].Instances[*].[InstanceId, Placement.AvailabilityZone, PrivateIpAddress, [Tags[?Key==`Name`].Value] [0][0], [Tags[?Key==`eks:nodegroup-name`].Value] [0][0], [Tags[?Key==`dataplane_component`].Value] [0][0] ]' > us-east2-ec2-metadata.tsv
此命令的输出将如下所示。
i-0bda6c63322caa392 us-east-2b 10.xx.89.232 core ng-us-east-2-core-b-0 i-0b283e306faa2fed3 us-east-2c 10.xx.134.164 core ng-us-east-2-core-c-0 i-04ac9aea1fd1e04b9 us-east-2a 10.xx.61.214 core ng-us-east-2-core-a-0 i-0c037e5f3cbf70abe us-east-2a 10.xx.34.239 core ng-us-east-2-core-a-0 i-039325803992c97d5 us-east-2a 10.xx.40.15 keeper ng-us-east-2-keeper-a-0 i-00d0c53e442d6c445 us-east-2a 10.xx.45.139 keeper ng-us-east-2-keeper-a-0 i-08a520c6a5b0f2ff9 us-east-2a 10.xx.59.108 keeper ng-us-east-2-keeper-a-0
要导入此制表符分隔值文件,我们首先需要创建一个表。您可以使用 clickhouse local
来获取架构的描述。
> clickhouse local --query "DESCRIBE TABLE file('us-east2-ec2-metadata.tsv')" --format=Pretty
CREATE TABLE us_east_2_ec2metadata
(
`instanceId` LowCardinality(Nullable(String)),
`availabilityZone` LowCardinality(Nullable(String)),
`privateIpAddress` Nullable(IPv4),
`tagName` LowCardinality(Nullable(String)),
`tagNodegroupName` LowCardinality(Nullable(String))
)
ENGINE = MergeTree
ORDER BY privateIpAddress
SETTINGS allow_nullable_key = 1
要导入此数据,您可以运行以下命令
cat us-east2-ec2-metadata.tsv | clickhouse client --query "INSERT INTO us_east_2_ec2metadata FORMAT TSV"
--host <instance>.us-west-2.aws.clickhouse.cloud
--secure
--port 9440
--password <password>
从 EC2 元数据中,我们创建一个字典,这将使我们更容易使用 EC2 标签丰富我们的结果集。
CREATE DICTIONARY us_east_2_ec2_instances_dict ( `privateIpAddress` Nullable(String), `instanceId` Nullable(String), `availabilityZone` Nullable(String), `tagName` Nullable(String), `tagNodegroupName` Nullable(String) ) PRIMARY KEY privateIpAddress SOURCE(CLICKHOUSE(DB 'default' TABLE us_east_2_ec2metadata)) LIFETIME(MIN 1 MAX 10) LAYOUT(COMPLEX_KEY_HASHED())
使用字典,我们现在可以轻松地查找给定 IP 的标签并丰富我们的结果表。
SELECT dictGet(us_east_2_ec2_instances_dict, 'tagName', '10.xx.0.239') ┌─dictGet(us_east_2_ec2_instances_dict, 'tagName', '10.xx.0.239')─┐ │ core │ └─────────────────────────────────────────────────────────────────┘ 1 rows in set. Elapsed: 0.485 sec.
步骤 5.2:查找跨可用区流量最多的 IP 地址
SELECT f.srcaddr, dictGetOrNull('us_east_2_ec2_instances_dict', 'tagName', IPv4NumToString(f.srcaddr)) AS tagSrc, dictGetOrNull('us_east_2_ec2_instances_dict', 'availabilityZone', IPv4NumToString(f.srcaddr)) AS azSrc, f.dstaddr, dictGetOrNull('us_east_2_ec2_instances_dict', 'tagName', IPv4NumToString(f.dstaddr)) AS tagDest, dictGetOrNull('us_east_2_ec2_instances_dict', 'availabilityZone', IPv4NumToString(f.dstaddr)) AS azDest, sum(f.bytes) AS sum_bytes FROM flowlogs_us_east_2_v4 AS f INNER JOIN us_east_2_ec2metadata AS i1 ON f.srcaddr = i1.privateIpAddress INNER JOIN us_east_2_ec2metadata AS i2 ON f.dstaddr = i2.privateIpAddress WHERE i1.availabilityZone != i2.availabilityZone GROUP BY f.srcaddr, f.dstaddr ORDER BY sum_bytes DESC LIMIT 10 ┌─f.srcaddr─────┬─tagSrc───┬─azSrc──────┬─f.dstaddr─────┬─tagDest─┬─azDest─────┬──sum_bytes─┐ │ 10.xx.171.252 │ core │ us-east-2c │ 10.xx.0.239 │ core │ us-east-2a │ 1902671332 │ │ 10.xx.74.154 │ core │ us-east-2b │ 10.xx.0.239 │ core │ us-east-2a │ 507520688 │ │ 10.xx.172.251 │ core │ us-east-2c │ 10.xx.0.239 │ core │ us-east-2a │ 224974948 │ │ 10.xx.15.27 │ dev │ us-east-2a │ 10.xx.153.9 │ server │ us-east-2c │ 43971454 │ │ 10.xx.19.138 │ server │ us-east-2a │ 10.xx.153.9 │ server │ us-east-2c │ 42983148 │ │ 10.xx.6.209 │ mgmt │ us-east-2a │ 10.xx.87.223 │ mgmt │ us-east-2b │ 41120344 │ │ 10.xx.122.178 │ server │ us-east-2b │ 10.xx.153.9 │ server │ us-east-2c │ 40911334 │ │ 10.xx.72.138 │ dev │ us-east-2b │ 10.xx.153.9 │ server │ us-east-2c │ 37413716 │ │ 10.xx.47.141 │ server │ us-east-2a │ 10.xx.153.9 │ server │ us-east-2c │ 37273446 │ │ 10.xx.0.239 │ core │ us-east-2a │ 10.xx.171.252 │ core │ us-east-2c │ 33990090 │ └───────────────┴──────────┴────────────┴───────────────┴─────────┴────────────┴────────────┘
现在我们知道哪些组件导致了最多的跨可用区流量,我们可以专注于改进这些组件的读取和写入数据。
总结
我希望您在这篇文章中找到了一些有用的信息。现在您已经在 ClickHouse 中加载了 flow log 数据,您可以随意使用全套 SQL 功能来切片和剖析您的数据。
期待您的评论。我很好奇您还在使用 flow logs 做什么。
链接
- AWS Flow Logs, https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html
- 关于 AWS 跨 az 数据传输成本的博客, https://www.lastweekinaws.com/blog/aws-cross-az-data-transfer-costs-more-than-aws-says/
- ClickHouse 文档, https://clickhouse.ac.cn/docs/en/home