以下是由 Arnold van Wijnbergen 撰写的客座博文。Arnold van Wijnbergen 是云原生技术领域的顾问、架构师、主管和主题专家,尤其是在可观测性方面。您几乎可以在阿姆斯特丹附近每个有趣的 Meetup 上找到他,这些 Meetup 涵盖了有趣的 DevSecOps、云原生或持续交付主题。
简介
在这篇文章中,我们将继续关于使用 Fluent Bit 将日志数据发送到 ClickHouse Cloud 的系列文章。虽然本系列之前的博客文章是关于 Nginx 和 Kubernetes 日志,但这篇文章侧重于威胁狩猎用例。简而言之,我们将解释设置 Microsoft Windows 的高级日志分析是多么容易。我们主要感兴趣的数据来源是 Windows 事件日志,它多年来经历了显著的发展。我们还将研究如何使用 SysInternals 的工具 Sysmon 扩展 Windows 日志收集。目标是识别恶意或异常活动,并了解入侵者和恶意软件如何在我们的网络上运行。随着 ClickHouse 越来越受欢迎地成为接收日志的后端,这简直是天作之合,尤其是在使用 Fluent Bit 时,Fluent Bit 提供了一种简单且开箱即用的方式来收集相关数据。
您将学习 Windows 事件日志的基础知识、如何部署 Fluent Bit 以进行 Windows 事件日志收集,以及如何创建简单的模式以将日志数据存储在 ClickHouse 中。此外,您还将了解使用社区模板配置 Sysmon 以设置高质量事件跟踪和使用 SysmonSimulator(另一个出色的开源计划)模拟某些事件的步骤。
环境
- Microsoft Windows Server 2022
- Fluent Bit v2.0.6
- Sysmon v14.13
- SysmonSimulator v0.2
对于 ClickHouse,我们建议尝试我们的无服务器 ClickHouse Cloud,它提供慷慨的免费试用版,足以遵循这篇博文。Developer 服务应该足以满足本文中的示例。或者,所有说明应与大于 22.6 的自管理版本兼容。对于构建实际的可视化效果,我们将使用 Grafana Cloud,使用官方 ClickHouse 插件。
Windows 事件日志基础知识
任何使用过 Windows 的人都知道著名的 Windows 事件查看器。它是您在使用系统、应用程序或安全日志调查问题时必须知道的标准工具之一。不太为人所知的是,多年来,经典模型(EVT 格式)已发展为丰富的“winevtlog.h”(EVTX 格式),现在具有完整的 API 支持和许多高级改进。这为应用程序和服务特定的日志通道提供了支持,这些通道由“Sysmon”等工具使用。
Fluent Bit 是一种数据收集器,支持此格式,可以帮助我们收集有用的日志来分析和识别潜在威胁。
使用 Sysmon 事件检测安全威胁
除了丰富的审核策略外,大多数安全工程师更喜欢使用 Sysmon 来识别威胁。Sysmon 由 SysInternals(现在是 Microsoft 的一部分)开发,它提供由设备驱动程序和正在运行的服务提供的高级内核功能。一旦安装并进行给定配置,它将开始监视和记录系统活动,例如进程创建、网络连接、注册表操作等。由于它可以平稳地集成到 Windows 事件日志通道中,因此我们可以使用 Fluent Bit 轻松地将这些日志传输到 ClickHouse 中。
最新版本可以从此处下载。只需将 ZIP 文件解压到目录中,例如“C:\Tools\Sysmon”。
解压后,您可以使用默认配置设置开始安装。
sysmon.exe -accepteula -i
社区将开源聚集在一起
默认的 Sysmon 配置可以被认为是基础。幸运的是,社区成员和安全专家“SwiftOnSecurity”(哦,天哪,不是泰勒·斯威夫特;))维护了一个高质量的事件跟踪配置,该配置与 MITRE ATT&CK® 矩阵技术保持一致。安装很容易 - 只需从 github 下载最新版本并应用配置即可。
sysmon.exe -c sysmonconfig-export.xml
使用 Fluent Bit 收集、处理和插入
太棒了,我们已经准备好使用 Fluent Bit 进行收集、处理和分发了。Fluent Bit 是一个 CNCF 毕业项目,是一个很棒的轻量级工具,它基于一个简单的配置文件,可以轻松帮助我们建立数据管道来处理所有生成的事件。在这种情况下,我们将选择使用 ZIP 包设置 Fluent Bit,但我们也可以决定使用安装程序或容器镜像。
首先下载正确的安装包,例如 发布的 ZIP 包,然后解压存档。
Expand-Archive .\fluent-bit-2.0.6-win64.zip C:\Tools
解压 ZIP 包后,是时候扩展 conf/fluent-bit.conf
中的默认管道配置了,如下所示,以配置我们的 input、filters 和 output。
[INPUT] Name winevtlog Channels Microsoft-Windows-Sysmon/Operational,Security Interval_Sec 1 DB winevtlog.sqlite [FILTER] Name nest Match * Operation nest Wildcard * Nest_under log [OUTPUT] name http tls on match * host <> port 8443 URI /?query=INSERT+INTO+<>.<>+FORMAT+JSONEachRow&async_insert=1 format json_stream json_date_key timestamp json_date_format epoch http_user << For demo purposes you can use ‘default’ >> http_passwd <>
有关输出配置的详细信息,特别是 URI 参数,我们建议阅读我们之前关于 Fluent Bit 的博客文章此处 和 此处。
请注意 URI 中 async_insert=1
设置的用法。这启用了异步插入,并且是在 ClickHouse 中使用 Fluent Bit 时的一个重要配置设置。有关更多详细信息,请参阅 此处。
启动 ClickHouse 服务并创建表
现在我们已经准备好 Fluent Bit 来收集和转发我们的日志,我们需要部署 ClickHouse 并创建我们的数据库。使用 ClickHouse Cloud,只需点击几下即可通过创建服务轻松完成此操作。
您可以在创建服务后通过单击 Connect
、“查看连接字符串”来查看连接详细信息。将这些详细信息复制到安全位置,因为稍后您需要它们来完成 Fluent Bit 输出部分。
创建后,我们可以创建数据库本身。这需要 ClickHouse Cloud 团队启用 JSON 对象类型,因为这仍然是实验性的。这可以通过打开支持票证来完成。
如果您运行的是自己的自管理实例,只需设置以下标志即可。
SET allow_experimental_object_type = 1
现在创建数据库。
CREATE DATABASE eventlogs;
启用 JSON 对象类型后,您可以开始配置用于存储 Windows 事件日志数据的列。或者,您可以将特定的字段(如 Computername
)映射到显式列,并使用更优化的 ClickHouse 类型(如 LowCardinality)来通过减少 I/O 来提高压缩和查询性能。这些映射的列也可以在表的 ORDER BY 键中使用,这对于提供最佳查询性能至关重要。
出于评估目的,JSON 类型很有用,因为字段是动态映射的。请记住,对于生产场景,您应该将 JSON 对象类型限制为较少的字段。保持预期和常用列在您的模式中定义良好并进行调整,以遵循最佳实践。请参阅 此处 以及我们最近关于 构建可观测性解决方案 的博文,了解更多详细信息。
现在,我们将保持简单,只添加 log 列。
CREATE TABLE eventlogs.jsonlogs ( timestamp DateTime, log JSON ) Engine = MergeTree ORDER BY tuple()
请注意,这里我们在“ORDER BY”键中没有任何列,而是使用了“tuple()”。我们建议在此处显式声明与您的生产访问模式匹配的列。
现在您可以填写 Fluent Bit 输出部分的空白了。不要忘记之前保存的连接详细信息。
[OUTPUT] name http tls on match * host <> port 8443 URI /?query=INSERT+INTO+eventlogs.jsonlogs+FORMAT+JSONEachRow&async_insert=1 format json_stream json_date_key timestamp json_date_format epoch http_user << For demo purposes you can use ‘default’ >> http_passwd <>
可视化 Windows 事件日志数据
目前,一切都已准备就绪。我们现在必须启动我们的 Fluent Bit 代理。我们将使用 CLI,但您也可以将其配置为服务。
./bin/fluent-bit -c conf/fluent-bit.conf
您会注意到 Fluent Bit 打开了多个与 ClickHouse 服务的连接。您可以使用 Cloud SQL 控制台来查看数据是否正在进入您的 ClickHouse 服务。
SELECT * FROM eventlogs.jsonlogs;
另一种查询数据的好方法是使用 format JSONEachRow 选项,尤其是在您使用 ClickHouse 客户端并想要使用 jq
来美化终端结果时。
~/clickhouse client --host --secure --password --query "SELECT * FROM eventlogs.jsonlogs LIMIT 1 FORMAT JSONEachRow" | jq { "timestamp": "2023-01-17 12:51:01", "log": { "ActivityID": "", "Channel": "Microsoft-Windows-Sysmon/Operational", "Computer": "EC2AMAZ-LPA5TA4", "EventID": 1, "EventRecordID": 483, "Keywords": "0x8000000000000000", "Level": 4, "Message": "Process Create:\r\nRuleName: -\r\nUtcTime: 2023-01-17 12:50:59.303\r\nProcessGuid: {f51ddac1-99b3-63c6-bb01-00000000e100}\r\nProcessId: 5216\r\nImage: C:\\fluent\\fluent-bit-2.0.6-win64\\bin\\fluent-bit.exe\r\nFileVersion: 2.0.6.0\r\nDescription: Compiled with MSVC 19.29.30146.0\r\nProduct: Fluent Bit - Fast and Lightweight Logs and Metrics processor for Linux, BSD, OSX and Windows\r\nCompany: Calyptia Inc.\r\nOriginalFileName: -\r\nCommandLine: fluent-bit.exe -c ../conf/fluent-bit.conf\r\nCurrentDirectory: C:\\fluent\\fluent-bit-2.0.6-win64\\bin\\\r\nUser: EC2AMAZ-LPA5TA4\\Administrator\r\nLogonGuid: {f51ddac1-8b43-63c6-6de9-0c0000000000}\r\nLogonId: 0xCE96D\r\nTerminalSessionId: 2\r\nIntegrityLevel: High\r\nHashes: MD5=4A19C3D18B025F49AA157FD2D360283A,SHA256=7511D65D0FB9FF2590C37E0C7C6150250A78E0D0BAFAB1473A97667AA5ADCA16,IMPHASH=654B204441EFA0776C100878702194C3\r\nParentProcessGuid: {f51ddac1-8d06-63c6-4d01-00000000e100}\r\nParentProcessId: 7720\r\nParentImage: C:\\Windows\\System32\\cmd.exe\r\nParentCommandLine: \"C:\\Windows\\system32\\cmd.exe\" \r\nParentUser: EC2AMAZ-LPA5TA4\\Administrator", "Opcode": 0, "ProcessID": 7536, "ProviderGuid": "{5770385F-C22A-43E0-BF4C-06F5698FFBD9}", "ProviderName": "Microsoft-Windows-Sysmon", "Qualifiers": "", "RelatedActivityID": "", "StringInserts": [ "-", "2023-01-17 12:50:59.303", "{F51DDAC1-99B3-63C6-BB01-00000000E100}", "5216", "C:\\fluent\\fluent-bit-2.0.6-win64\\bin\\fluent-bit.exe", "2.0.6.0", "Compiled with MSVC 19.29.30146.0", "Fluent Bit - Fast and Lightweight Logs and Metrics processor for Linux, BSD, OSX and Windows", "Calyptia Inc.", "-", "fluent-bit.exe -c ../conf/fluent-bit.conf", "C:\\fluent\\fluent-bit-2.0.6-win64\\bin\\", "EC2AMAZ-LPA5TA4\\Administrator", "{F51DDAC1-8B43-63C6-6DE9-0C0000000000}", "0xce96d", "2", "High", "MD5=4A19C3D18B025F49AA157FD2D360283A,SHA256=7511D65D0FB9FF2590C37E0C7C6150250A78E0D0BAFAB1473A97667AA5ADCA16,IMPHASH=654B204441EFA0776C100878702194C3", "{F51DDAC1-8D06-63C6-4D01-00000000E100}", "7720", "C:\\Windows\\System32\\cmd.exe", "\"C:\\Windows\\system32\\cmd.exe\" ", "EC2AMAZ-LPA5TA4\\Administrator" ], "Task": 1, "ThreadID": 7456, "TimeCreated": "2023-01-17 12:50:59 +0000", "UserID": "S-1-5-18", "Version": 5 } }
想知道推断的 JSON 架构吗?只需运行以下 SQL 查询即可。
DESCRIBE TABLE eventlogs.jsonlogs FORMAT Vertical SETTINGS describe_extend_object_types = 1 Row 1: ────── name: timestamp type: DateTime default_type: default_expression: comment: codec_expression: ttl_expression: Row 2: ────── name: log type: Tuple(ActivityID String, Channel String, Computer String, EventID Int16, EventRecordID Int32, Keywords String, Level Int8, Message String, Opcode Int8, ProcessID Int16, ProviderGuid String, ProviderName String, Qualifiers String, RelatedActivityID String, StringInserts Array(String), Task Int16, ThreadID Int16, TimeCreated String, UserID String, Version Int8) default_type: default_expression: comment: codec_expression: ttl_expression: 2 rows in set. Elapsed: 0.001 sec.
用于数据可视化的 Grafana
我们现在可以开始在我们的 ClickHouse 数据之上创建数据可视化和仪表板。为此,我们将利用 Grafana 的 ClickHouse 插件。此插件在 Grafana Cloud 免费层中可用。您可以将此插件添加为数据源,并使用之前的连接详细信息进行实际配置。当“保存并测试”成功时,您已成功配置数据源。此视频 提供了将 Grafana 连接到 ClickHouse 的简单介绍。
我们想要显示的有趣指标包括每个 EventID 发生的各种 Sysmon 事件计数,例如,DNS 事件为 22。另一个感兴趣的指标可以是安全通道中记录的审核失败百分比。
为了方便起见,我已经创建了一个示例仪表板,可在我的 GitHub 上下载。
作为参考,这是在 Grafana 的 Explorer 视图中执行的完整示例 SQL 查询
SELECT timestamp AS log_time, log.Message AS Message, log.Channel AS Channel, log.Computer AS Computer, log.EventID AS EventID, log.TimeCreated AS TimeCreated, log.ActivityID AS ActivityID, log.EventRecordID AS EventRecordID, log.Keywords AS Keywords, log.Level AS SeverityLevel, log.Opcode AS OPcode, log.ProcessID AS ProcessID, log.ProviderGuid AS ProviderGuid, log.ProviderName AS ProviderName, log.Qualifiers AS Qualifiers, log.RelatedActivityID AS RelatedActivityID, log.StringInserts AS StringInserts, log.Task AS Task, log.ThreadID AS ThreadID, log.UserID AS UserID, log.Version AS Version FROM eventlogs.jsonlogs LIMIT 100

测试稳定性和性能
使用 SysmonSimulator 模拟威胁狩猎场景
当没有数据或潜在安全威胁出现时,观看仪表板并没有什么令人兴奋的。这就是为什么我们将使用由 Scarred Monk 创建的 SysmonSimulator。此工具的二进制文件可以从 此处 下载。
此模拟器与 Sysmon 协同工作以模拟几个潜在的安全威胁。为了获得最佳体验,您可以使用 SysmonSimulator 存储库中提供的 配置文件。
请注意,Microsoft Defender 可能会将此工具阻止为潜在风险,因此您可能必须接受该风险;否则,下载和执行将被阻止。
sysmon.exe -c SysmonSimulatorconfig.xml
启动模拟很简单。只需使用“-all”标志执行即可。同样为了方便起见,您可以使用以下 PowerShell 单行代码。
While ($True -eq “True”) { ./SysmonSimulator -all}
在我们启动模拟器进程后,我们在 Grafana 中的仪表板将填充更多有趣的数据。

扩展
为了验证我们解决方案的性能,我们使用了另一个工具来摄取大量 Windows 事件日志。当使用 Linux 日志时,我们更喜欢使用 lignator 进行模拟以生成日志,但在这种情况下,我们需要直接摄取到 Windows 事件日志中。
为了自动化此用例,我们使用了一个 Github 上可用的工具,名为 goeventgen,由 Andrew Krohu 创建。作为输入,您可能需要使用大量的日志行 集合 来处理。此集合约为 28 GB,非常适合用于性能测试。
只需下载发布的执行文件,并在提取 Windows.log 后执行以下命令。
.\goeventgen-amd64.exe -source TestSource -f .\Windows.log
执行后,您将获得持续时间和处理的总事件数的摘要。在我们的测试实例中,我们获得了大约 3000 eps,这对于一个小型 Windows 2022 VM 来说已经很棒了。这里最大的瓶颈是主机 VM 上可用的 CPU 资源。这对于 ClickHouse 和 Fluent Bit 来说只是非常小的吞吐量,它们都能够扩展到每秒数百万个事件。
结论
这篇博客展示了使用 ClickHouse、Grafana 和 Fluent Bit 和 Sysmon 等工具设置 Windows 安全堆栈是多么容易。我们介绍了 ClickHouse 使用 JSON 类型的动态字段映射功能,并解释了在哪些情况下这会很有帮助。除了设置之外,我们还使用 SysmonSimulator 执行了潜在安全威胁的模拟,并最终使用 Grafana 来可视化和深入研究细节。此外,我们执行了大量事件流以验证性能和稳定性。在此练习期间,我们甚至没有触及 ClickHouse 或 Fluent Bit 的限制,而是 Windows 实例资源。
计算数据,我们声称每个事件的平均大小约为 14 字节,而存储 Windows 事件日志声明为 200 字节。这是一个 1/14 的压缩比,并且与其他测试一致。这表明 ClickHouse 在存储日志方面非常高效,并且可以显著节省成本!
简而言之,我们可以得出结论,ClickHouse 在存储方面非常高效,并且是用于存储日志的高性能面向列的数据库,并且以最小的复杂性与其他工具集成良好。