在 ClickHouse,我们将我们的使命视为向客户和用户提供闪电般快速的云分析数据库,该数据库可用于内部和面向客户的分析。ClickHouse Cloud 使我们的客户能够存储和处理几乎无限量的数据,这有助于他们做出数据驱动的决策。对于当今大多数成功的企业而言,基于事实而非假设做出决策至关重要。
当然,我们在团队内部也遵循相同的方法。开发和运营我们的云数据库产品会生成大量数据,这些数据可用于容量规划、定价、更好地了解客户需求以及财务报告。数十个数据源、数百 TB 以及大约一百个 BI 和临时用户……猜猜看 - 我们正在使用 ClickHouse Cloud 来处理这个问题 :)
在这篇文章中,我将分享我们的内部数据仓库 (DWH) 是如何构建的、我们使用的技术栈,以及我们的 DWH 在未来几个月将如何发展。
需求和数据源
我们在 2022 年 5 月发布了 ClickHouse Cloud 的私有预览版,与此同时,我们意识到我们想要更好地了解我们的客户:他们如何使用我们的服务,他们有哪些困难,我们如何帮助他们,以及我们如何使我们的定价对他们来说是可负担和合理的。为此,我们需要收集和处理来自多个内部数据源的数据:数据平面,它负责运行客户的数据库 pod;控制平面,它负责面向客户的 UI 和数据库操作;以及 AWS 账单,它为我们提供了运行客户工作负载的确切成本。
曾有一段短暂的时间,我们的产品副总裁 Tanya Bragin 每天使用 Excel 对我们的客户工作负载进行手动分析,使用查找功能。作为一名前 DWH 架构师,我为她不得不这样挣扎而感到羞愧,因此内部 DWH 的第一个概念应运而生。
在设计系统时,我们有许多关键任务旨在支持我们的内部利益相关者,下面列出了一些。
内部团队 | 任务 |
---|---|
产品团队 | 跟踪转化率和留存率、功能使用情况、服务规模、使用情况并找到最常见的问题。进行深入的临时分析。 |
运营团队 | 跟踪大致收入并向公司的大多数人提供对某些 Salesforce 数据的只读访问权限 |
销售团队 | 查看特定客户的设置和使用情况:有多少服务、多少数据、常见问题等。 |
工程团队 | 调整我们的自动扩缩器、跟踪查询错误率和数据库功能使用情况 |
支持团队 | 查看特定客户的设置:服务、使用情况、数据量等 |
营销团队 | 跟踪潜在客户转化率、客户获取成本和其他营销指标 |
成本节约团队 | 分析我们的 CSP 成本并主动优化我们的 CSP 承诺 |
CI-CD 团队 | 跟踪 CI-CD 成本 |
注意:在我们的内部数据仓库中,我们不收集、存储或处理客户数据的任何部分(其中大部分是加密的),例如表数据、查询文本、网络数据等。例如,对于查询分析,我们仅收集使用的函数列表、查询运行时、使用的内存和一些其他元信息。我们从不收集查询数据或查询文本。
为了实现这一目标,我们制定了一个计划,从数十个来源摄取数据,包括以下来源。
数据源 | 类型和大小 | 数据 |
---|---|---|
控制平面 | DocumentDB ~5 个集合 ~每小时 500 Mb | 数据库服务元信息:类型、大小、CSP 区域、状态、财务计划、扩展设置等。 |
数据平面 | ClickHouse Cloud ~5 个表 ~每小时 15 Gb | 数据库系统信息:指标统计、查询统计、表统计、pod 分配等。 |
AWS CUR | S3 存储桶 1 个表 ~每小时 1 Gb | 我们运行服务所需的 AWS 基础设施的成本和使用情况 |
GCP 账单 | BigQuery 1 个表 ~每小时 500 Mb | 我们运行服务所需的 GCP 基础设施的成本和使用情况 |
Salesforce (CRM) | 自定义 ~30 个表 ~每小时 1 Gb | 有关客户帐户、使用计划、订阅、折扣、区域、潜在客户和支持问题的信息 |
M3ter(计量软件) | 自定义 API 2 个 API ~每小时 500 Mb | 准确的使用信息和账单 |
Galaxy | ClickHouse Cloud 1 个表 | Galaxy 是我们控制平面/UI 层的基于事件的可观测性和监控系统 |
Segment | S3 存储桶 1 个表 | 一些额外的营销数据 |
Marketo | 自定义 | 发送的电子邮件元信息 |
AWS 公开价格 | 自定义 API 3 个表 | 每个区域中每个 AWS SKU 的价格 |
GCP 价格 | CSV 文件 | 每个区域中每个 GCP SKU 的价格 |
- 在我们目前的阶段,我们数据的小时粒度就足够了。这意味着,我们可以收集和存储每小时的聚合数据。
- 目前,我们不需要使用 CDC(或“变更数据捕获”)方法,因为它会使 DWH 基础设施更加昂贵。传统的直接加载/ETL 应该可以满足我们的需求。如果这些数据源需要更新,我们可以执行完整的数据重新加载。
- 由于我们拥有一个出色的可扩展且快速的数据库,因此我们无需在数据库外部执行 ETL 转换。相反,我们直接使用 ClickHouse 使用 SQL 执行转换。这效果很好。
- 在 ClickHouse,我们本质上是开源的,因此我们希望我们所有的技术栈都只包含开源组件。我们也喜欢贡献。
- 由于我们有非常不同类型的数据源,因此我们需要多种工具和方法来从这些源中提取数据。同时,我们需要标准化的中间存储。
然而,我们最初的其他假设之一被证明是不正确的。我们假设,由于我们的数据结构不是很复杂,因此我们只需要在 DWH 中设置两个逻辑层 - 原始层和“数据集市”层。这是一个错误。实际上,我们需要第三个中间层来存储内部业务实体。我们将在下面解释。
架构
最终,我们提出了以下架构
-
从高层次来看,我们的技术栈可以描述为
- ClickHouse Cloud 作为主数据库
- Airflow 作为调度器(一个开源调度工具)
- AWS S3 作为 RAW 数据的中间存储
- Superset 作为内部 BI 和 AD-HOC 工具
-
我们使用不同的工具和方法将数据从数据源捕获到多个 S3 存储桶
- 对于控制平面、数据平面、Segment 和 AWS CUR,我们使用数据源的本机功能来导出数据
- 对于 GCP 账单,我们使用 BigQuery 导出查询将数据导出到 GCS,然后可以通过 ClickHouse S3 表函数从中提取数据
- 对于 Salesforce,我们使用 AWS AppFlow
- 为了从 M3ter 捕获数据,我们编写了自己的应用程序。最初是用 Kotlin 编写的,后来我们将其迁移到 Python
- 对于 Galaxy(由 ClickHouse Cloud 集群表示),我们使用 ClickHouse S3 表函数将数据导出到 S3
- 对于 Marketo,我们使用 Fivetran
- 最后,由于 AWS 和 GCP 价格很少变化,我们决定不自动化其加载,而是创建了一些脚本来帮助我们在需要时手动更新 CSP 价格
-
对于大型事实表,我们收集每小时增量。对于字典和不仅可以接收新行而且可以接收更新的表,我们使用“替换”方法(即,我们每小时下载整个表)。
-
一旦每小时数据收集到 S3 存储桶中,我们就使用 ClickHouse s3 表函数将数据导入到 ClickHouse 数据库。S3 表函数在副本之间扩展,并且在处理大量数据时效果很好。
-
数据从 S3 存储桶插入到数据库中的 RAW 层。此层具有与源相同的表结构。
-
在 Airflow 运行的一系列转换(包括连接)之后,来自原始表的数据被插入到 MART 表中 - 这些表代表业务实体并满足我们内部利益相关者的需求。
在执行转换时,会使用许多临时表。实际上,大多数转换后的结果首先写入暂存表,然后才插入到目标表中。虽然这种方法引入了一些复杂性,但它也为我们提供了重用增量数据所需的灵活性。这允许单个增量部分多次使用,而无需重新计算或重新扫描目标表。暂存表对于每次 Airflow DAG (有向无环图)运行都具有唯一的名称。
-
最后,Superset BI 工具允许我们的内部用户查询 MART 表以及构建图表和仪表板
Superset 仪表板示例。注意:为了说明目的,此处展示了带有虚假数字的示例数据。
幂等性
我们在 ClickHouse 中使用的大多数表都使用 ReplicatedReplacingMergeTree 引擎。此引擎使我们不必担心表中的重复项 - 具有相同键的记录将被压缩,并且仅保留最后一条记录。这也意味着我们可以根据需要多次插入一个特定小时的数据 - 只有每个行的最后一个版本会保留下来。当表用于进一步转换以实现一致性时,我们还使用 ClickHouse 的功能“FINAL”,因此,例如,sum()
函数不会将一行计算两次。
当与 Airflow 作业/DAG 结合使用时,这些作业/DAG 可以容忍同一时段多次运行,我们的管道是完全幂等的,可以安全地重新执行而不会导致重复。有关内部 Airflow 设计的更多详细信息将在下面给出。
一致性
默认情况下,ClickHouse 提供最终一致性。这意味着如果您成功运行插入查询,则不能保证新数据在所有 ClickHouse 副本中。这对于实时分析来说是足够的,但对于 DWH 场景来说是不可接受的。想象一下,例如,您将数据插入暂存表。插入成功完成,您的 ELT 进程开始执行下一个从暂存表读取数据的查询……而您只获得部分数据。
但是,ClickHouse 为一致性比插入数据在第一个节点上的即时可用性更重要的用例提供了不同的模式。为了保证插入查询在所有副本都具有数据之前不会返回“成功”,我们使用设置 insert_quorum=3
运行所有插入查询(我们的集群中有三个节点)。我们不使用“auto”设置,因为当一个节点发生故障时(例如,当执行 ClickHouse 升级时),其余两个节点仍然可以接受插入。一旦重新启动的节点可用,则此节点中可能会在一段时间内缺少插入的数据。因此,对于我们来说,最好在少于三个副本中插入数据时收到错误(Number of alive replicas (2) is less than requested quorum (3/3).. (TOO_FEW_LIVE_REPLICAS
)。由于升级导致的重启非常快,因此当 Airflow 在错误后重试时,查询通常会成功。
当然,这种方法不能保证先前失败的插入中的未提交部分对查询不可见,但这不是问题,因为我们支持上一部分中描述的幂等性。另一种解决方案是仅使用一个副本运行所有 ELT 进程,但这可能会限制性能。
内部基础设施设计
鉴于我们的规模,我们需要我们的 DWH 基础设施简单、易于操作且易于扩展。在直接在 AWS EC2 上运行内部 PoC 后,我们将所有基础设施组件迁移到 Docker。
- 我们为 Airflow Web 服务器、Airflow Worker 和 Superset 设置了单独的机器。所有组件都打包在 Docker 容器中
- 在 Airflow 机器上,我们还每 5 秒运行一个容器,将包含我们的 DAG 代码、ELT 查询和一些配置文件的存储库与机器上的文件夹同步
- 我们使用 Superset 仪表板和警报功能,因此我们有 Superset 的调度器和 Worker 容器
- 所有 Airflow 和 Superset 组件都通过在单独机器上运行的 Redis 实例同步。Redis 存储作业运行的执行状态和 Airflow 的 Worker 代码、Superset 的缓存查询结果以及一些其他服务信息
- 我们使用 AWS RDS for PostgreSQL 作为 Airflow 和 Superset 的内部数据库
- 我们有两个独立运行的环境,它们在不同的区域中具有自己的 ClickHouse Cloud 实例、Airflow 和 Superset 安装
- 虽然一个环境名为 Preprod,另一个环境名为 Prod,但我们保持 Preprod 的一致性,以便在 Prod 不可用时能够切换
这样的设置使我们能够安全且轻松地进行发布
- 开发人员从开发或生产分支创建一个分支
- 开发人员进行更改
- 开发人员创建一个到 Preprod 分支的 PR
- 一旦 PR 经过审核和批准,更改将进入 Preprod Airflow 实例,并在其中进行测试
- 一旦更改准备好发布到生产环境,就会执行从 Preprod 到 Prod 分支的 PR
Airflow 内部设计
最初,我们考虑创建一个具有许多依赖项的复杂 DAG 系统。不幸的是,现有的 DAG 依赖关系机制选项都无法与所需的架构一起工作(这在 Airflow 中是一个非常常见的问题)
- Airflow 不允许数据集名称在执行之间更改。新引入的数据集因此无法使用临时名称。如果我们使用静态数据集名称,则下游 DAG 将仅为最后一个增量运行一次。
- 触发器可以为我们工作,但使用触发器会给我们的设置增加太多复杂性。从运营角度来看,拥有 10-20 个带有触发器的 DAG 看起来像是一场依赖关系噩梦。
因此,我们最终得到了以下结构
- 用于从数据源加载数据到 S3 的单独 DAG(例如,M3ter -> S3)
- 当数据交付到 S3 时,执行所有转换的单个大型主 DAG
这种方法的主要优点是,它既结合了在主 DAG 的任务中清楚列出所有必需的依赖项,又具有构建未连接到失败数据集的实体的能力。
安全性
由于我们的内部 DWH 系统存储敏感数据,包括 PII 和财务信息,因此安全性必须是我们架构的基础。为此,我们实施了一些基本规则和一套关于如何运营 DWH 的框架。
通用规则
- 不同的用户必须根据公司的角色模型访问不同的数据,并且应该自动完成
- 应该在数据库级别(而不是在 BI 端!)完成权限分离
- 网络访问限制应在所有级别上呈现(从使用 Okta 进行 BI 工具到 IP 过滤)
实施
我们使用 Google 组来控制内部用户权限。这使我们能够使用现有的公司内部组,并且还允许组所有者(可以由不关心 SQL 的非技术人员担任)控制对不同数据的访问。组可以是嵌套的。例如
为了将 Google 组与确切的权限匹配,我们使用一个系统表,该表连接了
- Google 组名称
- 数据库名称
- 表名
- 列数组
- 过滤器(例如,“where organization=’clickhouse’”)
- 访问类型 (SELECT, INSERT)
我们还有一个脚本执行以下操作
- 获取组和用户的递归列表
- 在数据库中使用唯一密码创建(实际上是替换)这些用户
- 创建与 Google 组对应的角色
- 将角色分配给用户
- 根据权限表授予角色权限,并带有“WITH REPLACE OPTION”子句 - 这将删除可能由于某种原因手动完成的所有其他授权
在 Superset 端,我们使用 DB_CONNECTION_MUTATOR 函数在将查询发送到数据库时将数据库用户名替换为 Superset 用户。我们在 Superset 中还启用了 Google Oauth。这意味着,在 DB_CONNECTION_MUTATOR 中,我们拥有使 Superset 使用所需的用户名和密码连接所需的一切
def DB_CONNECTION_MUTATOR(uri, params, username, security_manager, source):
# Only enable mutator on clickhouse cloud endpoints
if not uri.host.lower().endswith("clickhouse.cloud"):
return uri, params
user = security_manager.find_user(username=username)
generated_username = str(user.email).split('@')[0] + '--' + str(user.username)
uri.username = generated_username
# Password generation logic - hidden in this example
uri.password = ...
return uri, params
上述意味着 Superset 为每个用户使用唯一的数据库用户名,并具有一组由 Google 组控制的唯一权限。
GDPR 合规性
ClickHouse Cloud 用户可以要求我们删除他们的所有个人数据,包括姓名、电子邮件和其他信息。当然,在这种情况下,我们也会从 DWH 中删除此信息。这里最酷的事情是我们不需要在 ClickHouse 表中运行任何更新或删除。由于我们的引擎仅为每个键值保留最后一条记录,因此我们需要做的就是插入一个包含已删除用户数据的新版本行。旧行需要几个小时才能消失,但 GDPR 标准根据场景为您提供 3 到 30 天的数据删除时间。因此,完整算法是
- 在源系统之一中找到一个特殊标志,表明应屏蔽/删除此 ID
- 从表中选择具有此 ID 的所有记录
- 屏蔽必填字段
- 将数据插回表中
- 运行“optimize table … final”命令以确保旧记录已从磁盘中删除
- 当新的每小时增量到来时,我们执行与已删除 ID 列表的连接。这意味着,如果由于某种原因,用户的 PII 信息尚未完全删除,我们将自动屏蔽此数据
改进和未来计划
虽然总的来说我们对我们的 DWH 感到满意,但我们计划在未来几个月内更改一些内容
第三个逻辑层
不幸的是,仅设置两个逻辑层的想法行不通。我们发现,为了计算可以回填并且需要来自 5 个以上数据源的数据的真正复杂的指标,我们必须在不同的数据集市之间创建依赖关系。有时,这甚至涉及递归依赖关系。为了解决这个问题,我们需要引入一个名为详细数据存储 (DDS) 的中间层。它将存储一些内部业务实体,如帐户、组织、服务等。此层将不向最终用户开放,但它将帮助我们消除数据集市之间的依赖关系。
DBT
Airflow 是一个很好的调度器,但我们需要一个工具来处理许多其他事情:在需要时完全重新加载数据集市、QA、数据描述和文档等等。为此,我们计划将 Airflow 与 DBT 集成。由于我们在 Docker 容器中运行所有数据基础设施,因此很容易为我们的需求创建一个单独的 DBT 容器,该容器将由 Airflow DAG 触发。
命名约定
虽然从一开始我们就知道我们必须遵循一些关于如何命名表、字段和图表的规则,但我们没有在这方面投入太多资源。因此,我们现在有相当令人困惑的命名,这些命名不允许用户理解特定表或字段的用途。我们需要使其更清晰。
资源
ClickHouse 是一家相对年轻的公司,因此我们的 DWH 团队规模相对较小,只有 3 名成员
- 数据工程师 - 构建和维护基础设施
- 产品分析师 - 帮助用户获得见解、构建图表和理解数据
- 团队负责人 - 仅将约 30% 的时间用于 DWH 任务
至于基础设施,我们使用两个环境,每个环境都有单独的 ClickHouse Cloud 服务。每个服务都有 3 个节点(也称为副本,但所有副本都接受查询)。我们的 ClickHouse 服务的内存使用量约为 200 Gb。虽然我们不为这些服务付费,因为我们是 ClickHouse Cloud 团队的一员,但我们研究了竞争对手的定价和性能,并相信另一种云分析数据库在我们的案例中会贵得多。
此外,我们的基础设施还包括 8 台 EC2 机器和一个包含原始数据的 S3 存储桶。总的来说,这些服务的成本约为每月 1,500 美元。
总体结果
我们的 DWH 已运行一年多。我们有超过 70 个月活跃用户、数百个仪表板和数千张图表。总而言之,用户每天运行约 40,000 个查询。此图表显示了每天的查询数量,细分是按用户。系统和 ELT 用户已排除
是的,我们的用户在周末也工作
我们存储了约 115 Tb 的未压缩数据在约 150 个表中,但由于 ClickHouse 的高效压缩,实际存储的数据仅为约 13 Tb。
我们的 DWH 中数据量的每周增长量。2 月份的峰值代表一个内部实验,该实验需要复制所有数据。
总结
在一年内,我们部署了一个基于开源技术的 DWH,它提供了用户喜爱的体验。虽然我们的 DWH 使数据处理变得容易,但我们也看到了许多我们需要进行的改进和更改,以便向前发展。我们相信,我们对 ClickHouse Cloud 的使用证明它可以用于构建强大的 DWH。