物化视图 在 ClickHouse 中是指当一批行到达源表时触发的查询。它们将对这些行进行操作,可能会在写入目标表之前转换数据。下图概述了其工作原理
在过去的几周里,我一直在学习 聚合状态。我创建了一个小型演示,其中包含两个从同一个 Kafka 表引擎读取数据的物化视图。一个存储原始事件数据,另一个存储聚合状态。
当我向 Tom 展示这个例子时,他建议我不要让两个物化视图都从 Kafka 引擎表读取数据,而是可以将物化视图链接在一起。下图显示了他的想法
换句话说,聚合状态物化视图不应该从 Kafka 引擎表读取数据,而应该从已经从 Kafka 中提取的原始事件中读取数据。
在本博文的剩余部分,我们将通过一个实际示例来说明如何链接物化视图。我们将使用 Wiki 最近更改 feed,它提供了一系列事件,代表对各种 Wikimedia 属性所做的更改。数据以服务器发送事件的形式提供,下面显示了一个示例消息的 data
属性
{
"$schema": "/mediawiki/recentchange/1.0.0",
"meta": {
"uri": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
"request_id": "ccbbbe2c-6e1b-4bb7-99cb-317b64cbd5dc",
"id": "41c73232-5922-4484-82f3-34d45f22ee7a",
"dt": "2024-03-26T09:13:09Z",
"domain": "en.wiktionary.org",
"stream": "mediawiki.recentchange",
"topic": "eqiad.mediawiki.recentchange",
"partition": 0,
"offset": 4974797626
},
"id": 117636935,
"type": "edit",
"namespace": 0,
"title": "MP3播放器",
"title_url": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
"comment": "clean up some labels; add missing space after *; {{zh-noun}} -> {{head|zh|noun}}, {{zh-hanzi}} -> {{head|zh|hanzi}} per [[WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun]], [[WT:RFDO#Template:zh-noun]]; fix some lang codes (manually assisted)",
"timestamp": 1711444389,
"user": "WingerBot",
"bot": true,
"notify_url": "https://en.wiktionary.org/w/index.php?diff=78597416&oldid=50133194&rcid=117636935",
"minor": true,
"patrolled": true,
"length": {
"old": 229,
"new": 234
},
"revision": {
"old": 50133194,
"new": 78597416
},
"server_url": "https://en.wiktionary.org",
"server_name": "en.wiktionary.org",
"server_script_path": "/w",
"wiki": "enwiktionary",
"parsedcomment": "clean up some labels; add missing space after *; {{zh-noun}} -> {{head|zh|noun}}, {{zh-hanzi}} -> {{head|zh|hanzi}} per <a href=\"/wiki/Wiktionary:RFDO#All_templates_in_Category:Chinese_headword-line_templates_except_Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun</a>, <a href=\"/wiki/Wiktionary:RFDO#Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#Template:zh-noun</a>; fix some lang codes (manually assisted)"
}
假设我们正在构建一个仪表板来跟踪正在进行的更改。我们对单个更改不感兴趣,而是希望按分钟跟踪进行更改的唯一用户数、正在更改的唯一页面数以及所做的更改总数。
我们将首先创建并使用 wiki
数据库
CREATE DATABASE wiki;
USE wiki;
创建 Kafka 表引擎
接下来,让我们创建一个名为 wikiQueue
的表,它将从 Kafka 消费消息。broker 在本地端口 9092 上运行,我们的主题名为 wiki_events
。
请注意,如果您使用的是 ClickHouse Cloud,则需要使用 ClickPipes 来处理从 Kafka 摄取数据。
CREATE TABLE wikiQueue(
id UInt32,
type String,
title String,
title_url String,
comment String,
timestamp UInt64,
user String,
bot Boolean,
server_url String,
server_name String,
wiki String,
meta Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka(
'localhost:9092',
'wiki_events',
'consumer-group-wiki',
'JSONEachRow'
);
rawEvents
表存储 dateTime
、title_url
、topic
和 user
。
CREATE TABLE rawEvents (
dateTime DateTime64(3, 'UTC'),
title_url String,
topic String,
user String
)
ENGINE = MergeTree
ORDER BY dateTime;
然后,我们将编写以下物化视图以将数据写入 rawEvents
CREATE MATERIALIZED VIEW rawEvents_mv TO rawEvents AS
SELECT toDateTime(timestamp) AS dateTime,
title_url,
tupleElement(meta, 'topic') AS topic,
user
FROM wikiQueue
WHERE title_url <> '';
我们使用 toDateTime
函数将 epoch 秒时间戳转换为 DateTime 对象。我们还使用 tupleElement
函数从 meta
对象中提取 topic
属性。
存储聚合状态
接下来,让我们创建一个表来存储聚合状态,以启用 增量聚合。聚合状态存储在具有 AggregateFunction(<aggregationType>, <dataType>)
类型的列中。
为了保持 String
值的唯一计数(我们需要这样做来跟踪唯一用户和唯一页面),我们将使用 AggregateFunction(uniq, String)
类型。为了保持运行总数(我们需要用于总更新),我们将使用 AggregateFunction(sum, UInt32
) 类型。UInt32
类型为我们提供了 4294967295
的最大值,这远远超过了我们在一分钟内将收到的更新次数。
我们将此表称为 byMinute
,其定义如下
CREATE TABLE byMinute
(
dateTime DateTime64(3, 'UTC') NOT NULL,
users AggregateFunction(uniq, String),
pages AggregateFunction(uniq, String),
updates AggregateFunction(sum, UInt32)
)
ENGINE = AggregatingMergeTree()
ORDER BY dateTime;
填充此表的物化视图将从 rawEvents
读取数据,并使用 -State
组合器来提取中间状态。我们将对用户和页面使用 uniqState
函数,对更新使用 sumState
函数。
CREATE MATERIALIZED VIEW byMinute_mv TO byMinute AS
SELECT toStartOfMinute(dateTime) AS dateTime,
uniqState(user) as users,
uniqState(title_url) as pages,
sumState(toUInt32(1)) AS updates
FROM rawEvents
GROUP BY dateTime;
下图显示了到目前为止我们创建的物化视图和表的链
我们没有任何数据流入 Kafka,因此此表将没有任何数据。让我们通过运行以下命令来解决这个问题。
curl -N https://stream.wikimedia.org/v2/stream/recentchange |
awk '/^data: /{gsub(/^data: /, ""); print}' |
jq -cr --arg sep ø '[.meta.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t wiki_events -Kø
此命令从最近更改 feed 中提取 data
属性,使用 jq 构建 key:value
对,然后使用 kcat 将其管道传输到 Kafka。
如果我们让它运行一会儿,我们就可以编写一个查询来查看正在进行的更改次数:
SELECT
dateTime AS dateTime,
uniqMerge(users) AS users,
uniqMerge(pages) AS pages,
sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
┌────────────────dateTime─┬─users─┬─pages─┬─updates─┐
1. │ 2024-03-26 15:53:00.000 │ 248 │ 755 │ 1002 │
2. │ 2024-03-26 15:52:00.000 │ 429 │ 1481 │ 2164 │
3. │ 2024-03-26 15:51:00.000 │ 406 │ 1417 │ 2159 │
4. │ 2024-03-26 15:50:00.000 │ 392 │ 1240 │ 1843 │
5. │ 2024-03-26 15:49:00.000 │ 418 │ 1346 │ 1910 │
6. │ 2024-03-26 15:48:00.000 │ 422 │ 1388 │ 1867 │
7. │ 2024-03-26 15:47:00.000 │ 423 │ 1449 │ 2015 │
8. │ 2024-03-26 15:46:00.000 │ 409 │ 1420 │ 1933 │
9. │ 2024-03-26 15:45:00.000 │ 402 │ 1348 │ 1824 │
10. │ 2024-03-26 15:44:00.000 │ 432 │ 1642 │ 2142 │
└─────────────────────────┴───────┴───────┴─────────┘
一切看起来都运行良好。
向链中添加另一个 MV
现在,在运行一段时间后,我们决定将数据按 10 分钟的时间段分组和分块,而不是仅仅按 1 分钟的时间段分组,这将很有用。我们可以通过对 byMinute 表编写以下查询来做到这一点
SELECT
toStartOfTenMinutes(dateTime) AS dateTime,
uniqMerge(users) AS users,
uniqMerge(pages) AS pages,
sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
这将返回如下内容,其中 dateTime
列中的值现在以 10 分钟为增量。
┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
1. │ 2024-03-26 15:50:00 │ 977 │ 4432 │ 7168 │
2. │ 2024-03-26 15:40:00 │ 1970 │ 12372 │ 20555 │
3. │ 2024-03-26 15:30:00 │ 1998 │ 11673 │ 20043 │
4. │ 2024-03-26 15:20:00 │ 1981 │ 12051 │ 20026 │
5. │ 2024-03-26 15:10:00 │ 1996 │ 11793 │ 19392 │
6. │ 2024-03-26 15:00:00 │ 2092 │ 12778 │ 20649 │
7. │ 2024-03-26 14:50:00 │ 2062 │ 12893 │ 20465 │
8. │ 2024-03-26 14:40:00 │ 2028 │ 12798 │ 20873 │
9. │ 2024-03-26 14:30:00 │ 2020 │ 12169 │ 20364 │
10. │ 2024-03-26 14:20:00 │ 2077 │ 11929 │ 19797 │
└─────────────────────┴───────┴───────┴─────────┘
这对于我们正在处理的小数据量来说效果很好,但是当我们处理更大的数据时,我们可能希望有另一个表来存储按 10 分钟间隔分桶的数据。让我们创建该表
CREATE TABLE byTenMinutes
(
dateTime DateTime64(3, 'UTC') NOT NULL,
users AggregateFunction(uniq, String),
pages AggregateFunction(uniq, String),
updates AggregateFunction(sum, UInt32)
)
ENGINE = AggregatingMergeTree()
ORDER BY dateTime;
接下来,让我们创建一个物化视图来填充该表。物化视图将使用类似于我们上面用于计算 10 分钟分桶的查询来查询 byMinute
表。唯一的更改是,我们将需要使用 -MergeState
组合器来返回聚合 byMinute
数据而不是底层结果的聚合状态,而不是使用 -Merge
组合器。
从理论上讲,我们将节省一些计算时间,因为 byMinute
MV 已经按一分钟的时间段聚合了数据。现在,我们不再需要从头开始将原始的按秒数据聚合到 10 分钟的时间段中,而是利用一分钟的时间段。
物化视图如下所示
CREATE MATERIALIZED VIEW byTenMinutes_mv TO byTenMinutes AS
SELECT toStartOfMinute(dateTime) AS dateTime,
uniqMergeState(users) as users,
uniqMergeState(pages) as pages,
sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;
下图显示了我们现在创建的物化视图的链接
如果我们查询 byTenMinutes
表,它将没有任何数据,一旦它开始填充数据,它将仅拾取摄取到 byMinute
表中的新数据。但是一切还没有丢失,我们仍然可以编写一个查询来回填旧数据
INSERT INTO byTenMinutes
SELECT toStartOfTenMinutes(dateTime),
uniqMergeState(users) AS users, uniqMergeState(pages) AS pages,
sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;
然后,我们可以对 byTenMinutes
编写以下查询,以返回按 10 分钟时间段分组的数据
SELECT
dateTime AS dateTime,
uniqMerge(users) AS users,
uniqMerge(pages) AS pages,
sumMerge(updates) AS updates
FROM byTenMinutes
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
我们将获得与查询 byMinute
表时相同的结果
┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
1. │ 2024-03-26 15:50:00 │ 977 │ 4432 │ 7168 │
2. │ 2024-03-26 15:40:00 │ 1970 │ 12372 │ 20555 │
3. │ 2024-03-26 15:30:00 │ 1998 │ 11673 │ 20043 │
4. │ 2024-03-26 15:20:00 │ 1981 │ 12051 │ 20026 │
5. │ 2024-03-26 15:10:00 │ 1996 │ 11793 │ 19392 │
6. │ 2024-03-26 15:00:00 │ 2092 │ 12778 │ 20649 │
7. │ 2024-03-26 14:50:00 │ 2062 │ 12893 │ 20465 │
8. │ 2024-03-26 14:40:00 │ 2028 │ 12798 │ 20873 │
9. │ 2024-03-26 14:30:00 │ 2020 │ 12169 │ 20364 │
10. │ 2024-03-26 14:20:00 │ 2077 │ 11929 │ 19797 │
└─────────────────────┴───────┴───────┴─────────┘