DoubleCloud 即将停止运营。迁移到 ClickHouse,享受限时免费迁移服务。立即联系我们 ->->

博客 / 工程

我们在 ClickHouse 中构建内部数据仓库的方式

author avatar
Dmitry Pavlov
2023年7月19日

在 ClickHouse,我们的使命是为客户和用户提供一个闪电般快速的云分析数据库,可用于内部和面向客户的分析。 ClickHouse Cloud 使我们的客户能够存储和处理几乎无限量的数据,这有助于他们做出数据驱动的决策。基于事实而非假设做出决策,对于当今大多数成功的企业至关重要。

当然,我们团队内部也遵循同样的方法。开发和运营我们的云数据库产品会生成大量数据,这些数据可用于容量规划、定价、更好地了解客户需求以及财务报告。数十个数据源、数百 TB 的数据以及大约一百名 BI 和临时用户……猜猜看 - 我们正在使用 ClickHouse Cloud 来处理这些数据 :)

在这篇文章中,我将分享我们的内部数据仓库 (DWH) 是如何构建的,我们使用的技术栈以及我们的 DWH 在未来几个月将如何发展。

需求和数据源

我们在 2022 年 5 月启动了 ClickHouse Cloud 的私有预览版,同时我们意识到我们希望更好地了解我们的客户:他们如何使用我们的服务,他们遇到了哪些困难,我们如何帮助他们,以及如何使我们的定价对他们来说负担得起且合理。为此,我们需要从多个内部数据源收集和处理数据:数据平面,负责运行客户的数据库 Pod;控制平面,负责面向客户的 UI 和数据库操作;以及 AWS 账单,它为我们提供了运行客户工作负载的确切成本。

有一段时间,我们的产品副总裁 Tanya Bragin 每天都会使用 Excel 手动分析客户的工作负载,并进行查找。作为一名前任 DWH 架构师,我为她不得不如此挣扎而感到羞愧,因此,第一个内部 DWH 概念由此诞生。

在设计系统时,我们有许多关键任务旨在支持我们的内部利益相关者,其中一些列在下面。

内部团队任务
产品团队跟踪转化率和留存率、功能使用情况、服务大小、使用情况并找出最常见的问题。进行深入的临时分析。
运营团队跟踪近似收入,并向公司大部分员工提供 Salesforce 数据的只读访问权限
销售团队查看特定客户的设置和使用情况:多少服务、多少数据、常见问题等。
工程团队调整我们的自动扩展器,跟踪查询错误率和数据库功能的使用情况
支持团队查看特定客户的设置:服务、使用情况、数据量等
市场营销团队跟踪漏斗顶端转化率、客户获取成本和其他营销指标
成本节约团队分析我们的 CSP 成本并主动优化我们的 CSP 承诺
CI-CD 团队跟踪 CI-CD 成本

注意:在我们的内部数据仓库中,我们不会收集、存储或处理任何客户数据(其中大部分数据已加密),例如表数据、查询文本、网络数据等。例如,对于查询分析,我们仅收集使用的函数列表、查询运行时间、使用的内存以及其他一些元信息。我们从不收集查询数据或查询文本。

为了实现这一点,我们制定了一个计划,从数十个来源摄取数据,包括以下内容。

数据源类型和大小数据
控制平面DocumentDB 约 5 个集合 每小时约 500 MB数据库服务元信息:类型、大小、CSP 区域、状态、财务计划、扩展设置等。
数据平面ClickHouse Cloud 约 5 个表 每小时约 15 GB数据库系统信息:指标统计、查询统计、表统计、Pod 分配等。
AWS CURS3 存储桶 1 个表 每小时约 1 GB我们运行服务的 AWS 基础设施的成本和使用情况
GCP 账单BigQuery 1 个表 每小时约 500 MB我们运行服务的 GCP 基础设施的成本和使用情况
Salesforce (CRM)自定义 约 30 个表 每小时约 1 GB有关客户账户、使用计划、订阅、折扣、区域、潜在客户和支持问题的的信息
M3ter (计量软件)自定义 API 2 个 API 每小时约 500 MB准确的使用信息和账单
GalaxyClickHouse Cloud 1 个表Galaxy 是我们用于控制平面/UI 层的基于事件的可观察性和监控系统
SegmentS3 存储桶 1 个表一些额外的营销数据
Marketo自定义发送的电子邮件元信息
AWS 公共价格自定义 API 3 个表每个 AWS SKU 在每个区域的价格
GCP 价格CSV 文件每个 GCP SKU 在每个区域的价格

鉴于我们的主要目标,我们做了一些假设
  • 在我们目前的阶段,数据的小时粒度足够了。这意味着,我们可以为每个小时收集和存储聚合数据。
  • 目前,我们不需要使用 CDC(或“更改数据捕获”)方法,因为它会使 DWH 基础设施变得更加昂贵。传统的直接加载/ETL 应该能够满足我们的需求。如果这些数据源需要更新,我们可以执行完整的数据重新加载。
  • 由于我们拥有一个非常可扩展且快速的数据库,因此我们不需要在数据库外部执行 ETL 转换。相反,我们直接使用 ClickHouse 使用 SQL 执行转换。这非常有效。
  • 在 ClickHouse,我们本质上是开源的,因此我们希望我们的所有技术栈都只有开源组件。我们也喜欢贡献。
  • 由于我们有非常不同类型的数据源,因此我们将需要多种工具和方法来从这些数据源提取数据。同时,我们需要一个标准化的中间存储。

然而,我们最初的其他假设之一被证明是错误的。我们假设,因为我们的数据结构并不复杂,因此我们只需要在 DWH 中拥有两个逻辑层 - 原始层和“数据市场”层就足够了。这是一个错误。实际上,我们需要第三个中间层来存储内部业务实体。我们将在下面解释。

架构

因此,我们提出了以下架构

dwh_architecture_v2.png

  1. 从高级别来看,我们的技术栈可以描述为

    • ClickHouse Cloud 作为主要数据库
    • Airflow 作为调度器(一个开源调度工具)
    • AWS S3 作为原始数据的中间存储
    • Superset 作为内部BI和临时查询工具
  2. 我们使用不同的工具和方法将数据从数据源捕获到多个S3存储桶中

    • 对于控制平面、数据平面、Segment和AWS CUR,我们使用数据源的原生功能导出数据
    • 对于GCP计费,我们使用BigQuery 导出查询 将数据导出到GCS,然后可以由ClickHouse S3表函数 导入
    • 对于Salesforce,我们使用 AWS AppFlow
    • 为了捕获来自M3ter的数据,我们编写了自己的应用程序。最初它是用Kotlin编写的,后来我们迁移到了Python
    • 对于Galaxy(由ClickHouse Cloud集群表示),我们使用ClickHouse S3表函数 将数据导出到S3
    • 对于Marketo,我们使用 Fivetran
    • 最后,由于AWS和GCP的价格变化非常少,我们决定不自动加载,而是创建了一些脚本,以便在需要时帮助我们手动更新CSP价格
  3. 对于大型事实表,我们收集每小时增量。对于不仅可以接收新行,还可以接收更新的字典和表,我们使用“替换”方法(即我们每小时下载整个表)。

  4. 一旦每小时数据收集到S3存储桶中,我们就使用ClickHouse s3表函数 将数据导入到ClickHouse数据库中。S3表函数可以跨副本扩展,并且在大量数据上运行良好。

  5. 从S3存储桶中,数据被插入到数据库中的RAW层。此层具有与源相同的表结构。

  6. 在一系列由Airflow运行的转换(包括联接)之后,来自原始表的数据被插入到MART表中 - 这些表表示业务实体并满足我们内部利益相关者的需求。

    在执行转换时,会使用许多临时表。实际上,大多数转换结果首先写入暂存表,然后才插入目标表。虽然这种方法引入了一些复杂性,但它也为我们提供了重用增量数据的必要灵活性。这允许单个增量部分被多次使用,而无需重新计算或重新扫描目标表。暂存表对于每个Airflow DAG (有向无环图)运行都有唯一的名称。

  7. 最后,Superset BI工具允许我们的内部用户查询MART表以及构建图表和仪表板

superset_dwh.png Superset仪表板示例。注意:出于说明目的,显示了包含虚假数字的示例数据。

幂等性

我们大多数在ClickHouse中使用的表都使用ReplicatedReplacingMergeTree 引擎。此引擎允许我们不必关心表中的重复项 - 具有相同键的记录将被压缩,并且仅保留最后一条记录。这也意味着我们可以根据需要多次插入特定小时的数据 - 每一行的最后一个版本将被保留。我们还在表用于进一步转换时使用ClickHouse的“FINAL”功能来实现一致性,例如,sum()函数不会重复计算一行。

与Airflow作业/DAG结合使用,这些作业/DAG能够容忍对同一时段的多次运行,我们的管道完全幂等,可以安全地重新执行,而不会导致重复。有关Airflow内部设计的更多详细信息将在下面给出。

一致性

默认情况下,ClickHouse提供最终一致性。这意味着,如果您成功运行了一个插入查询,它并不保证新数据存在于所有ClickHouse副本中。这对于实时分析来说已经足够了,但对于DWH场景来说是不可接受的。例如,假设您将数据插入到暂存表中。插入成功完成,并且您的ELT流程开始执行从暂存表读取的下一个查询……您只获得了部分数据。

但是,ClickHouse为一致性比第一个节点上插入数据的即时可用性更重要的用例提供了不同的模式。为了保证插入查询在所有副本都拥有数据之前不返回“成功”,我们使用设置insert_quorum=3运行所有插入查询(我们的集群中有三个节点)。我们不使用“auto”设置,因为当一个节点宕机时(例如,执行ClickHouse升级时),其余两个节点仍然可以接受插入。一旦重新启动的节点可用,一段时间内该节点可能缺少插入的数据。因此,对于我们来说,在少于三个副本中插入数据时,最好收到错误(Number of alive replicas (2) is less than requested quorum (3/3).. (TOO_FEW_LIVE_REPLICAS)。由于由于升级导致的重启非常快,因此在发生错误后Airflow重试时,查询通常会成功。

当然,这种方法不能保证以前失败的插入的未提交部分不会对查询可见,但这并不是问题,因为我们支持如前一部分所述的幂等性。另一种解决方案是仅使用一个副本运行所有ELT流程,但这可能会限制性能。

内部基础设施设计

鉴于我们的规模,我们需要我们的DWH基础设施简单、易于操作和易于扩展。在AWS EC2上直接运行内部PoC之后,我们将所有基础设施组件迁移到Docker。

internal_infra_design_v2.png

  • 我们为Airflow Web服务器、Airflow工作器和Superset提供了单独的机器。所有组件都打包在Docker容器中
  • 在Airflow机器上,我们还会每5秒运行一个容器,该容器将包含我们的DAG代码、ELT查询和一些配置文件的存储库与机器上的文件夹同步
  • 我们使用Superset仪表板和警报功能,因此我们为Superset提供了调度器和工作器容器
  • 所有Airflow和Superset组件都通过运行在单独机器上的Redis实例进行同步。Redis存储作业运行的执行状态和Airflow的工作器代码、Superset的缓存查询结果以及其他一些服务信息
  • 我们使用AWS RDS for PostgreSQL作为Airflow和Superset的内部数据库
  • 我们有两个独立运行的环境,它们在不同的区域拥有自己的ClickHouse Cloud实例、Airflow和Superset安装
  • 虽然一个环境名为Preprod,另一个名为Prod,但我们保持Preprod一致,以便在Prod不可用时能够切换

这样的设置使我们能够安全且轻松地进行发布

  1. 开发人员从开发或生产分支创建分支
  2. 开发人员进行更改
  3. 开发人员向Preprod分支创建PR
  4. PR经过审查和批准后,更改将转到Preprod Airflow实例,并在其中进行测试
  5. 一旦更改准备发布到生产环境,就会从Preprod到Prod分支执行PR

Airflow内部设计

最初,我们考虑创建一个具有许多依赖项的复杂DAG系统。不幸的是,现有的DAG依赖机制选项都不能与所需的架构一起使用(这在Airflow中是一个非常常见的问题)

  • Airflow不允许数据集名称在执行之间发生变化。新引入的数据集 因此不能使用临时名称。如果我们使用静态数据集名称,则下游DAG将仅为最后一个增量运行一次。
  • 触发器可以为我们工作,但使用触发器会增加我们设置的复杂性。从操作角度来看,拥有10-20个带有触发器的DAG看起来像是一个依赖项噩梦。

因此,我们最终采用了以下结构

  • 将数据从数据源加载到S3的单独DAG(例如,M3ter -> S3)
  • 一个庞大的主DAG,在数据交付到S3时执行所有转换

dwh_dag_v2.png

这种方法的主要优点是它结合了在主DAG的任务中清晰列出所有必需依赖项以及构建与失败数据集无关的实体的能力。

安全性

由于我们的内部DWH系统存储敏感数据,包括PII和财务信息,因此安全性必须是我们架构的基础。为此,我们实施了一些基本规则和一套关于我们如何操作DWH的框架。

一般规则

  • 根据公司的角色模型,不同的数据必须可供不同的用户访问,并且应该自动完成
  • 权限分离应在数据库级别完成(而不是在BI端!)
  • 网络访问限制应在所有级别呈现(从使用Okta进行BI工具到IP过滤)

实施

我们使用Google组来控制内部用户权限。这使我们能够使用现有的内部公司组,并且还允许组所有者(可以由不关心SQL的非技术人员表示)控制对不同数据的访问。组可以嵌套。例如

为了将Google组与精确权限匹配,我们使用一个系统表来连接

  • Google组名称
  • 数据库名称
  • 表名称
  • 列数组
  • 过滤器(例如,“where organization=’clickhouse’”)
  • 访问类型(SELECT,INSERT)

我们还有一个执行以下操作的脚本

  1. 获取组和用户的递归列表
  2. 在数据库中使用唯一的密码创建(实际上是替换)这些用户
  3. 创建对应于Google组的角色
  4. 将角色分配给用户
  5. 根据带有“WITH REPLACE OPTION”子句的权限表向角色授予权限 - 这将删除可能由于某种原因手动完成的所有其他授权

在Superset端,我们使用DB_CONNECTION_MUTATOR 函数在将查询发送到数据库时用Superset用户替换数据库用户名。我们还在Superset中启用了Google OAuth。这意味着,在DB_CONNECTION_MUTATOR中,我们拥有使Superset使用所需的用户名和密码连接所需的一切

def DB_CONNECTION_MUTATOR(uri, params, username, security_manager, source):
    # Only enable mutator on clickhouse cloud endpoints
    if not uri.host.lower().endswith("clickhouse.cloud"):
        return uri, params
    user = security_manager.find_user(username=username)
    
    generated_username = str(user.email).split('@')[0] + '--' + str(user.username)
    uri.username = generated_username
    # Password generation logic - hidden in this example
    uri.password = ...
    return uri, params

以上意味着Superset为每个用户使用唯一的数据库用户名,并具有由Google组控制的唯一权限集。

GDPR合规性

ClickHouse Cloud用户可以要求我们删除其所有个人数据,包括姓名、电子邮件和其他信息。当然,在这种情况下,我们也会从DWH中删除此信息。这里最棒的事情是我们不需要在ClickHouse表中运行任何更新或删除操作。因为我们的引擎只为每个键值保留最后一个记录,所以我们只需要插入包含已删除用户数据的新版本的行即可。旧行需要几个小时才能消失,但根据场景,GDPR标准为您提供3到30天的时间来删除数据。因此,完整的算法是

  1. 在源系统之一中找到一个特殊的标志,表明此ID应该被屏蔽/删除
  2. 选择表中与此ID相关的全部记录
  3. 屏蔽所需字段
  4. 将数据重新插入表中
  5. 运行“optimize table … final”命令以确保旧记录从磁盘中删除

  6. 当新的每小时增量到来时,我们会执行与已删除 ID 列表的联接操作。这意味着,如果由于某种原因用户的 PII 信息尚未完全删除,我们将自动屏蔽这些数据。

改进和未来计划

虽然总体而言我们对我们的 DWH 非常满意,但我们计划在未来几个月内对其进行一些更改。

第三个逻辑层

不幸的是,只有两个逻辑层的思路行不通。我们发现,对于计算需要回填且需要来自 5 个以上数据源的数据的非常复杂的指标,我们必须在不同的数据仓库之间创建依赖关系。有时这甚至涉及递归依赖关系。为了解决这个问题,我们需要引入一个中间层,称为详细数据存储 (DDS)。它将存储一些内部业务实体,如账户、组织、服务等。此层对最终用户不可用,但它将帮助我们消除数据仓库之间的依赖关系。

DBT

Airflow 是一款优秀的调度器,但我们需要一个可以处理许多其他事物的工具:如果需要,可以完全重新加载数据仓库、QA、数据描述和文档等。为此,我们计划将 Airflow 与 DBT 集成。由于我们在 Docker 容器中运行所有数据基础设施,因此非常容易为我们的需求创建一个单独的 DBT 容器,该容器将由 Airflow DAG 触发。

命名约定

虽然从一开始我们就清楚地知道,在命名表、字段和图表时必须遵循某些规则,但我们并没有投入太多资源。结果,我们现在有了相当混乱的命名方式,这使得用户无法理解特定表或字段的目的。我们需要使其更清晰。

资源

ClickHouse 是一家相对年轻的公司,因此我们的 DWH 团队规模相对较小,只有 3 名成员。

  • 数据工程师 - 构建和维护基础设施
  • 产品分析师 - 帮助用户获取见解、构建图表并理解数据
  • 团队负责人 - 仅花费约 30% 的时间在 DWH 任务上。

至于基础设施,我们使用两个具有独立 ClickHouse Cloud 服务的环境。每个服务都有 3 个节点(也称为副本,但所有副本都接受查询)。我们 ClickHouse 服务的内存使用量约为 200 GB。虽然我们作为 ClickHouse Cloud 团队成员无需为这些服务付费,但我们调查了竞争对手的定价和性能,并认为在我们的案例中,其他云分析数据库的成本会高得多。

此外,我们的基础设施包括 8 台 EC2 服务器和一个包含原始数据的 S3 存储桶。总而言之,这些服务的每月成本约为 1500 美元。

总体结果

我们的 DWH 已经运行了一年多。我们拥有超过 70 位月活跃用户、数百个仪表板和数千个图表。总而言之,用户每天运行约 40,000 个查询。此图表显示了每天的查询数量,并按用户进行了细分。系统和 ELT 用户已排除在外。

dwh_usage.png

是的,我们的用户也在周末工作。

我们在约 150 个表中存储了约 115 TB 的未压缩数据,但由于 ClickHouse 的高效压缩,实际存储的数据仅为约 13 TB。

dwh_data_size.png

DWH 中数据量的周环比增长。2 月份的激增代表了一个内部实验,需要复制所有数据。

总结

一年来,我们已部署了一个基于开源技术的 DWH,它提供了用户喜爱的体验。虽然我们的 DWH 使用户易于处理数据,但我们也看到了许多需要改进和更改的地方,以便我们能够继续前进。我们相信,我们对 ClickHouse Cloud 的使用证明了它可以用于构建强大的 DWH。

立即开始使用 ClickHouse Cloud并获得 300 美元的积分。在 30 天试用期结束时,您可以继续使用按需付费计划,或联系我们以了解有关我们基于用量的折扣的更多信息。访问我们的定价页面以获取详细信息。

分享此帖子

订阅我们的时事通讯

随时了解功能发布、产品路线图、支持和云产品!
正在加载表单…
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image