DoubleCloud 即将停止运营。迁移到 ClickHouse,享受限时免费迁移服务。立即联系我们 ->->

博客 / 工程

ClickHouse 中的物化视图链

author avatar
Mark Needham
2024年4月16日

ClickHouse 中的物化视图是在源表中到达一批行时触发的查询。它们将对这些行进行操作,可能在写入目标表之前转换数据。下图概述了其工作原理

mv-chain-1.png

在过去几周里,我一直在学习有关聚合状态的信息。我创建了一个小型演示,其中两个物化视图读取同一个 Kafka 表引擎。一个存储原始事件数据,另一个存储聚合状态。

当我向Tom展示此示例时,他建议与其让两个物化视图都从 Kafka 引擎表读取,不如将物化视图链接在一起。下图显示了他想到的内容

mv-chain-2.png

换句话说,与其让聚合状态物化视图从 Kafka 引擎表读取,不如让它从已经从 Kafka 中提取的原始事件读取。

在本博文的其余部分,我们将逐步介绍如何链接物化视图的实际示例。我们将使用维基最近更改提要,该提要提供了一系列表示对各种维基媒体属性所做更改的事件。数据以服务器端事件的形式提供,示例消息的data属性如下所示

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
    "request_id": "ccbbbe2c-6e1b-4bb7-99cb-317b64cbd5dc",
    "id": "41c73232-5922-4484-82f3-34d45f22ee7a",
    "dt": "2024-03-26T09:13:09Z",
    "domain": "en.wiktionary.org",
    "stream": "mediawiki.recentchange",
    "topic": "eqiad.mediawiki.recentchange",
    "partition": 0,
    "offset": 4974797626
  },
  "id": 117636935,
  "type": "edit",
  "namespace": 0,
  "title": "MP3播放器",
  "title_url": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
  "comment": "clean up some labels; add missing space after *; {{zh-noun}} -> {{head|zh|noun}}, {{zh-hanzi}} -> {{head|zh|hanzi}} per [[WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun]], [[WT:RFDO#Template:zh-noun]]; fix some lang codes (manually assisted)",
  "timestamp": 1711444389,
  "user": "WingerBot",
  "bot": true,
  "notify_url": "https://en.wiktionary.org/w/index.php?diff=78597416&oldid=50133194&rcid=117636935",
  "minor": true,
  "patrolled": true,
  "length": {
    "old": 229,
    "new": 234
  },
  "revision": {
    "old": 50133194,
    "new": 78597416
  },
  "server_url": "https://en.wiktionary.org",
  "server_name": "en.wiktionary.org",
  "server_script_path": "/w",
  "wiki": "enwiktionary",
  "parsedcomment": "clean up some labels; add missing space after *; {{zh-noun}} -&gt; {{head|zh|noun}}, {{zh-hanzi}} -&gt; {{head|zh|hanzi}} per <a href=\"/wiki/Wiktionary:RFDO#All_templates_in_Category:Chinese_headword-line_templates_except_Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun</a>, <a href=\"/wiki/Wiktionary:RFDO#Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#Template:zh-noun</a>; fix some lang codes (manually assisted)"
}

假设我们正在构建一个仪表板来跟踪所做的更改。我们对单个更改不感兴趣,而是希望按分钟跟踪进行更改的唯一用户数量、更改的唯一页面数量以及所做更改的总数。

我们将首先创建并使用wiki数据库

CREATE DATABASE wiki;
USE wiki;

创建 Kafka 表引擎

接下来,让我们创建一个名为wikiQueue的表,该表将从 Kafka 中使用消息。代理在本地端口 9092 上运行,我们的主题称为wiki_events

请注意,如果您使用的是 ClickHouse Cloud,则需要使用ClickPipes来处理来自 Kafka 的数据摄取。

CREATE TABLE wikiQueue(
    id UInt32,
    type String,
    title String,
    title_url String,
    comment String,
    timestamp UInt64,
    user String,
    bot Boolean,
    server_url String,
    server_name String,
    wiki String,
    meta Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka(
  'localhost:9092', 
  'wiki_events', 
  'consumer-group-wiki', 
  'JSONEachRow'
);

rawEvents表存储dateTimetitle_urltopicuser

CREATE TABLE rawEvents (
    dateTime DateTime64(3, 'UTC'),
    title_url String,
    topic String,
    user String
) 
ENGINE = MergeTree 
ORDER BY dateTime;

然后,我们将编写以下物化视图以将数据写入rawEvents

CREATE MATERIALIZED VIEW rawEvents_mv TO rawEvents AS 
SELECT toDateTime(timestamp) AS dateTime,
       title_url, 
       tupleElement(meta, 'topic') AS topic, 
       user
FROM wikiQueue
WHERE title_url <> '';

我们使用toDateTime函数将纪元秒时间戳转换为 DateTime 对象。我们还使用tupleElement函数从meta对象中提取topic属性。

存储聚合状态

接下来,让我们创建一个存储聚合状态的表,以启用增量聚合。聚合状态存储在一个具有AggregateFunction(<aggregationType>, <dataType>)类型的列中。

为了保留String值的唯一计数(我们需要这样做才能跟踪唯一用户和唯一页面),我们将使用AggregateFunction(uniq, String)类型。为了保留运行总数(我们需要为更新总数执行此操作),我们将使用AggregateFunction(sum, UInt32)类型。UInt32类型为我们提供了4294967295的最大值,这远大于我们在 1 分钟内收到的更新数量。

我们将此表称为byMinute,其定义如下

CREATE TABLE byMinute
(
    dateTime DateTime64(3, 'UTC') NOT NULL,
    users AggregateFunction(uniq, String),
    pages AggregateFunction(uniq, String),
    updates AggregateFunction(sum, UInt32) 
)
ENGINE = AggregatingMergeTree() 
ORDER BY dateTime;

填充此表的物化视图将从rawEvents读取并使用-State组合器提取中间状态。我们将对用户和页面使用uniqState函数,对更新使用sumState函数。

CREATE MATERIALIZED VIEW byMinute_mv TO byMinute AS 
SELECT toStartOfMinute(dateTime) AS dateTime,
       uniqState(user) as users,
       uniqState(title_url) as pages,
       sumState(toUInt32(1)) AS updates
FROM rawEvents
GROUP BY dateTime;

下图显示了我们迄今为止创建的物化视图和表的链

mv-chain-3.png

我们没有任何数据流入 Kafka,因此此表将没有任何数据。让我们通过运行以下命令来解决此问题。

curl -N https://stream.wikimedia.org/v2/stream/recentchange  |
awk '/^data: /{gsub(/^data: /, ""); print}' |
jq -cr --arg sep ø '[.meta.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t wiki_events -Kø

此命令从最近更改提要中提取data属性,使用 jq 构造一个key:value对,然后使用 kcat 将其管道传输到 Kafka。

如果我们将其运行一段时间,然后我们可以编写一个查询来查看正在进行多少更改:

SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
    ┌────────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1.2024-03-26 15:53:00.00024875510022.2024-03-26 15:52:00.000429148121643.2024-03-26 15:51:00.000406141721594.2024-03-26 15:50:00.000392124018435.2024-03-26 15:49:00.000418134619106.2024-03-26 15:48:00.000422138818677.2024-03-26 15:47:00.000423144920158.2024-03-26 15:46:00.000409142019339.2024-03-26 15:45:00.0004021348182410.2024-03-26 15:44:00.00043216422142 │
    └─────────────────────────┴───────┴───────┴─────────┘

看起来一切正常。

向链中添加另一个 MV

现在,在运行一段时间后,我们决定将数据分组并在 10 分钟的存储桶中分块,而不仅仅是 1 分钟的存储桶。我们可以通过对byMinute表编写以下查询来实现此目的

SELECT
    toStartOfTenMinutes(dateTime) AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;

这将返回类似以下内容,其中dateTime列中的值现在以 10 分钟的增量显示。

    ┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:50:00 │   977 │  4432 │    7168 │
 2. │ 2024-03-26 15:40:00 │  1970 │ 12372 │   20555 │
 3. │ 2024-03-26 15:30:00 │  1998 │ 11673 │   20043 │
 4. │ 2024-03-26 15:20:00 │  1981 │ 12051 │   20026 │
 5. │ 2024-03-26 15:10:00 │  1996 │ 11793 │   19392 │
 6. │ 2024-03-26 15:00:00 │  2092 │ 12778 │   20649 │
 7. │ 2024-03-26 14:50:00 │  2062 │ 12893 │   20465 │
 8. │ 2024-03-26 14:40:00 │  2028 │ 12798 │   20873 │
 9. │ 2024-03-26 14:30:00 │  2020 │ 12169 │   20364 │
10. │ 2024-03-26 14:20:00 │  2077 │ 11929 │   19797 │
    └─────────────────────┴───────┴───────┴─────────┘

这在处理我们正在使用的小数据量时效果很好,但是当我们处理更大的数据时,我们可能希望拥有另一个表来存储以 10 分钟间隔分块的数据。让我们创建该表

CREATE TABLE byTenMinutes
(
    dateTime DateTime64(3, 'UTC') NOT NULL,
    users AggregateFunction(uniq, String),
    pages AggregateFunction(uniq, String),
    updates AggregateFunction(sum, UInt32) 
)
ENGINE = AggregatingMergeTree() 
ORDER BY dateTime;

接下来,让我们创建一个物化视图来填充该表。物化视图将使用类似于我们用于计算上述 10 分钟存储桶的查询来查询byMinute表。唯一的变化是,我们不需要使用-Merge组合器,而是需要使用-MergeState组合器来返回聚合byMinute数据的聚合状态,而不是基础结果。

理论上,我们将节省一些计算时间,因为byMinute MV 已经将数据聚合到 1 分钟的存储桶中。现在,我们利用 1 分钟的存储桶,而不是从头开始将原始的按秒数据聚合到 10 分钟的存储桶中。

物化视图如下所示

CREATE MATERIALIZED VIEW byTenMinutes_mv TO byTenMinutes AS
SELECT toStartOfMinute(dateTime) AS dateTime,
       uniqMergeState(users) as users,
       uniqMergeState(pages) as pages,
       sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;

下图显示了我们现在创建的物化视图链

mv-chain-4.png

如果我们查询byTenMinutes表,它将没有任何数据,并且一旦它开始填充,它只会获取摄取到byMinute表中的新数据。但并非一切都没有希望,我们仍然可以编写查询来回填旧数据

INSERT INTO byTenMinutes 
SELECT toStartOfTenMinutes(dateTime),
       uniqMergeState(users) AS users, uniqMergeState(pages) AS pages,
       sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;

然后,我们可以对byTenMinutes编写以下查询以返回按 10 分钟存储桶分组的数据

SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byTenMinutes
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;

我们将获得与查询byMinute表时相同的结果

    ┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:50:00 │   977 │  4432 │    7168 │
 2. │ 2024-03-26 15:40:00 │  1970 │ 12372 │   20555 │
 3. │ 2024-03-26 15:30:00 │  1998 │ 11673 │   20043 │
 4. │ 2024-03-26 15:20:00 │  1981 │ 12051 │   20026 │
 5. │ 2024-03-26 15:10:00 │  1996 │ 11793 │   19392 │
 6. │ 2024-03-26 15:00:00 │  2092 │ 12778 │   20649 │
 7. │ 2024-03-26 14:50:00 │  2062 │ 12893 │   20465 │
 8. │ 2024-03-26 14:40:00 │  2028 │ 12798 │   20873 │
 9. │ 2024-03-26 14:30:00 │  2020 │ 12169 │   20364 │
10. │ 2024-03-26 14:20:00 │  2077 │ 11929 │   19797 │
    └─────────────────────┴───────┴───────┴─────────┘
分享此文章

订阅我们的新闻通讯

随时了解功能发布、产品路线图、支持和云产品信息!
正在加载表单...

关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image