跳至主要内容
跳至主要内容

数据回填

无论您是 ClickHouse 的新手,还是负责现有的部署,您都不可避免地需要用历史数据回填表。在某些情况下,这相对简单,但当需要填充物化视图时,可能会变得更加复杂。本指南记录了您可以应用于您的用例的一些流程。

注意

本指南假定用户已经熟悉 增量物化视图使用表函数(如 s3 和 gcs)进行数据加载 的概念。我们还建议用户阅读我们的关于 优化从对象存储插入性能 的指南,其中提供的建议可以应用于本指南中的所有插入操作。

示例数据集

在本指南中,我们使用 PyPI 数据集。此数据集的每一行代表使用诸如 pip 之类的工具下载的 Python 包。

例如,该子集涵盖了单日 - 2024-12-17,并且可以在 https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/ 上公开获取。您可以使用以下查询

SELECT count()
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/*.parquet')

┌────count()─┐
│ 2039988137 │ -- 2.04 billion
└────────────┘

1 row in set. Elapsed: 32.726 sec. Processed 2.04 billion rows, 170.05 KB (62.34 million rows/s., 5.20 KB/s.)
Peak memory usage: 239.50 MiB.

此存储桶中的完整数据集包含超过 320 GB 的 parquet 文件。在下面的示例中,我们有意使用 glob 模式来定位子集。

我们假设用户正在消费来自 Kafka 或对象存储等数据流,用于此日期之后的数据。此数据的模式如下所示

DESCRIBE TABLE s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/*.parquet')
FORMAT PrettyCompactNoEscapesMonoBlock
SETTINGS describe_compact_output = 1

┌─name───────────────┬─type────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ timestamp │ Nullable(DateTime64(6))                                                                                                                 │
│ country_code       │ Nullable(String)                                                                                                                        │
│ url │ Nullable(String)                                                                                                                        │
│ project            │ Nullable(String)                                                                                                                        │
│ file │ Tuple(filename Nullable(String), project Nullable(String), version Nullable(String), type Nullable(String))                             │
│ installer          │ Tuple(name Nullable(String), version Nullable(String))                                                                                  │
│ python             │ Nullable(String)                                                                                                                        │
│ implementation     │ Tuple(name Nullable(String), version Nullable(String))                                                                                  │
│ distro             │ Tuple(name Nullable(String), version Nullable(String), id Nullable(String), libc Tuple(lib Nullable(String), version Nullable(String))) │
│ system │ Tuple(name Nullable(String), release Nullable(String))                                                                                  │
│ cpu                │ Nullable(String)                                                                                                                        │
│ openssl_version    │ Nullable(String)                                                                                                                        │
│ setuptools_version │ Nullable(String)                                                                                                                        │
│ rustc_version      │ Nullable(String)                                                                                                                        │
│ tls_protocol       │ Nullable(String)                                                                                                                        │
│ tls_cipher         │ Nullable(String)                                                                                                                        │
└────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
注意

包含超过 1 万亿行的完整 PyPI 数据集可在我们的公共演示环境中 clickpy.clickhouse.com 上获取。有关此数据集的更多详细信息,包括演示如何利用物化视图提高性能以及如何每日填充数据,请参阅 此处

回填场景

通常需要在从某个时间点消费数据流时进行回填。这些数据正在插入到具有 增量物化视图 的 ClickHouse 表中,在插入块时触发这些视图。这些视图可能在插入之前转换数据,或者计算聚合结果并将其发送到目标表,以供下游应用程序稍后使用。

我们将尝试涵盖以下场景

  1. 在现有数据摄取中回填数据 - 正在加载新数据,并且需要回填历史数据。这些历史数据已被识别。
  2. 将物化视图添加到现有表 - 需要将新的物化视图添加到已经填充了历史数据并且数据正在流式传输的设置中。

我们假设数据将从对象存储回填。在所有情况下,我们的目标是避免数据插入暂停。

我们建议从对象存储回填历史数据。尽可能将数据导出到 Parquet 格式,以获得最佳的读取性能和压缩效果(减少网络传输)。通常首选大约 150MB 的文件大小,但 ClickHouse 支持超过 70 种文件格式,并且能够处理各种大小的文件。

使用重复的表和视图

对于所有场景,我们都依赖于“重复的表和视图”的概念。这些表和视图代表用于实时流式数据表的副本,并允许在隔离的环境中执行回填,并提供了一种简单的恢复方式,以防发生故障。例如,我们有以下主 pypi 表和物化视图,它计算每个 Python 项目的下载次数

CREATE TABLE pypi
(
    `timestamp` DateTime,
    `country_code` LowCardinality(String),
    `project` String,
    `type` LowCardinality(String),
    `installer` LowCardinality(String),
    `python_minor` LowCardinality(String),
    `system` LowCardinality(String),
    `on` String
)
ENGINE = MergeTree
ORDER BY (project, timestamp)

CREATE TABLE pypi_downloads
(
    `project` String,
    `count` Int64
)
ENGINE = SummingMergeTree
ORDER BY project

CREATE MATERIALIZED VIEW pypi_downloads_mv TO pypi_downloads
AS SELECT
 project,
    count() AS count
FROM pypi
GROUP BY project

我们使用数据集的子集填充主表和关联的视图

INSERT INTO pypi SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{000..100}.parquet')

0 rows in set. Elapsed: 15.702 sec. Processed 41.23 million rows, 3.94 GB (2.63 million rows/s., 251.01 MB/s.)
Peak memory usage: 977.49 MiB.

SELECT count() FROM pypi

┌──count()─┐
│ 20612750 │ -- 20.61 million
└──────────┘

1 row in set. Elapsed: 0.004 sec.

SELECT sum(count)
FROM pypi_downloads

┌─sum(count)─┐
│   20612750 │ -- 20.61 million
└────────────┘

1 row in set. Elapsed: 0.006 sec. Processed 96.15 thousand rows, 769.23 KB (16.53 million rows/s., 132.26 MB/s.)
Peak memory usage: 682.38 KiB.

假设我们希望加载另一个子集 {101..200}。虽然我们可以直接插入到 pypi 中,但我们可以通过创建重复的表来隔离地执行此回填。

如果回填失败,我们不会影响我们的主表,并且可以简单地 截断 我们的重复表并重复操作。

要创建这些视图的新副本,我们可以使用带有后缀 _v2CREATE TABLE AS 子句

CREATE TABLE pypi_v2 AS pypi

CREATE TABLE pypi_downloads_v2 AS pypi_downloads

CREATE MATERIALIZED VIEW pypi_downloads_mv_v2 TO pypi_downloads_v2
AS SELECT
 project,
    count() AS count
FROM pypi_v2
GROUP BY project

我们使用大约相同大小的第二个子集填充它,并确认加载成功。

INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{101..200}.parquet')

0 rows in set. Elapsed: 17.545 sec. Processed 40.80 million rows, 3.90 GB (2.33 million rows/s., 222.29 MB/s.)
Peak memory usage: 991.50 MiB.

SELECT count()
FROM pypi_v2

┌──count()─┐
│ 20400020 │ -- 20.40 million
└──────────┘

1 row in set. Elapsed: 0.004 sec.

SELECT sum(count)
FROM pypi_downloads_v2

┌─sum(count)─┐
│   20400020 │ -- 20.40 million
└────────────┘

1 row in set. Elapsed: 0.006 sec. Processed 95.49 thousand rows, 763.90 KB (14.81 million rows/s., 118.45 MB/s.)
Peak memory usage: 688.77 KiB.

如果在第二次加载的任何时候发生故障,我们可以简单地 截断 我们的 pypi_v2pypi_downloads_v2 并重复数据加载。

数据加载完成后,我们可以使用 ALTER TABLE MOVE PARTITION 子句将数据从我们的重复表移动到主表。

ALTER TABLE pypi_v2 MOVE PARTITION () TO pypi

0 rows in set. Elapsed: 1.401 sec.

ALTER TABLE pypi_downloads_v2 MOVE PARTITION () TO pypi_downloads

0 rows in set. Elapsed: 0.389 sec.
分区名称

上述 MOVE PARTITION 调用使用分区名称 ()。这代表此表的单个分区(未分区)。对于已分区的表,您需要调用多个 MOVE PARTITION 调用 - 每个分区一个。当前分区的名称可以从 system.parts 表中建立,例如 SELECT DISTINCT partition FROM system.parts WHERE (table = 'pypi_v2')

现在我们可以确认 pypipypi_downloads 包含完整的数据。可以安全地删除 pypi_downloads_v2pypi_v2

SELECT count()
FROM pypi

┌──count()─┐
│ 41012770 │ -- 41.01 million
└──────────┘

1 row in set. Elapsed: 0.003 sec.

SELECT sum(count)
FROM pypi_downloads

┌─sum(count)─┐
│   41012770 │ -- 41.01 million
└────────────┘

1 row in set. Elapsed: 0.007 sec. Processed 191.64 thousand rows, 1.53 MB (27.34 million rows/s., 218.74 MB/s.)

SELECT count()
FROM pypi_v2

重要的是,MOVE PARTITION 操作既轻量级(利用硬链接),又是原子的,即它要么失败,要么成功,没有中间状态。

我们在下面的回填场景中大量利用此过程。

请注意,此过程要求用户选择每个插入操作的大小。

较大的插入,即更多的行,意味着需要的 MOVE PARTITION 操作更少。但是,这必须与插入失败(例如,由于网络中断)时的成本相平衡。您可以使用批处理文件来降低风险。这可以使用范围查询(例如 WHERE timestamp BETWEEN 2024-12-17 09:00:00 AND 2024-12-17 10:00:00)或 glob 模式来执行。例如,

INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{101..200}.parquet')
INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{201..300}.parquet')
INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{301..400}.parquet')
--continued to all files loaded OR MOVE PARTITION call is performed
注意

ClickPipes 在从对象存储加载数据时使用这种方法,自动创建目标表及其物化视图的副本,并避免用户执行上述步骤。通过还使用多个工作线程,每个线程处理不同的子集(通过 glob 模式)并拥有自己的重复表,可以使用精确一次语义快速加载数据。对于感兴趣的人,更多详细信息可以在 此博客 中找到。

场景 1:在现有数据摄取中回填数据

在此场景中,我们假设要回填的数据不在隔离的存储桶中,因此需要进行过滤。数据正在插入,并且可以识别需要回填历史数据的某个时间戳或单调递增的列。

此过程遵循以下步骤

  1. 识别检查点 - 要恢复历史数据的某个时间戳或列值。
  2. 创建主表和物化视图目标表的重复项。
  3. 创建指向步骤 (2) 中创建的目标表的物化视图的副本。
  4. 插入到步骤 (2) 中创建的重复主表中。
  5. 将所有分区从重复表移动到其原始版本。删除重复表。

例如,在我们的 PyPI 数据中,假设我们加载了数据。我们可以识别最小时间戳,从而确定我们的“检查点”。

SELECT min(timestamp)
FROM pypi

┌──────min(timestamp)─┐
│ 2024-12-17 09:00:00 │
└─────────────────────┘

1 row in set. Elapsed: 0.163 sec. Processed 1.34 billion rows, 5.37 GB (8.24 billion rows/s., 32.96 GB/s.)
Peak memory usage: 227.84 MiB.

从以上信息可知,我们需要加载 2024-12-17 09:00:00 之前的数据。使用我们之前的方法,我们创建重复表和视图,并使用时间戳上的过滤器加载子集。

CREATE TABLE pypi_v2 AS pypi

CREATE TABLE pypi_downloads_v2 AS pypi_downloads

CREATE MATERIALIZED VIEW pypi_downloads_mv_v2 TO pypi_downloads_v2
AS SELECT project, count() AS count
FROM pypi_v2
GROUP BY project

INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-*.parquet')
WHERE timestamp < '2024-12-17 09:00:00'

0 rows in set. Elapsed: 500.152 sec. Processed 2.74 billion rows, 364.40 GB (5.47 million rows/s., 728.59 MB/s.)
注意

在 Parquet 中过滤时间戳列可以非常有效。ClickHouse 将仅读取时间戳列以识别要加载的完整数据范围,从而最大限度地减少网络流量。Parquet 索引,例如最小值-最大值,也可以被 ClickHouse 查询引擎利用。

完成此插入后,我们可以移动关联的分区。

ALTER TABLE pypi_v2 MOVE PARTITION () TO pypi

ALTER TABLE pypi_downloads_v2 MOVE PARTITION () TO pypi_downloads

如果历史数据是隔离的存储桶,则不需要上述时间过滤器。如果不可用时间或单调列,请隔离您的历史数据。

只需在 ClickHouse Cloud 中使用 ClickPipes

如果您使用的是 ClickHouse Cloud,则应使用 ClickPipes 恢复历史备份,如果数据可以隔离在自己的存储桶中(不需要过滤器)。除了通过并行化加载和多个工作线程来减少加载时间外,ClickPipes 还可以自动执行上述过程,并为主表和物化视图创建重复表。

场景 2:将物化视图添加到现有表

在已经填充了大量数据并且数据正在插入的设置中,需要添加新的物化视图的情况并不少见。在这里,可以使用时间戳或单调递增的列来识别流中的某个点,这可以避免数据摄取暂停。在下面的示例中,我们假设两种情况,并倾向于避免暂停摄取的方法。

避免 POPULATE

我们不建议使用 POPULATE 命令来回填物化视图,除非是针对小型数据集且摄取已暂停的情况。此操作符可能会遗漏插入到其源表中的行,物化视图在填充哈希完成后创建。此外,此填充操作会针对所有数据运行,并且在大型数据集上容易受到中断或内存限制的影响。

时间戳或单调递增列可用

在这种情况下,我们建议新的物化视图包含一个过滤器,以限制行只包含那些大于未来任意数据的行。然后,可以使用主表中的历史数据从该日期开始回填物化视图。回填方法取决于数据大小和相关查询的复杂性。

我们最简单的方案包括以下步骤

  1. 创建我们的物化视图,并使用一个过滤器,该过滤器仅考虑大于近期未来某个时间的行。
  2. 运行一个 INSERT INTO SELECT 查询,该查询将插入到我们的物化视图的目标表中,从源表读取视图的聚合查询。

可以进一步增强此方案,以针对步骤 (2) 中的数据子集,和/或使用物化视图的重复目标表(在插入完成后将分区附加到原始表),以便在发生故障后更容易恢复。

考虑以下物化视图,它计算每小时最受欢迎的项目。

CREATE TABLE pypi_downloads_per_day
(
    `hour` DateTime,
    `project` String,
    `count` Int64
)
ENGINE = SummingMergeTree
ORDER BY (project, hour)

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi
GROUP BY
    hour,
 project

虽然我们可以添加目标表,但在添加物化视图之前,我们修改其 SELECT 子句以包含一个过滤器,该过滤器仅考虑大于近期未来某个时间的行 - 在这种情况下,我们假设 2024-12-17 09:00:00 比现在晚几分钟。

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) AS hour,
 project, count() AS count
FROM pypi WHERE timestamp >= '2024-12-17 09:00:00'
GROUP BY hour, project

添加此视图后,我们可以回填物化视图在此数据之前的所有数据。

最简单的方法是简单地运行从物化视图在主表上执行的查询,并使用一个过滤器来忽略最近添加的数据,将结果插入到我们的视图的目标表中,通过一个 INSERT INTO SELECT。例如,对于上面的视图

INSERT INTO pypi_downloads_per_day SELECT
 toStartOfHour(timestamp) AS hour,
 project,
    count() AS count
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
GROUP BY
    hour,
 project

Ok.

0 rows in set. Elapsed: 2.830 sec. Processed 798.89 million rows, 17.40 GB (282.28 million rows/s., 6.15 GB/s.)
Peak memory usage: 543.71 MiB.
注意

在上面的示例中,我们的目标表是一个 SummingMergeTree。在这种情况下,我们可以简单地使用我们原始的聚合查询。对于更复杂的用例,这些利用了 AggregatingMergeTree,您将使用 -State 函数进行聚合。可以在此 集成指南中找到一个示例。

在我们的例子中,这是一个相对轻量级的聚合,在 3 秒内完成,并使用少于 600MiB 的内存。对于更复杂或更长时间运行的聚合,您可以通过使用前面提到的重复表方法来提高此过程的弹性,即创建影子目标表,例如 pypi_downloads_per_day_v2,插入到此表中,并将结果分区附加到 pypi_downloads_per_day

通常物化视图的查询可能更复杂(这很常见,否则用户不会使用视图!),并且会消耗资源。在更罕见的情况下,查询的资源超出了服务器的承受能力。这突出了 ClickHouse 物化视图的一个优势 - 它们是增量的,不会一次处理整个数据集!

在这种情况下,用户有几种选择

  1. 修改您的查询以回填范围,例如 WHERE timestamp BETWEEN 2024-12-17 08:00:00 AND 2024-12-17 09:00:00WHERE timestamp BETWEEN 2024-12-17 07:00:00 AND 2024-12-17 08:00:00 等。
  2. 使用 Null 表引擎 来填充物化视图。这复制了物化视图的典型增量填充,执行其查询处理数据块(大小可配置)。

(1) 代表最简单的方案通常就足够了。为了简洁,我们不包含示例。

我们将在下面进一步探讨 (2)。

使用 Null 表引擎填充物化视图

Null 表引擎 提供了一种不持久化数据的存储引擎(可以将其视为表引擎世界的 /dev/null)。虽然这似乎自相矛盾,但物化视图仍然会在插入到此表引擎中的数据上执行。这允许构建物化视图而无需持久化原始数据 - 避免 I/O 和相关的存储。

重要的是,附加到表引擎的任何物化视图仍然会处理插入的数据块 - 将其结果发送到目标表。这些块的大小是可配置的。虽然较大的块可能更有效(并且处理速度更快),但它们会消耗更多的资源(主要是内存)。使用此表引擎意味着我们可以增量地构建我们的物化视图,即一次一个块,避免将整个聚合保存在内存中。


考虑以下示例

CREATE TABLE pypi_v2
(
    `timestamp` DateTime,
    `project` String
)
ENGINE = Null

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv_v2 TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi_v2
GROUP BY
    hour,
 project

在这里,我们创建一个 Null 表 pypi_v2 来接收将用于构建我们的物化视图的行。请注意,我们如何将模式限制为仅需要的列。我们的物化视图对插入到此表中的行执行聚合(一次一个块),并将结果发送到我们的目标表 pypi_downloads_per_day

注意

我们在此处使用了 pypi_downloads_per_day 作为我们的目标表。为了增加弹性,用户可以创建一个重复表 pypi_downloads_per_day_v2,并将其用作视图的目标表,如前面的示例所示。在插入完成后,pypi_downloads_per_day_v2 中的分区可以反过来移动到 pypi_downloads_per_day。这将允许在我们的插入由于内存问题或服务器中断而失败的情况下进行恢复,即我们只需截断 pypi_downloads_per_day_v2,调整设置并重试。

要填充此物化视图,我们只需将相关数据从 pypi 插入到 pypi_v2 中即可。

INSERT INTO pypi_v2 SELECT timestamp, project FROM pypi WHERE timestamp < '2024-12-17 09:00:00'

0 rows in set. Elapsed: 27.325 sec. Processed 1.50 billion rows, 33.48 GB (54.73 million rows/s., 1.23 GB/s.)
Peak memory usage: 639.47 MiB.

请注意,我们这里的内存使用量为 639.47 MiB

调整性能和资源

几个因素将决定上述场景中的性能和资源使用情况。在尝试调整之前,我们建议读者了解插入机制,该机制在 使用线程进行读取 部分的 优化 S3 插入和读取性能指南 中有详细记录。 总结如下

  • 读取并行性 - 用于读取的线程数。通过 max_threads 控制。在 ClickHouse Cloud 中,这由实例大小决定,默认值为 vCPU 的数量。增加此值可能会提高读取性能,但会增加内存使用量。
  • 插入并行性 - 用于插入的插入线程数。通过 max_insert_threads 控制。 注意:此值受 max_threads 限制,因此有效的插入并行性为 min(max_insert_threads, max_threads)。在 ClickHouse Cloud 中,这由实例大小决定(在 2 到 4 之间),并且在 OSS 中设置为 1。增加此值可能会提高性能,但会增加内存使用量。
  • 插入块大小 - 数据在一个循环中处理,该循环将其提取、解析并形成基于 分区键 的内存插入块。这些块被排序、优化、压缩并写入存储作为新的 数据部分。插入块的大小由设置 min_insert_block_size_rowsmin_insert_block_size_bytes(未压缩)控制,会影响内存使用量和磁盘 I/O。较大的块使用更多的内存,但创建较少的部分,从而减少 I/O 和后台合并。这些设置表示最小阈值(达到任一阈值都会触发刷新)。
  • 物化视图块大小 - 除了主插入的上述机制外,在插入到物化视图之前,块也会被压缩以提高效率。这些块的大小由设置 min_insert_block_size_bytes_for_materialized_viewsmin_insert_block_size_rows_for_materialized_views 确定。较大的块允许更高效的处理,但会消耗更多的内存。默认情况下,这些设置恢复为源表设置 min_insert_block_size_rowsmin_insert_block_size_bytes 的值,分别。
注意

简单 INSERT SELECT 查询的提示:对于没有复杂转换的简单 INSERT INTO t1 SELECT * FROM t2 查询,请考虑启用 optimize_trivial_insert_select=1。此设置(自版本 24.7 起默认禁用)会自动调整 SELECT 并行性以匹配 max_insert_threads,从而减少资源使用量和创建的部分数量。这对于在表之间进行批量数据迁移特别有用。

为了提高性能,您可以遵循 调整线程和块大小以进行插入 部分的 优化 S3 插入和读取性能指南 中概述的指南。通常,不需要修改 min_insert_block_size_bytes_for_materialized_viewsmin_insert_block_size_rows_for_materialized_views 来提高性能。如果修改了这些设置,请使用与 min_insert_block_size_rowsmin_insert_block_size_bytes 讨论的最佳实践。

为了最小化内存,您可能希望尝试这些设置。这不可避免地会降低性能。使用前面的查询,我们下面显示示例。

max_insert_threads 降低到 1 可以减少我们的内存开销。

INSERT INTO pypi_v2
SELECT
    timestamp,
 project
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
SETTINGS max_insert_threads = 1

0 rows in set. Elapsed: 27.752 sec. Processed 1.50 billion rows, 33.48 GB (53.89 million rows/s., 1.21 GB/s.)
Peak memory usage: 506.78 MiB.

我们可以通过将我们的 max_threads 设置降低到 1 来进一步降低内存。

INSERT INTO pypi_v2
SELECT timestamp, project
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
SETTINGS max_insert_threads = 1, max_threads = 1

Ok.

0 rows in set. Elapsed: 43.907 sec. Processed 1.50 billion rows, 33.48 GB (34.06 million rows/s., 762.54 MB/s.)
Peak memory usage: 272.53 MiB.

最后,我们可以通过将 min_insert_block_size_rows 设置为 0(禁用它作为块大小的决定因素)和 min_insert_block_size_bytes 设置为 10485760(10MiB)来进一步降低内存。

INSERT INTO pypi_v2
SELECT
    timestamp,
 project
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
SETTINGS max_insert_threads = 1, max_threads = 1, min_insert_block_size_rows = 0, min_insert_block_size_bytes = 10485760

0 rows in set. Elapsed: 43.293 sec. Processed 1.50 billion rows, 33.48 GB (34.54 million rows/s., 773.36 MB/s.)
Peak memory usage: 218.64 MiB.

最后,请注意,降低块大小会产生更多的部分并导致更大的合并压力。如 此处 讨论的那样,应谨慎更改这些设置。

没有时间戳或单调递增的列

上述过程依赖于用户拥有时间戳或单调递增的列。在某些情况下,这根本不可用。在这种情况下,我们建议以下过程,它利用了之前概述的许多步骤,但需要用户暂停数据摄取。

  1. 暂停向主表插入数据。
  2. 使用 CREATE AS 语法创建主目标表的副本。
  3. 使用 ALTER TABLE ATTACH 将原始目标表的的分区附加到副本。注意: 此附加操作与之前使用的移动操作不同。虽然依赖于硬链接,但原始表中的数据会被保留。
  4. 创建新的物化视图。
  5. 重新启动插入。注意: 插入只会更新目标表,而不会更新副本,副本只会引用原始数据。
  6. 回填物化视图,使用与之前用于具有时间戳的数据的相同过程,将副本表作为源。

考虑以下使用 PyPI 和我们之前的新物化视图 pypi_downloads_per_day 的示例(我们假设我们无法使用时间戳)

SELECT count() FROM pypi

┌────count()─┐
│ 2039988137 │ -- 2.04 billion
└────────────┘

1 row in set. Elapsed: 0.003 sec.

-- (1) Pause inserts
-- (2) Create a duplicate of our target table

CREATE TABLE pypi_v2 AS pypi

SELECT count() FROM pypi_v2

┌────count()─┐
│ 2039988137 │ -- 2.04 billion
└────────────┘

1 row in set. Elapsed: 0.004 sec.

-- (3) Attach partitions from the original target table to the duplicate.

ALTER TABLE pypi_v2
 (ATTACH PARTITION tuple() FROM pypi)

-- (4) Create our new materialized views

CREATE TABLE pypi_downloads_per_day
(
    `hour` DateTime,
    `project` String,
    `count` Int64
)
ENGINE = SummingMergeTree
ORDER BY (project, hour)

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi
GROUP BY
    hour,
 project

-- (4) Restart inserts. We replicate here by inserting a single row.

INSERT INTO pypi SELECT *
FROM pypi
LIMIT 1

SELECT count() FROM pypi

┌────count()─┐
│ 2039988138 │ -- 2.04 billion
└────────────┘

1 row in set. Elapsed: 0.003 sec.

-- notice how pypi_v2 contains same number of rows as before

SELECT count() FROM pypi_v2
┌────count()─┐
│ 2039988137 │ -- 2.04 billion
└────────────┘

-- (5) Backfill the view using the backup pypi_v2

INSERT INTO pypi_downloads_per_day SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi_v2
GROUP BY
    hour,
 project

0 rows in set. Elapsed: 3.719 sec. Processed 2.04 billion rows, 47.15 GB (548.57 million rows/s., 12.68 GB/s.)

DROP TABLE pypi_v2;

在倒数第二步中,我们使用之前描述的简单 INSERT INTO SELECT 方法回填 pypi_downloads_per_day 此处。 这也可以使用本文档中此处记录的空表方法进行增强,并可选择使用副本表以提高弹性。

虽然此操作需要暂停插入,但中间操作通常可以快速完成 - 从而最大限度地减少任何数据中断。

    © . This site is unofficial and not affiliated with ClickHouse, Inc.