博客 / 工程

使用 ClickHouse 构建 Bluesky 数据的 Medallion 架构

author avatar
PME 团队
2025 年 1 月 8 日 - 36 分钟阅读

与数据社区的其他成员一样,我们对 Bluesky 社交网络及其 API 最近的人气激增感到兴奋,通过该 API,您可以访问正在发布的大量内容。

此数据集包含每秒数千个 JSON 事件的高容量流,我们认为将数据提供给社区查询会很有趣。

只对查询 Bluesky 感兴趣?!?
对于只想快速将 Bluesky 数据摄取到 ClickHouse 中的用户,我们建议跳转到此处。然后可以在此处找到示例分析查询和使用此数据的说明。

在探索数据的过程中,我们意识到许多事件都存在格式错误或不正确的时间戳。该数据集还包含频繁的重复项。因此,我们不能只是导入数据就万事大吉 - 我们需要进行一些清理。

这是一个尝试我们在最近的博客中讨论的 Medallion 架构的绝佳机会。这篇文章将通过一个实际示例将这些概念变为现实。

我们将构建一个工作流程来应对这些挑战,将此数据集组织成三个不同的层级:Bronze、Silver 和 Gold。我们将遵守 Medallion 架构的原则,并大量使用最近发布的 JSON 类型。每个层级都将在我们的演示环境 sql.clickhouse.com 中公开查询,读者可以在其中亲身体验和互动结果。我们甚至提供了一些示例分析查询,以帮助您入门

Markdown Image

什么是 Bluesky?

对于那些不太活跃于社交媒体的您来说,您可能错过了Bluesky最近的崛起,它目前每天积累近 100 万用户。Bluesky 是一种类似于 X 的社交网络,但与它们不同的是,它是完全开源和去中心化的!

Bluesky 构建于AT 协议 (ATProto)之上,是一个去中心化的社交媒体平台,允许用户独立托管其内容。默认情况下,数据驻留在 Bluesky 个人数据服务器 (PDS) 上,但用户可以选择自行托管这些服务器(及其内容)。这种方法反映了向早期 Web 原则的转变,在早期 Web 中,个人可以控制其内容和连接,而不是依赖于主导和拥有用户数据的中心化平台。

每个用户的数据都在轻量级开源框架中进行管理,单个 SQLite 数据库处理存储。这种设置实现了互操作性,并确保内容所有权仍然属于个人,即使中央平台离线或更改其策略。我们向对底层架构及其演变感兴趣的用户推荐这篇文章。

对我们来说最重要的是,像早期的 Twitter 一样,Bluesky 提供了一种免费的方式来检索事件,例如帖子,实时检索,随着网络人气的增长,这可能会解锁一个巨大的数据集用于分析。

读取 Bluesky 数据

为了摄取 Bluesky 数据,我们使用了最近发布的Jetstream API,它通过提供 JSON 编码的流来简化 Bluesky 事件的消费。与原始的 Firehose(需要处理二进制 CBOR 数据和 CAR 文件)不同,Jetstream 降低了复杂性,使其可供开发实时应用程序的开发人员访问。此 API 与我们的用例完美契合,允许我们过滤和处理来自 Bluesky 帖子的每秒数千个事件,同时解决常见挑战,例如格式错误的数据和高重复率。

对于我们的实现,我们连接到公共 Jetstream 实例,消费 JSON 编码事件的连续流以进行摄取。为此,我们使用一个简单的 bash 脚本,该脚本处理来自 Jetstream 的 JSON 事件实时流。完整的脚本可以在此处找到。总而言之,这

  1. 检查 GCS 存储桶中最新的 .csv.gz 文件,提取其时间戳(用作游标),并使用它从正确的位置恢复 Jetstream 订阅。这确保了数据连续性并最大限度地减少了重复。
  2. websocat 工具用于连接到 Jetstream API,订阅事件,并管道传输 JSON 流以进行处理。wantedCollections 参数过滤相关事件,cursor 确保增量数据检索,即 websocat -Un --max-messages-rev $MAX_MESSAGES "$WS_URL/subscribe?wantedCollections=app.*&cursor=$cursor" > "$OUTPUT_FILE"
  3. 传入的 JSON 数据被分成 50 万行的块,每个块代表一个文件,最后一个时间戳用作文件标识符。我们使用 clickhouse-local 将文件转换为 CSV 并压缩为 gz 文件,然后使用 gsutil 将文件上传到 GCS 存储桶。

此脚本在 ClickHouse Docker 容器中运行,每 3 分钟使用一次 Google Cloud Run Job 执行。

Markdown Image

请注意,文件自然会按照其名称使用最后一个事件的时间戳进行排序。这对于以后高效地增量读取 GCS 存储桶至关重要。此脚本也不能保证捕获所有 Bluesky 事件。

数据抽样

在撰写本文时,在收集了大约 21 天的数据后,我们捕获了近 15 亿行事件。我们可以使用 gcs ClickHouse 函数就地查询数据,并识别原始行总数。

1clickhouse-cloud :) SELECT count()
2FROM gcs('https://storage.googleapis.com/pme-internal/bluesky/*.gz', '', '', 'CSVWithNames')
3
4┌────count()─┐
51484500000-- 1.48 billion
6└────────────┘
7
81 row in set. Elapsed: 72.396 sec. Processed 1.48 billion rows, 205.07 GB (20.51 million rows/s., 2.83 GB/s.)
9Peak memory usage: 4.85 GiB.

我们可以使用相同的函数对数据进行抽样,将每一行转换为 JSON 类型,并使用 PrettyJSONEachRow 格式以获得可读的结果。

1SET allow_experimental_json_type = 1
2
3SELECT data::'JSON' AS event
4FROM gcs('https://storage.googleapis.com/pme-internal/bluesky/*.gz', '', '', 'CSVWithNames')
5LIMIT 1
6FORMAT PrettyJSONEachRow
7
8{
9  "account": {
10    "active": true,
11    "did": "did:plc:kjealuouxn3l6v4byxh2fhff",
12    "seq": "706717212",
13    "time": "2024-11-27T18:00:02.429Z"
14  },
15  "did": "did:plc:kjealuouxn3l6v4byxh2fhff",
16  "kind": "account",
17  "time_us": "1732730402720719"
18}
19
201 row in set. Elapsed: 0.233 sec.

虽然以上内容提供了一些对事件结构的见解,但它并未完全捕捉数据的复杂性、可变性和不一致性。kind 列很大程度上决定了后续结构,API 传递 commit、 identityaccount 事件类型。这些事件类型的完整详细信息可以在此处找到,但总而言之,这些代表

  • commit:commit 事件表示记录的创建、更新或删除。这应代表大多数事件,包括帖子、点赞和关注。
  • identity:帐户的身份已更新。
  • account:帐户的状态已更新。

一旦数据加载到 Bronze 层,我们将进一步探索这些数据。

Bluesky 数据的挑战

JetStream API 提供的 Bluesky 数据存在许多挑战,包括

  • 格式错误的 JSON - 我们偶尔会看到格式错误的 JSON 事件。虽然这些情况很少见,但它们会中断文件的处理。我们使用函数 isValidJSON 排除这些事件,将 Bronze 层的摄取限制为返回 1 的那些行。
  • 结构不一致 - 虽然每个事件的抓取时间戳是一致的(time_us 字段),但包含事件发生时间的 JSON 路径取决于事件类型。我们的工作流程需要根据这些条件提取一致的时间戳列。一个简单的分析表明
    • commit.record.createdAt 可以用于 commit 事件
    • identity.time 用于 identity 事件
    • account.time 用于 account 事件
  • 未来或无效时间戳 - 某些事件具有未来时间戳。例如,在撰写本文时,在对事件进行抽样时,4.2 万个 commit 事件具有未来时间。还有另外 400 万个 commit 事件的时间戳早于 Bluesky 作为一项服务推出的时间。
  • 重复结构 - 在某些情况下,JSON 包含看似深度递归结构的内容。这产生了超过1800 个唯一的 JSON 路径,其中大多数可能对内容几乎没有增加价值。
  • 重复项 - 尽管我们努力维护游标,但 Jetstream API产生重复项(其中内容相同,抓取时间戳除外)。令人惊讶的是,这些重复项可能会在很宽的时间范围内发生 - 在某些情况下,最多相隔 24 小时。在探索数据时,一个重要的观察结果是,大多数重复项发生在20 分钟的时间窗口内

以上并不代表数据质量问题的详尽列表 - 我们仍在不断发现数据的挑战!但是,为了举例说明目的并出于简洁考虑,我们在示例 Medallion 工作流程中重点关注以上内容。

JSON 数据类型

JSON 在实现 Bluesky 数据的 Medallion 架构中起着关键作用,使系统能够在 Bronze 层中存储高度动态和半结构化的特性。ClickHouse 版本 24.8 中引入的新的 JSON 数据类型解决了早期实现面临的关键挑战。

与为每个 JSON 路径推断单一类型(通常导致类型强制或强制转换)的传统方法不同,ClickHouse 的 JSON 类型将每个唯一路径和类型的值存储在单独的子列中。这确保了高效的存储,最大限度地减少了不必要的 I/O,并避免了查询时类型转换的陷阱。

Markdown Image

例如,当插入具有不同类型的两个 JSON 路径时,ClickHouse 会将每个具体类型的值存储在不同的子列中。可以独立访问这些子列,从而最大限度地减少不必要的 I/O。请注意,当查询具有多种类型的列时,其值仍然作为单个列响应返回。

此外,通过利用偏移量,ClickHouse 确保这些子列保持密集,对于不存在的 JSON 路径不存储默认值。这种方法最大限度地提高了压缩率,并进一步减少了 I/O。

Markdown Image

此外,该类型不会因唯一 JSON 路径数量较多而导致子列爆炸问题。这在 Bluesky 数据中尤为重要,如果不应用任何过滤,Bluesky 数据将具有超过 1800 个唯一路径。请注意,这不会阻止存储这些路径;相反,如果超出限制,它只是将新路径存储在单个共享数据列中(带有用于加速查询的统计信息)。

Markdown Image

这种优化的 JSON 处理确保了复杂、半结构化的数据集(如 Bluesky 的数据集)可以有效地存储在架构的 Bronze 层中。对于对这种新列类型的实现感到好奇的用户,我们建议阅读我们的此处的详细博客文章

Bronze 层用于原始数据

虽然 Bronze 层的原始描述不提倡任何过滤或转换,但我们在这里不那么教条,并且认为最小的过滤和数据转换(非破坏性的)在问题调查和允许将来重放数据方面可能很有用。对于转换,我们建议将这些限制为使用 物化列 可以实现的转换,如下面的 Bronze 层架构所示

1CREATE TABLE bluesky.bluesky_raw
2(
3	`data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`),
4	`_file` LowCardinality(String),
5	`kind` LowCardinality(String) MATERIALIZED getSubcolumn(data, 'kind'),
6	`scrape_ts` DateTime64(6) MATERIALIZED fromUnixTimestamp64Micro(CAST(getSubcolumn(data, 'time_us'), 'UInt64')),
7	`bluesky_ts` DateTime64(6) MATERIALIZED multiIf(getSubcolumn(data, 'kind') = 'commit', parseDateTime64BestEffortOrZero(CAST(getSubcolumn(data, 'commit.record.createdAt'), 'String')), getSubcolumn(data, 'kind') = 'identity', parseDateTime64BestEffortOrZero(CAST(getSubcolumn(data, 'identity.time'), 'String')), getSubcolumn(data, 'kind') = 'account', parseDateTime64BestEffortOrZero(CAST(getSubcolumn(data, 'account.time'), 'String')), toDateTime64(0, 6)),
8	`dedup_hash` String MATERIALIZED cityHash64(arrayFilter(p -> ((p.1) != 'time_us'), JSONExtractKeysAndValues(CAST(data, 'String'), 'String')))
9)
10ENGINE = ReplacingMergeTree
11PRIMARY KEY (kind, bluesky_ts)
12ORDER BY (kind, bluesky_ts, dedup_hash)

关于此架构的一些重要说明

  • JSON 类型 - data 列使用新的 JSON 类型,并包含整个事件。我们使用 SKIP 子句来排除 JSON 中的特定路径,分析表明,这正是导致之前提到的结构重复的原因。
  • 元数据保留 - _file 列将包含对行来源文件的引用。
  • 物化列 - 我们的其余列是物化的,并在插入时从数据列计算得出。scrape_ts 包含事件传递的时间,并从 JSON 字段 time_us 中提取。我们的 kind 列描述了事件类型,如前所述。bluesky_ts 列执行条件逻辑,根据 kind 提取事件时间戳 - 这处理了我们的结构不一致性,并确保所有事件都具有一致的时间戳。最后,我们在列 dedup_hash 中计算事件的哈希值。为此,我们生成所有 JSON 路径及其值的数组,排除 time_us(它在重复事件中有所不同),使用函数 JSONExtractKeysAndValues。cityHash64 函数使用此数组,生成唯一的哈希值。
  • ReplacingMergeTree - ReplacingMergeTree 引擎在此处用于消除共享相同排序键值 (ORDER BY) 的重复条目,重复数据删除在后台合并期间异步发生。这些合并发生在不确定的时间,并且无法直接控制 - 因此,重复数据删除只是最终的。在我们的架构中,ORDER BY 键包括 kindbluesky_ts,从而实现高效读取,并通过聚类具有相似属性的行来确保强压缩。我们将 dedup_hash 附加到唯一标识用于重复数据删除的行,而无需将其包含在 PRIMARY KEY 中。后一种配置是一种优化,可防止将 dedup_hash 的索引加载到内存中 - 这是一个明智的选择,因为我们不直接查询哈希值。有关 ReplacingMergeTree 的详细指南,请参见此处

我们的 Bronze 层通过物化列执行最小的数据转换,同时提供数据重复数据删除功能。重要的是,此处 ReplacingMergeTree 的选择是可选的,并且与未来的层级脱钩。用户可能更喜欢标准的 MergeTree 来检查重复项。我们在此处的选择主要受最大限度地减少存储开销的愿望驱动。

从对象存储摄取数据

Markdown Image

如上所述,我们的摄取管道使用 websocat 工具从 JetStream API 流式传输数据,将事件存储为 Google Cloud Storage (GCS) 中的 .csv.gz 文件。此中间步骤提供了一些好处:它支持数据重放,保留原始原始数据的副本,并镜像许多用户如何从对象存储摄取数据。

要将这些文件从 GCS 读取到我们的 Bronze bluesky_raw 表中,我们使用 S3Queue 表引擎。此引擎从 S3 兼容的对象存储中读取数据,自动处理添加到存储桶的新文件,并通过物化视图将其插入到指定的表中。创建此表需要一些 DDL

1CREATE TABLE bluesky.bluesky_queue
2(
3	`data` Nullable(String)
4)
5ENGINE = S3Queue('https://storage.googleapis.com/pme-internal/bluesky/*.gz', '', '', 'CSVWithNames')
6SETTINGS mode = 'ordered', s3queue_buckets = 30, s3queue_processing_threads_num = 10;

请注意我们如何指定包含 gzipped 文件的 GCS 存储桶,以及如何通过架构声明将每一行定义为 String。重要的是,我们通过设置 mode = 'ordered 启用“有序模式”。这强制文件按字典顺序处理,确保顺序摄取。虽然这意味着忽略以较早排序顺序添加的文件,但它保持了高效和增量处理,并避免了在文件没有自然排序时需要执行大型集合差异。

我们之前对文件使用时间戳可确保我们的数据按顺序处理,并且 S3Queue 表引擎可以快速识别新文件。

我们正在将此数据加载到的sql.clickhouse.com环境有三个节点,每个节点有 60 个 vCPU。s3queue_processing_threads_num 设置分配每个服务器用于文件处理的线程数。此外,有序模式还引入了设置 s3queue_buckets正如建议的那样,我们将其设置为等于副本数 (3) 乘以处理线程数 (10) 的乘积。

要从该队列中使用行,我们需要附加一个增量物化视图。此视图从队列中读取,对行执行 SELECT 语句,结果发送到我们的 Bronze 层表 bluesky_raw

1CREATE MATERIALIZED VIEW bluesky.bluesky_mv TO bluesky.bluesky_raw
2(
3	`data` Nullable(String)
4)
5AS SELECT
6	data,
7	_file
8FROM bluesky.bluesky_queue
9WHERE isValidJSON(data) = 1

请注意,我们在此层执行基本过滤,通过使用 sValidJSON(data) = 1 进行过滤,将发送到我们的 Bronze 表的行限制为有效的 JSON,并包括元数据列 _file 以确保我们记录了每一行来源的 gzip 文件。

将 Bluesky 直接流式传输到 ClickHouse

请注意,ClickHouse 可以通过 JSON 输入格式直接流式传输数据,正如我们的 CTO Alexey Milovidov 最近演示的那样。这可以通过结合 JSON 数据类型和 JSON 输入格式来实现。例如,

1websocat -n "wss://jetstream1.us-east.bsky.network/subscribe?wantedCollections=app.*" | pv -l | split -l 1000 --filter='clickhouse-client --host sql-clickhouse.clickhouse.com --secure --password "" --query "INSERT INTO bluesky.bluesky_raw (data) FORMAT JSONAsObject"'

ClickHouse Cloud 的 ClickPipes

虽然 S3Queue 表引擎允许我们将数据从对象存储流式传输到 ClickHouse,但它确实存在局限性。除了仅限于 S3 兼容的存储之外,它还仅提供至少一次语义。ClickHouse Cloud 的用户可能更喜欢 ClickPipes - 一种托管数据摄取产品,它提供恰好一次语义,支持更多来源(例如 Kafka),并将摄取资源与集群分离。这可以用于在上述架构中替换 S3Queue,只需通过引导式向导进行最少的设置。

查询 Bronze 层

虽然我们不建议将您的 Bronze 表暴露给下游消费者,但我们对排序键的选择确实允许我们执行高效的数据探索,并识别任何进一步的质量问题,或者在需要时通过后续层级重放数据。

我们注意到在合并时,ReplacingMergeTree 使用 ORDER BY 列的值作为唯一标识符来识别重复行,并且仅保留最高版本。然而,这仅提供最终正确性 - 它不保证行将被重复数据删除,您不应依赖它。为了确保答案正确,用户需要使用查询时重复数据删除和删除删除来补充后台合并。这可以使用 FINAL 运算符来实现。这将产生资源开销,并将对查询性能产生负面影响 - 这是我们不建议将 Bronze 表暴露给消费者的另一个原因。

我们在上面的查询中省略了 FINAL 运算符,接受数据探索练习中少量的重复项。commit 事件代表了大部分数据

1SELECT kind, formatReadableQuantity(count()) AS c
2FROM bluesky_raw
3GROUP BY kind
4FORMAT PrettyCompactMonoBlock
5┌─kind─────┬─c──────────────┐
6commit614.55 million │
7│ account  │ 1.72 million   │
8identity1.70 million   │
9└──────────┴────────────────┘
10
113 rows in set. Elapsed: 0.124 sec. Processed 617.97 million rows, 617.97 MB (5.00 billion rows/s., 5.00 GB/s.)
12Peak memory usage: 139.03 MiB.

在这些 commit 事件中,我们可以使用 JSON 类型路径语法检查事件类型

1SELECT
2	data.commit.collection AS collection,
3	count() AS c,
4	uniq(data.did) AS users
5FROM bluesky_raw
6WHERE kind = 'commit'
7GROUP BY ALL
8ORDER BY c DESC
9LIMIT 10
10FORMAT PrettyCompactMonoBlock
11
12┌─collection───────────────┬─────────c─┬───users─┐
13│ app.bsky.feed.like   	   │ 705468149710651614│ app.bsky.graph.follow	   │ 406406091862973015│ app.bsky.feed.post   	   │ 137946245432326516│ app.bsky.feed.repost 	   │  90847077281139817│ app.bsky.graph.block 	   │  25277808152362118│ app.bsky.graph.listitem  │   846400616600219│ app.bsky.actor.profile   │   8168943408355820│ app.bsky.graph.listblock │	64329221669521│ app.bsky.feed.threadgate │	5595049420222│ app.bsky.feed.postgate   │	2756753879023└──────────────────────────┴───────────┴─────────┘
24
2510 rows in set. Elapsed: 19.923 sec. Processed 1.38 billion rows, 122.00 GB (69.50 million rows/s., 6.12 GB/s.)
26Peak memory usage: 1003.91 MiB.

我们可以看到,正如人们可能预期的那样,大多数事件似乎是点赞和关注。

Silver 层用于清洁数据

Silver 层代表 Medallion 工作流程的下一阶段,将来自 Bronze 层的原始数据转换为更一致和结构良好的形式。此层解决了数据质量问题,执行更多过滤、标准化模式、执行转换并确保删除所有重复项。使用 ClickHouse,我们通常看到 Bronze 表直接映射到 Silver 等效项。

我们知道重复项将具有相同的 bluesky_ts(和其他列),只有不同的 scrape_ts 值,后者可能晚得多 - 尽管我们之前确定,大多数重复项发生在 20 分钟窗口内。为了确保没有重复项传递到我们的 Gold 层,我们在 Silver 层引入了有限重复窗口的概念。事件将被分配到这些重复窗口,这些窗口将根据其 bluesky_ts 值从当前时间偏移。这些“窗口”将定期刷新到我们的 Gold 层,并保证每个事件只有一个副本被传输。

这些重复窗口意味着我们不需要在无限期内对事件进行重复数据删除。这可以节省大量资源,并使问题更易于管理 - 正如我们所展示的,这可以在 ClickHouse 中高效实现。

Markdown Image

将事件分配到重复数据删除窗口(跟踪实时并定期刷新)依赖于及时交付数据而没有显着延迟。

我们可以通过查询我们的 Bronze 表来确定

知道我们的事件通常在其 bluesky_ts 的 20 分钟内交付,我们可以在我们的 Silver 层中可靠地创建重复数据删除窗口。为此,我们为每个 20 分钟间隔在 ClickHouse 中创建一个分区 - 分区有效地等同于窗口。事件根据它们落入的间隔分配给每个分区,使用 toStartOfInterval(bluesky_ts, toIntervalMinute(20)) 函数。我们生成的 Silver 表架构

1CREATE TABLE bluesky.bluesky_dedup
2(
3	`data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`),
4	`kind` LowCardinality(String),
5	`scrape_ts` DateTime64(6),
6	`bluesky_ts` DateTime64(6),
7	`dedup_hash` String
8)
9ENGINE = ReplacingMergeTree
10PARTITION BY toStartOfInterval(bluesky_ts, toIntervalMinute(20))
11ORDER BY dedup_hash
12TTL toStartOfMinute(bluesky_ts) + toIntervalMinute(1440) SETTINGS ttl_only_drop_parts=1

虽然我们使用 ReplacingMergeTree,但我们将拥有每个分区内的重复数据删除事件,即合并将仅在分区内发生。请注意,我们使用 TTL 来在数据超过 1440 秒(24 小时)后过期。ttl_only_drop_parts=1 设置确保仅当该部分中的所有行都已过期时才删除部件。

较多的分区数可能会导致大部件计数问题,从而导致查询性能问题和“部件过多”错误。我们通过仅在 Silver 表中保留一天的分区(总共 72 个)来缓解此问题,使用 TTL 规则来使较旧的数据过期。

用于过滤的增量物化视图

在将过滤和去重规则应用于 Bronze 数据时,用户通常会保留不匹配项,方法是将这些不匹配项发送到死信表以进行进一步分析。 考虑到我们计划定期将银层中的最新分区发送到金层,我们不希望事件到达太晚。 出于这个原因,并为了演示死信队列原则,我们将把 bronze 层中 scrape_tsbluesky_ts 之间的时间差大于 20 分钟的任何事件发送到死信队列。 “延迟”小于此值的事件将被插入到上面所示的银表中的相应分区中。

为了实现这一点,我们使用了两个增量物化视图。 每个视图都在插入到 bluesky_raw Bronze 表中的行上运行 SELECT 查询,并将结果发送到死信队列或 bluesky_dedup 银表。 除了目标表之外,这些视图之间的区别在于它们的过滤条件。

Markdown Image

我们用于将行发送到银表的视图

1CREATE MATERIALIZED VIEW bluesky.bluesky_dedup_mv TO bluesky.bluesky_dedup
2(
3	`data` JSON,
4	`kind` LowCardinality(String),
5	`scrape_ts` DateTime64(6),
6	`bluesky_ts` DateTime64(6),
7	`dedup_hash` String
8)
9AS SELECT
10	data,
11	kind,
12	scrape_ts,
13	bluesky_ts,
14	dedup_hash
15FROM bluesky.bluesky_raw
16WHERE abs(timeDiff(scrape_ts, bluesky_ts)) < 1200

我们的死信队列表模式及其相关的物化视图

1CREATE TABLE bluesky.bluesky_dlq
2(
3	`data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`),
4	`kind` LowCardinality(String),
5	`scrape_ts` DateTime64(6),
6	`bluesky_ts` DateTime64(6),
7	`dedup_hash` String
8)
9ENGINE = MergeTree
10ORDER BY (kind, scrape_ts)
11
12CREATE MATERIALIZED VIEW bluesky.bluesky_dlq_mv TO bluesky.bluesky_dlq
13(
14	`data` JSON,
15	`kind` LowCardinality(String),
16	`scrape_ts` DateTime64(6),
17	`bluesky_ts` DateTime64(6),
18	`dedup_hash` String
19)
20AS SELECT
21	data,
22	kind,
23	scrape_ts,
24	bluesky_ts,
25	dedup_hash
26FROM bluesky.bluesky_raw
27WHERE abs(timeDiff(scrape_ts, bluesky_ts)) >= 1200

请注意,我们为死信队列使用了标准的 MergeTree。

将数据发送到金层

上述过程会在我们的银层中填充分区。 我们希望定期从这些分区传输数据到我们的金层,以保证事件已完全去重。 我们希望这种情况能够及时发生,以确保最近的数据在我们的金层中可用于分析。

我们使用可刷新物化视图来实现这种定期刷新。 这些视图定期针对银层表执行,并支持高级转换,例如复杂的连接,这些连接在将数据写入金层表之前对其进行反规范化。

在我们的案例中,我们希望简单地定期将数据从最新的分区(不应再接收任何数据)插入到金表中。 此查询应使用 FINAL 子句执行,以确保所有事件都已去重。 虽然这通常比普通查询在计算上更昂贵,但我们可以利用以下两个属性

  • 查询仅定期执行 - 在我们的案例中,每 20 分钟执行一次,将成本从用户查询转移到摄取层。
  • 我们每次执行都以单个分区为目标。 我们可以使用设置 do_not_merge_across_partitions_select_final=1 将查询时间去重限制为目标分区,从而进一步优化此查询并减少所需的工作。

Markdown Image

这仅要求我们每次都识别要刷新到金层的分区。 我们的逻辑在此处由上图捕获,但总结如下

  1. 我们使用 _partition_id 元数据字段识别银表 bluesky_dedup 中的最新分区。 我们从这个 partition_id(这是一个时间戳)中减去 40 分钟,得到两个时间窗口之前的分区,即 X - 2。 我们将其称为 current_partition
  2. 我们的目标金层表 bluesky 包含一个 _rmt_partition_id 列,该列由可刷新物化视图填充,该视图记录每个事件的来源银分区。 我们使用它来识别最近成功传输的分区。 我们在此基础上增加 20 分钟以识别要处理的下一个分区,将其称为 next_to_process
  3. 如果 next_to_process 等于 1200,我们知道 bluesky 是空的(0 + 1200 秒 = 1200),并且尚未将任何事件插入到金层,即视图的首次执行。 在这种情况下,我们只需使用 current_partition 的值,并插入所有 _partition_id = current_partition 的事件。
  4. 如果 next_to_process 大于 1200,我们知道我们已经传输了分区。 在这种情况下,如果 current_partition >= next_to_process,那么我们知道我们至少落后最新分区 40 分钟(2 个分区),并使用 next_to_process 的值 - 插入所有 _partition_id = next_to_process 的事件。 如果 current_partition < next_to_process,则返回 noop (0) 并且不移动数据。

上述逻辑旨在对执行并非每 20 分钟完美执行的情况(例如重复执行或执行延迟或失败的情况)保持稳健。 我们最终的可刷新物化视图在其 SELECT 语句中封装了上述逻辑,如下所示

1CREATE MATERIALIZED VIEW bluesky.blue_sky_dedupe_rmv
2REFRESH EVERY 20 MINUTE APPEND TO bluesky.bluesky
3(
4	`data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`),
5	`kind` LowCardinality(String),
6	`bluesky_ts` DateTime64(6),
7	`_rmt_partition_id` LowCardinality(String)
8)
9AS WITH
10	(
11          --step 1
12    	  SELECT toUnixTimestamp(subtractMinutes(CAST(_partition_id, 'DateTime'), 40))
13    	  FROM bluesky.bluesky_dedup
14    	  GROUP BY _partition_id
15    	  ORDER BY _partition_id DESC
16    	  LIMIT 1
17	) AS current_partition,
18	(
19          --step 2
20    	  SELECT toUnixTimestamp(addMinutes(CAST(max(partition_id), 'DateTime'), 20))
21    	  FROM bluesky.latest_partition
22	) AS next_to_process
23SELECT
24	data,
25	kind,
26	bluesky_ts,
27	_partition_id AS _rmt_partition_id
28FROM bluesky.bluesky_dedup
29FINAL
30--step 3 & 4
31WHERE _partition_id = CAST(if(next_to_process = 1200, current_partition, if(current_partition >= next_to_process, next_to_process, 0)), 'String')
32SETTINGS do_not_merge_across_partitions_select_final = 1

此视图每 20 分钟执行一次,将干净、去重的数据交付到我们的金层。 请注意,数据会产生 40 分钟的延迟,直到在此处可用,尽管用户可以根据需要查询银层以获取更新的数据。

一位敏锐的读者可能会注意到,我们在步骤 2 和之前的图表中使用的表是 latest_partition,而不是查询 bluesky 金表中的 _rmt_partition_id。 此表由增量物化视图和优化产生,这使得识别下一个分区更加高效。 此视图(如下所示)跟踪从插入到金表的最新分区。

1CREATE MATERIALIZED VIEW bluesky.latest_partition_mv TO bluesky.latest_partition
2(
3	`partition_id` UInt32
4)
5AS SELECT max(CAST(_rmt_partition_id, 'UInt32')) AS partition_id
6FROM bluesky.bluesky
7
8CREATE TABLE bluesky.latest_partition
9(
10	`partition_id` SimpleAggregateFunction(max, UInt32)
11)
12ENGINE = AggregatingMergeTree
13ORDER BY tuple()

用于数据分析的金层

上述可刷新物化视图定期将数据发送到我们的金层表 bluesky。 此表的模式如下所示

1CREATE TABLE bluesky.bluesky
2(
3	`data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`),
4	`kind` LowCardinality(String),
5	`bluesky_ts` DateTime64(6),
6	`_rmt_partition_id` LowCardinality(String)
7)
8ENGINE = MergeTree
9PARTITION BY toStartOfInterval(bluesky_ts, toIntervalMonth(1))
10ORDER BY (kind, bluesky_ts)

由于数据在插入之前已完全去重,我们可以使用标准的 MergeTree。 我们的排序键在此处完全根据我们消费者的访问模式选择,并为了优化压缩。 我们的表在此处按月分区,主要用于数据管理,并且由于我们期望大多数查询都读取最新的数据。

请注意,虽然我们仍然在此层利用 JSON 类型 a,但我们可以在之前的可刷新物化视图中对数据执行更多转换,例如将常用的查询列提取到根目录,或使用 ALIAS 列,以简化查询语法。

用于常见查询的物化视图

此金层应完全针对下游应用程序和消费者的查询进行优化。 虽然我们的排序键旨在促进这一点,但并非所有访问模式都相同。 到目前为止,增量物化视图最常见的应用是在层之间执行过滤和数据插入。 然而,我们之前使用视图来计算下一个分区的示例,暗示了我们还可以如何优化其他查询。

Markdown Image

除了允许过滤并将数据子集发送到具有不同排序键(针对其他访问模式进行优化)的目标表之外,物化视图还可用于在将行添加到金表时预先计算插入时的聚合。 这些聚合结果将是原始数据的较小表示形式(在聚合的情况下为部分草图)。 除了确保从目标表读取结果的最终查询很简单之外,它还确保查询时间比对原始数据执行相同计算更快,从而将计算(以及查询延迟)从查询时转移到插入时。 有关物化视图的完整指南,请访问此处

例如,考虑我们之前的查询,该查询计算最常见的提交事件类型

1SELECT data.commit.collection AS collection, count() AS c, uniq(data.did) AS users
2FROM bluesky
3WHERE kind = 'commit'
4GROUP BY ALL
5ORDER BY c DESC
6LIMIT 10
7
8┌─collection───────────────┬─────────c─┬───users─┐
9│ app.bsky.feed.like   	   │ 269979403527060410│ app.bsky.graph.follow	   │ 150891706563198711│ app.bsky.feed.post   	   │  46886207308364712│ app.bsky.feed.repost 	   │  33249341195698613│ app.bsky.graph.block 	   │   978970799357814│ app.bsky.graph.listitem  │   323167610202015│ app.bsky.actor.profile   │   1731669128089516│ app.bsky.graph.listblock │	26366710531017│ app.bsky.feed.threadgate │	2157154987118│ app.bsky.feed.postgate   │ 	 996251996019└──────────────────────────┴───────────┴─────────┘
20
2110 rows in set. Elapsed: 6.445 sec. Processed 516.53 million rows, 45.50 GB (80.15 million rows/s., 7.06 GB/s.)
22Peak memory usage: 986.51 MiB.

对于 5 亿个事件,此查询大约需要 6 秒钟。 要将其转换为增量物化视图,我们需要准备一个表来接收增量聚合结果

1CREATE TABLE bluesky.top_post_types
2(
3	`collection` LowCardinality(String),
4	`posts` SimpleAggregateFunction(sum, UInt64),
5	`users` AggregateFunction(uniq, String)
6)
7ENGINE = AggregatingMergeTree
8ORDER BY collection

请注意,我们需要如何使用 AggregatingMergeTree 并指定排序键为我们的分组键 - 此列中具有相同值的聚合结果将被合并。 增量结果需要存储在特殊的列类型 SimpleAggregateFunctionAggregateFunction 下 - 为此,我们需要指定函数和关联的类型。

我们相应的物化视图(在将行插入到金表时填充此表)如下所示。 请注意,我们如何使用 -State 后缀来显式生成聚合状态

1CREATE MATERIALIZED VIEW top_post_types_mv TO top_posts_types
2AS
3SELECT data.commit.collection AS collection, count() AS posts,
4	uniqState(CAST(data.did, 'String')) AS users
5FROM bluesky
6WHERE kind = 'commit'
7GROUP BY ALL

在查询此表时,我们使用 -Merge 后缀来合并聚合状态。

1SELECT collection,
2       sum(posts) AS posts,
3       uniqMerge(users) AS users
4FROM top_post_types
5GROUP BY collection
6ORDER BY posts DESC
7LIMIT 10
8
910 rows in set. Elapsed: 0.042 sec.

我们的查询性能提高了 150 倍以上!

我们的最终架构图显示了我们所有的层

Markdown Image

示例查询和可视化 @ sql.clickhouse.com

以上表示一个非常简单的示例。 此数据在 sql.clickhouse.com 中可用,其中上述 Medallion 工作流程持续执行。 我们提供了更多物化视图作为高效查询的示例。

例如,要查询人们在 Bluesky 上点赞、发布和转发的最受欢迎时间,用户可以运行以下查询

1SELECT event, hour_of_day, sum(count) as count
2FROM bluesky.events_per_hour_of_day
3WHERE event in ['post', 'repost', 'like']
4GROUP BY event, hour_of_day
5ORDER BY hour_of_day;
6
772 rows in set. Elapsed: 0.007 sec.

查询运行时间为 7 毫秒。

您可以在我们的 Playground 中运行此查询,将结果渲染为图表

Markdown Image

以下是相应的物化视图及其目标表,该表在将行插入到金表时填充

1CREATE TABLE bluesky.events_per_hour_of_day
2(
3    event LowCardinality(String),
4    hour_of_day UInt8,
5    count SimpleAggregateFunction(sum, UInt64)
6)
7ENGINE = AggregatingMergeTree
8ORDER BY (event, hour_of_day);
9
10
11CREATE MATERIALIZED VIEW bluesky.events_per_hour_of_day_mv TO bluesky.events_per_hour_of_day
12AS SELECT
13    extract(data.commit.collection, '\\.([^.]+)$') AS event,
14    toHour(bluesky_ts) as hour_of_day,
15    count() AS count
16FROM bluesky.bluesky
17WHERE (kind = 'commit')
18GROUP BY event, hour_of_day;

有关查询及其相关视图的完整列表,请参阅此处。 或者,随时直接查询金表或银表! 一些入门示例

结束语

在本博客中,我们展示了一个完全实现的 Medallion 架构,该架构完全使用 ClickHouse 构建,演示了其强大的功能如何将原始的半结构化数据转换为高质量、可用于查询的数据集。 通过 Bronze、Silver 和 Gold 层,我们解决了常见的挑战,例如格式错误的数据、结构不一致以及大量的重复数据。 通过利用 ClickHouse 的 JSON 数据类型,我们高效地处理了固有的半结构化和高度动态的数据,同时保持了卓越的性能。

虽然此架构提供了稳健而灵活的工作流程,但它确实引入了固有的延迟,因为数据会在各层之间移动。 在我们的实施中,去重窗口有助于最大限度地减少这些延迟,但在交付实时数据和确保高质量数据之间仍然存在权衡。 这使得 Medallion 架构特别适合具有高重复率且对实时可用性要求不太严格的数据集。

我们鼓励用户在我们的公共演示环境 sql.clickhouse.com 上进一步探索它。 在这里,您可以免费查询数据并亲身体验工作流程。

立即开始使用 ClickHouse Cloud,并获得 300 美元的信用额度。 在 30 天试用期结束时,继续使用按需付费计划,或联系我们以了解有关我们基于用量的折扣的更多信息。 访问我们的定价页面了解详情。

分享此帖子

订阅我们的新闻通讯

随时了解功能发布、产品路线图、支持和云服务!
正在加载表单...
关注我们
X imageSlack imageGitHub image
Telegram imageMeetup imageRss image
©2025ClickHouse, Inc. 总部位于加利福尼亚州湾区和荷兰阿姆斯特丹。