跳至主要内容

集成 dbt 和 ClickHouse

dbt(数据构建工具)使分析工程师能够通过简单地编写 select 语句来转换仓库中的数据。dbt 处理将这些 select 语句具体化为数据库中的对象(以表和视图的形式) - 执行 提取、加载和转换 (ELT) 中的 T 部分。用户可以创建由 SELECT 语句定义的模型。

在 dbt 中,可以交叉引用和分层这些模型,以允许构建更高级别的概念。连接模型所需的样板 SQL 将自动生成。此外,dbt 会识别模型之间的依赖关系,并使用有向无环图 (DAG) 确保它们按适当的顺序创建。

Dbt 通过 ClickHouse 支持的插件 与 ClickHouse 兼容。我们描述了将 ClickHouse 与基于公开可用的 IMDB 数据集的简单示例连接的过程。我们还重点介绍了当前连接器的一些限制。

概念

dbt 引入了模型的概念。它被定义为 SQL 语句,可能连接许多表。模型可以通过多种方式“具体化”。具体化表示模型选择查询的构建策略。具体化背后的代码是样板 SQL,它将您的 SELECT 查询包装在一个语句中,以便创建一个新的或更新一个现有的关系。

dbt 提供 4 种类型的具体化

  • 视图(默认):模型在数据库中构建为视图。
  • :模型在数据库中构建为表。
  • 临时:模型不会直接在数据库中构建,而是作为公共表表达式提取到依赖模型中。
  • 增量:模型最初具体化为表,并在后续运行中,dbt 会插入新行并更新表中已更改的行。

其他语法和子句定义了如果其基础数据发生变化,这些模型应如何更新。dbt 通常建议从视图具体化开始,直到性能成为问题。表具体化通过将模型查询的结果作为表捕获来提高查询时间性能,但代价是增加了存储空间。增量方法在此基础上进一步构建,以允许捕获基础数据后续更新到目标表中。

ClickHouse 的 当前插件 支持 视图临时增量具体化。该插件还支持 dbt 快照种子,我们将在本指南中进行探讨。

对于以下指南,我们假设您已有一个 ClickHouse 实例可用。

dbt 和 ClickHouse 插件的设置

dbt

我们假设在以下示例中使用 dbt CLI。用户可能还想考虑 dbt Cloud,它提供了一个基于 Web 的集成开发环境 (IDE),允许用户编辑和运行项目。

dbt 提供了许多 CLI 安装选项。请按照此处描述的说明进行操作 此处。在此阶段,仅安装 dbt-core。我们建议使用 pip

pip install dbt-core

重要提示:以下内容已在 python 3.9 下测试。

ClickHouse 插件

安装 dbt ClickHouse 插件

pip install dbt-clickhouse

准备 ClickHouse

当对高度关系型数据进行建模时,dbt 非常出色。出于示例目的,我们提供了一个具有以下关系模式的小型 IMDB 数据集。此数据集源自 关系数据集存储库。这相对于 dbt 常用的模式来说是微不足道的,但它代表了一个可管理的样本

IMDB table schema

我们使用这些表的一个子集,如所示。

创建以下表

CREATE DATABASE imdb;

CREATE TABLE imdb.actors
(
id UInt32,
first_name String,
last_name String,
gender FixedString(1)
) ENGINE = MergeTree ORDER BY (id, first_name, last_name, gender);

CREATE TABLE imdb.directors
(
id UInt32,
first_name String,
last_name String
) ENGINE = MergeTree ORDER BY (id, first_name, last_name);

CREATE TABLE imdb.genres
(
movie_id UInt32,
genre String
) ENGINE = MergeTree ORDER BY (movie_id, genre);

CREATE TABLE imdb.movie_directors
(
director_id UInt32,
movie_id UInt64
) ENGINE = MergeTree ORDER BY (director_id, movie_id);

CREATE TABLE imdb.movies
(
id UInt32,
name String,
year UInt32,
rank Float32 DEFAULT 0
) ENGINE = MergeTree ORDER BY (id, name, year);

CREATE TABLE imdb.roles
(
actor_id UInt32,
movie_id UInt32,
role String,
created_at DateTime DEFAULT now()
) ENGINE = MergeTree ORDER BY (actor_id, movie_id);
注意

roles 的列 created_at,默认为 now() 的值。我们稍后将使用它来识别模型的增量更新 - 请参阅 增量模型

我们使用 s3 函数从公共端点读取源数据以插入数据。运行以下命令填充表

INSERT INTO imdb.actors
SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/imdb/imdb_ijs_actors.tsv.gz',
'TSVWithNames');

INSERT INTO imdb.directors
SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/imdb/imdb_ijs_directors.tsv.gz',
'TSVWithNames');

INSERT INTO imdb.genres
SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/imdb/imdb_ijs_movies_genres.tsv.gz',
'TSVWithNames');

INSERT INTO imdb.movie_directors
SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/imdb/imdb_ijs_movies_directors.tsv.gz',
'TSVWithNames');

INSERT INTO imdb.movies
SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/imdb/imdb_ijs_movies.tsv.gz',
'TSVWithNames');

INSERT INTO imdb.roles(actor_id, movie_id, role)
SELECT actor_id, movie_id, role
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/imdb/imdb_ijs_roles.tsv.gz',
'TSVWithNames');

这些命令的执行可能因您的带宽而异,但每个命令都只需要几秒钟即可完成。执行以下查询以计算每个演员的摘要,按电影出现次数排序,并确认数据已成功加载

SELECT id,
any(actor_name) as name,
uniqExact(movie_id) as num_movies,
avg(rank) as avg_rank,
uniqExact(genre) as unique_genres,
uniqExact(director_name) as uniq_directors,
max(created_at) as updated_at
FROM (
SELECT imdb.actors.id as id,
concat(imdb.actors.first_name, ' ', imdb.actors.last_name) as actor_name,
imdb.movies.id as movie_id,
imdb.movies.rank as rank,
genre,
concat(imdb.directors.first_name, ' ', imdb.directors.last_name) as director_name,
created_at
FROM imdb.actors
JOIN imdb.roles ON imdb.roles.actor_id = imdb.actors.id
LEFT OUTER JOIN imdb.movies ON imdb.movies.id = imdb.roles.movie_id
LEFT OUTER JOIN imdb.genres ON imdb.genres.movie_id = imdb.movies.id
LEFT OUTER JOIN imdb.movie_directors ON imdb.movie_directors.movie_id = imdb.movies.id
LEFT OUTER JOIN imdb.directors ON imdb.directors.id = imdb.movie_directors.director_id
)
GROUP BY id
ORDER BY num_movies DESC
LIMIT 5;

响应应如下所示

+------+------------+----------+------------------+-------------+--------------+-------------------+
|id |name |num_movies|avg_rank |unique_genres|uniq_directors|updated_at |
+------+------------+----------+------------------+-------------+--------------+-------------------+
|45332 |Mel Blanc |832 |6.175853582979779 |18 |84 |2022-04-26 14:01:45|
|621468|Bess Flowers|659 |5.57727638854796 |19 |293 |2022-04-26 14:01:46|
|372839|Lee Phelps |527 |5.032976449684617 |18 |261 |2022-04-26 14:01:46|
|283127|Tom London |525 |2.8721716524875673|17 |203 |2022-04-26 14:01:46|
|356804|Bud Osborne |515 |2.0389507108727773|15 |149 |2022-04-26 14:01:46|
+------+------------+----------+------------------+-------------+--------------+-------------------+

在后面的指南中,我们将把此查询转换为模型 - 在 ClickHouse 中将其具体化为 dbt 视图和表。

连接到 ClickHouse

  1. 创建一个 dbt 项目。在本例中,我们以我们的 imdb 源命名。出现提示时,选择 clickhouse 作为数据库源。

    clickhouse-user@clickhouse:~$ dbt init imdb

    16:52:40 Running with dbt=1.1.0
    Which database would you like to use?
    [1] clickhouse

    (Don't see the one you want? https://docs.getdbt.com/docs/available-adapters)

    Enter a number: 1
    16:53:21 No sample profile found for clickhouse.
    16:53:21
    Your new dbt project "imdb" was created!

    For more information on how to configure the profiles.yml file,
    please consult the dbt documentation here:

    https://docs.getdbt.com/docs/configure-your-profile
  2. cd 到您的项目文件夹

    cd imdb
  3. 此时,您将需要您选择的文本编辑器。在下面的示例中,我们使用流行的 VSCode。打开 IMDB 目录,您应该会看到一系列 yml 和 sql 文件

    New dbt project
  4. 更新您的 dbt_project.yml 文件以指定我们的第一个模型 - actor_summary 并将配置文件设置为 clickhouse_imdb

    dbt profiledbt profile
  5. 接下来,我们需要向 dbt 提供 ClickHouse 实例的连接详细信息。将以下内容添加到您的 ~/.dbt/profiles.yml 中。

    clickhouse_imdb:
    target: dev
    outputs:
    dev:
    type: clickhouse
    schema: imdb_dbt
    host: localhost
    port: 8123
    user: default
    password: ''
    secure: False

    请注意需要修改用户和密码。此处记录了其他可用设置 此处

  6. 从 IMDB 目录中,执行 dbt debug 命令以确认 dbt 是否能够连接到 ClickHouse。

    clickhouse-user@clickhouse:~/imdb$ dbt debug
    17:33:53 Running with dbt=1.1.0
    dbt version: 1.1.0
    python version: 3.10.1
    python path: /home/dale/.pyenv/versions/3.10.1/bin/python3.10
    os info: Linux-5.13.0-10039-tuxedo-x86_64-with-glibc2.31
    Using profiles.yml file at /home/dale/.dbt/profiles.yml
    Using dbt_project.yml file at /opt/dbt/imdb/dbt_project.yml

    Configuration:
    profiles.yml file [OK found and valid]
    dbt_project.yml file [OK found and valid]

    Required dependencies:
    - git [OK found]

    Connection:
    host: localhost
    port: 8123
    user: default
    schema: imdb_dbt
    secure: False
    verify: False
    Connection test: [OK connection ok]

    All checks passed!

    确认响应包含 连接测试:[OK connection ok],表示连接成功。

创建简单的视图具体化

使用视图具体化时,模型在每次运行时都会作为视图重新构建,方法是在 ClickHouse 中使用 CREATE VIEW AS 语句。这不需要任何额外的存储数据,但查询速度会比表具体化慢。

  1. imdb 文件夹中,删除目录 models/example

    clickhouse-user@clickhouse:~/imdb$ rm -rf models/example
  2. models 文件夹内的 actors 中创建一个新文件。在这里,我们创建每个表示演员模型的文件

    clickhouse-user@clickhouse:~/imdb$ mkdir models/actors
  3. models/actors 文件夹中创建文件 schema.ymlactor_summary.sql

    clickhouse-user@clickhouse:~/imdb$ touch models/actors/actor_summary.sql
    clickhouse-user@clickhouse:~/imdb$ touch models/actors/schema.yml

    文件 schema.yml 定义了我们的表。这些随后将可用于宏中。编辑 models/actors/schema.yml 以包含此内容

    version: 2

    sources:
    - name: imdb
    tables:
    - name: directors
    - name: actors
    - name: roles
    - name: movies
    - name: genres
    - name: movie_directors

    actors_summary.sql 定义了我们实际的模型。请注意,在 config 函数中,我们还请求模型在 ClickHouse 中具体化为视图。我们的表通过函数 sourceschema.yml 文件中引用,例如 source('imdb', 'movies') 指的是 imdb 数据库中的 movies 表。编辑 models/actors/actors_summary.sql 以包含此内容

    {{ config(materialized='view') }}

    with actor_summary as (
    SELECT id,
    any(actor_name) as name,
    uniqExact(movie_id) as num_movies,
    avg(rank) as avg_rank,
    uniqExact(genre) as genres,
    uniqExact(director_name) as directors,
    max(created_at) as updated_at
    FROM (
    SELECT {{ source('imdb', 'actors') }}.id as id,
    concat({{ source('imdb', 'actors') }}.first_name, ' ', {{ source('imdb', 'actors') }}.last_name) as actor_name,
    {{ source('imdb', 'movies') }}.id as movie_id,
    {{ source('imdb', 'movies') }}.rank as rank,
    genre,
    concat({{ source('imdb', 'directors') }}.first_name, ' ', {{ source('imdb', 'directors') }}.last_name) as director_name,
    created_at
    FROM {{ source('imdb', 'actors') }}
    JOIN {{ source('imdb', 'roles') }} ON {{ source('imdb', 'roles') }}.actor_id = {{ source('imdb', 'actors') }}.id
    LEFT OUTER JOIN {{ source('imdb', 'movies') }} ON {{ source('imdb', 'movies') }}.id = {{ source('imdb', 'roles') }}.movie_id
    LEFT OUTER JOIN {{ source('imdb', 'genres') }} ON {{ source('imdb', 'genres') }}.movie_id = {{ source('imdb', 'movies') }}.id
    LEFT OUTER JOIN {{ source('imdb', 'movie_directors') }} ON {{ source('imdb', 'movie_directors') }}.movie_id = {{ source('imdb', 'movies') }}.id
    LEFT OUTER JOIN {{ source('imdb', 'directors') }} ON {{ source('imdb', 'directors') }}.id = {{ source('imdb', 'movie_directors') }}.director_id
    )
    GROUP BY id
    )

    select *
    from actor_summary

    请注意,我们在最终的 actor_summary 中包含了 updated_at 列。我们稍后将使用它进行增量具体化。

  4. imdb 目录执行命令 dbt run

    clickhouse-user@clickhouse:~/imdb$ dbt run
    15:05:35 Running with dbt=1.1.0
    15:05:35 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 0 seed files, 6 sources, 0 exposures, 0 metrics
    15:05:35
    15:05:36 Concurrency: 1 threads (target='dev')
    15:05:36
    15:05:36 1 of 1 START view model imdb_dbt.actor_summary.................................. [RUN]
    15:05:37 1 of 1 OK created view model imdb_dbt.actor_summary............................. [OK in 1.00s]
    15:05:37
    15:05:37 Finished running 1 view model in 1.97s.
    15:05:37
    15:05:37 Completed successfully
    15:05:37
    15:05:37 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
  5. dbt 将模型表示为 ClickHouse 中的视图(如请求的那样)。我们现在可以直接查询此视图。此视图将在 imdb_dbt 数据库中创建 - 这由 ~/.dbt/profiles.yml 文件中 clickhouse_imdb 配置文件下的 schema 参数确定。

    SHOW DATABASES;
    +------------------+
    |name |
    +------------------+
    |INFORMATION_SCHEMA|
    |default |
    |imdb |
    |imdb_dbt | <---created by dbt!
    |information_schema|
    |system |
    +------------------+

    查询此视图,我们可以使用更简单的语法复制我们之前查询的结果

    SELECT * FROM imdb_dbt.actor_summary ORDER BY num_movies DESC LIMIT 5;
    +------+------------+----------+------------------+------+---------+-------------------+
    |id |name |num_movies|avg_rank |genres|directors|updated_at |
    +------+------------+----------+------------------+------+---------+-------------------+
    |45332 |Mel Blanc |832 |6.175853582979779 |18 |84 |2022-04-26 15:26:55|
    |621468|Bess Flowers|659 |5.57727638854796 |19 |293 |2022-04-26 15:26:57|
    |372839|Lee Phelps |527 |5.032976449684617 |18 |261 |2022-04-26 15:26:56|
    |283127|Tom London |525 |2.8721716524875673|17 |203 |2022-04-26 15:26:56|
    |356804|Bud Osborne |515 |2.0389507108727773|15 |149 |2022-04-26 15:26:56|
    +------+------------+----------+------------------+------+---------+-------------------+

创建表具体化

在之前的示例中,我们的模型被物化为了一个视图。虽然这对于某些查询可能提供了足够的性能,但更复杂的 SELECT 或频繁执行的查询可能更适合被物化成一个表。这种物化对于将被 BI 工具查询的模型很有用,以确保用户获得更快的体验。这实际上会导致查询结果被存储为一个新表,并伴随相关的存储开销 - 实际上,执行了一个 INSERT TO SELECT 操作。请注意,此表将在每次执行时重新构建,即它不是增量的。因此,大型结果集可能会导致执行时间过长 - 请参阅 dbt 限制

  1. 修改文件 actors_summary.sql,使 materialized 参数设置为 table。注意 ORDER BY 的定义方式,并注意我们使用了 MergeTree 表引擎。

    {{ config(order_by='(updated_at, id, name)', engine='MergeTree()', materialized='table') }}
  2. imdb 目录执行命令 dbt run。此执行可能需要稍长的时间 - 在大多数机器上大约需要 10 秒。

    clickhouse-user@clickhouse:~/imdb$ dbt run
    15:13:27 Running with dbt=1.1.0
    15:13:27 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 0 seed files, 6 sources, 0 exposures, 0 metrics
    15:13:27
    15:13:28 Concurrency: 1 threads (target='dev')
    15:13:28
    15:13:28 1 of 1 START table model imdb_dbt.actor_summary................................. [RUN]
    15:13:37 1 of 1 OK created table model imdb_dbt.actor_summary............................ [OK in 9.22s]
    15:13:37
    15:13:37 Finished running 1 table model in 10.20s.
    15:13:37
    15:13:37 Completed successfully
    15:13:37
    15:13:37 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
  3. 确认表 imdb_dbt.actor_summary 的创建。

    SHOW CREATE TABLE imdb_dbt.actor_summary;

    您应该看到表具有相应的 数据类型。

    +----------------------------------------
    |statement
    +----------------------------------------
    |CREATE TABLE imdb_dbt.actor_summary
    |(
    |`id` UInt32,
    |`first_name` String,
    |`last_name` String,
    |`num_movies` UInt64,
    |`updated_at` DateTime
    |)
    |ENGINE = MergeTree
    |ORDER BY (id, first_name, last_name)
    |SETTINGS index_granularity = 8192
    +----------------------------------------
  4. 确认此表的结果与之前的响应一致。注意,现在模型是一个表,响应时间有了明显的改善。

    SELECT * FROM imdb_dbt.actor_summary ORDER BY num_movies DESC LIMIT 5;
    +------+------------+----------+------------------+------+---------+-------------------+
    |id |name |num_movies|avg_rank |genres|directors|updated_at |
    +------+------------+----------+------------------+------+---------+-------------------+
    |45332 |Mel Blanc |832 |6.175853582979779 |18 |84 |2022-04-26 15:26:55|
    |621468|Bess Flowers|659 |5.57727638854796 |19 |293 |2022-04-26 15:26:57|
    |372839|Lee Phelps |527 |5.032976449684617 |18 |261 |2022-04-26 15:26:56|
    |283127|Tom London |525 |2.8721716524875673|17 |203 |2022-04-26 15:26:56|
    |356804|Bud Osborne |515 |2.0389507108727773|15 |149 |2022-04-26 15:26:56|
    +------+------------+----------+------------------+------+---------+-------------------+

    可以随意对该模型发出其他查询。例如,哪些演员出演了排名最高的电影,并且出演次数超过 5 次?

    SELECT * FROM imdb_dbt.actor_summary WHERE num_movies > 5 ORDER BY avg_rank  DESC LIMIT 10;

创建增量物化

前面的示例创建了一个表来物化模型。此表将在每次 dbt 执行时重新构建。对于较大的结果集或复杂的转换,这可能是不可行的并且代价高昂。为了解决这一挑战并减少构建时间,dbt 提供了增量物化。这允许 dbt 自上次执行以来将记录插入或更新到表中,使其适用于事件类型的数据。在底层,会创建一个包含所有更新记录的临时表,然后将所有未修改的记录以及更新的记录插入到一个新的目标表中。这导致与表模型类似的 限制,适用于大型结果集。

为了克服大型数据集的这些限制,插件支持“inserts_only”模式,其中所有更新都插入到目标表中,而无需创建临时表(更多信息见下文)。

为了说明此示例,我们将添加演员“Clicky McClickHouse”,他将在 910 部电影中出现 - 确保他出演的电影比甚至 梅尔·布兰克 还要多。

  1. 首先,我们将模型修改为增量类型。此添加需要

    1. 唯一键 - 为了确保插件能够唯一标识行,我们必须提供一个唯一键 - 在这种情况下,查询中的 id 字段就足够了。这确保了我们的物化表中不会有重复的行。有关唯一性约束的更多详细信息,请参阅 此处
    2. 增量过滤器 - 我们还需要告诉 dbt 如何识别增量运行中哪些行发生了更改。这是通过提供一个增量表达式来实现的。通常这涉及事件数据的 timestamp;因此,我们的 updated_at timestamp 字段。此列在插入行时默认为 now() 的值,允许识别新角色。此外,我们需要识别添加新演员的替代情况。使用 {{this}} 变量来表示现有的物化表,这给了我们表达式 where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}})。我们将它嵌入到 {% if is_incremental() %} 条件中,确保它只在增量运行时使用,而不是在第一次构建表时使用。有关为增量模型过滤行的更多详细信息,请参阅 dbt 文档中的此讨论

    按如下方式更新文件 actor_summary.sql

    {{ config(order_by='(updated_at, id, name)', engine='MergeTree()', materialized='incremental', unique_key='id') }}
    with actor_summary as (
    SELECT id,
    any(actor_name) as name,
    uniqExact(movie_id) as num_movies,
    avg(rank) as avg_rank,
    uniqExact(genre) as genres,
    uniqExact(director_name) as directors,
    max(created_at) as updated_at
    FROM (
    SELECT {{ source('imdb', 'actors') }}.id as id,
    concat({{ source('imdb', 'actors') }}.first_name, ' ', {{ source('imdb', 'actors') }}.last_name) as actor_name,
    {{ source('imdb', 'movies') }}.id as movie_id,
    {{ source('imdb', 'movies') }}.rank as rank,
    genre,
    concat({{ source('imdb', 'directors') }}.first_name, ' ', {{ source('imdb', 'directors') }}.last_name) as director_name,
    created_at
    FROM {{ source('imdb', 'actors') }}
    JOIN {{ source('imdb', 'roles') }} ON {{ source('imdb', 'roles') }}.actor_id = {{ source('imdb', 'actors') }}.id
    LEFT OUTER JOIN {{ source('imdb', 'movies') }} ON {{ source('imdb', 'movies') }}.id = {{ source('imdb', 'roles') }}.movie_id
    LEFT OUTER JOIN {{ source('imdb', 'genres') }} ON {{ source('imdb', 'genres') }}.movie_id = {{ source('imdb', 'movies') }}.id
    LEFT OUTER JOIN {{ source('imdb', 'movie_directors') }} ON {{ source('imdb', 'movie_directors') }}.movie_id = {{ source('imdb', 'movies') }}.id
    LEFT OUTER JOIN {{ source('imdb', 'directors') }} ON {{ source('imdb', 'directors') }}.id = {{ source('imdb', 'movie_directors') }}.director_id
    )
    GROUP BY id
    )
    select *
    from actor_summary

    {% if is_incremental() %}

    -- this filter will only be applied on an incremental run
    where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}})

    {% endif %}

    请注意,我们的模型只会响应 rolesactors 表的更新和添加。为了响应所有表,建议用户将此模型拆分为多个子模型 - 每个模型都有自己的增量条件。然后可以引用和连接这些模型。有关交叉引用模型的更多详细信息,请参阅 此处

  2. 执行 dbt run 并确认结果表的内容。

    clickhouse-user@clickhouse:~/imdb$  dbt run
    15:33:34 Running with dbt=1.1.0
    15:33:34 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 0 seed files, 6 sources, 0 exposures, 0 metrics
    15:33:34
    15:33:35 Concurrency: 1 threads (target='dev')
    15:33:35
    15:33:35 1 of 1 START incremental model imdb_dbt.actor_summary........................... [RUN]
    15:33:41 1 of 1 OK created incremental model imdb_dbt.actor_summary...................... [OK in 6.33s]
    15:33:41
    15:33:41 Finished running 1 incremental model in 7.30s.
    15:33:41
    15:33:41 Completed successfully
    15:33:41
    15:33:41 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
    SELECT * FROM imdb_dbt.actor_summary ORDER BY num_movies DESC LIMIT 5;
    +------+------------+----------+------------------+------+---------+-------------------+
    |id |name |num_movies|avg_rank |genres|directors|updated_at |
    +------+------------+----------+------------------+------+---------+-------------------+
    |45332 |Mel Blanc |832 |6.175853582979779 |18 |84 |2022-04-26 15:26:55|
    |621468|Bess Flowers|659 |5.57727638854796 |19 |293 |2022-04-26 15:26:57|
    |372839|Lee Phelps |527 |5.032976449684617 |18 |261 |2022-04-26 15:26:56|
    |283127|Tom London |525 |2.8721716524875673|17 |203 |2022-04-26 15:26:56|
    |356804|Bud Osborne |515 |2.0389507108727773|15 |149 |2022-04-26 15:26:56|
    +------+------------+----------+------------------+------+---------+-------------------+
  3. 现在,我们将向我们的模型添加数据以说明增量更新。将我们的演员“Clicky McClickHouse”添加到 actors 表中。

    INSERT INTO imdb.actors VALUES (845466, 'Clicky', 'McClickHouse', 'M');
  4. 让 Clicky 出演 910 部随机电影。

    INSERT INTO imdb.roles
    SELECT now() as created_at, 845466 as actor_id, id as movie_id, 'Himself' as role
    FROM imdb.movies
    LIMIT 910 OFFSET 10000;
  5. 通过查询底层源表并绕过任何 dbt 模型,确认他确实是现在出演次数最多的演员。

    SELECT id,
    any(actor_name) as name,
    uniqExact(movie_id) as num_movies,
    avg(rank) as avg_rank,
    uniqExact(genre) as unique_genres,
    uniqExact(director_name) as uniq_directors,
    max(created_at) as updated_at
    FROM (
    SELECT imdb.actors.id as id,
    concat(imdb.actors.first_name, ' ', imdb.actors.last_name) as actor_name,
    imdb.movies.id as movie_id,
    imdb.movies.rank as rank,
    genre,
    concat(imdb.directors.first_name, ' ', imdb.directors.last_name) as director_name,
    created_at
    FROM imdb.actors
    JOIN imdb.roles ON imdb.roles.actor_id = imdb.actors.id
    LEFT OUTER JOIN imdb.movies ON imdb.movies.id = imdb.roles.movie_id
    LEFT OUTER JOIN imdb.genres ON imdb.genres.movie_id = imdb.movies.id
    LEFT OUTER JOIN imdb.movie_directors ON imdb.movie_directors.movie_id = imdb.movies.id
    LEFT OUTER JOIN imdb.directors ON imdb.directors.id = imdb.movie_directors.director_id
    )
    GROUP BY id
    ORDER BY num_movies DESC
    LIMIT 2;
    +------+-------------------+----------+------------------+------+---------+-------------------+
    |id |name |num_movies|avg_rank |genres|directors|updated_at |
    +------+-------------------+----------+------------------+------+---------+-------------------+
    |845466|Clicky McClickHouse|910 |1.4687938697032283|21 |662 |2022-04-26 16:20:36|
    |45332 |Mel Blanc |909 |5.7884792542982515|19 |148 |2022-04-26 16:17:42|
    +------+-------------------+----------+------------------+------+---------+-------------------+
  6. 执行 dbt run 并确认我们的模型已更新并与上述结果匹配。

    clickhouse-user@clickhouse:~/imdb$  dbt run
    16:12:16 Running with dbt=1.1.0
    16:12:16 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 0 seed files, 6 sources, 0 exposures, 0 metrics
    16:12:16
    16:12:17 Concurrency: 1 threads (target='dev')
    16:12:17
    16:12:17 1 of 1 START incremental model imdb_dbt.actor_summary........................... [RUN]
    16:12:24 1 of 1 OK created incremental model imdb_dbt.actor_summary...................... [OK in 6.82s]
    16:12:24
    16:12:24 Finished running 1 incremental model in 7.79s.
    16:12:24
    16:12:24 Completed successfully
    16:12:24
    16:12:24 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
    SELECT * FROM imdb_dbt.actor_summary ORDER BY num_movies DESC LIMIT 2;
    +------+-------------------+----------+------------------+------+---------+-------------------+
    |id |name |num_movies|avg_rank |genres|directors|updated_at |
    +------+-------------------+----------+------------------+------+---------+-------------------+
    |845466|Clicky McClickHouse|910 |1.4687938697032283|21 |662 |2022-04-26 16:20:36|
    |45332 |Mel Blanc |909 |5.7884792542982515|19 |148 |2022-04-26 16:17:42|
    +------+-------------------+----------+------------------+------+---------+-------------------+

内部机制

我们可以通过查询 ClickHouse 的查询日志来识别执行上述增量更新的语句。

SELECT event_time, query  FROM system.query_log WHERE type='QueryStart' AND query LIKE '%dbt%'
AND event_time > subtractMinutes(now(), 15) ORDER BY event_time LIMIT 100;

将上述查询调整到执行期间。我们将结果检查留给用户,但重点介绍插件用于执行增量更新的总体策略。

  1. 插件创建一个临时表 actor_sumary__dbt_tmp。已更改的行将流式传输到此表中。
  2. 创建一个新表 actor_summary_new。然后,旧表中的行从旧表流式传输到新表,并检查以确保行 ID 不存在于临时表中。这有效地处理了更新和重复项。
  3. 临时表的结果流式传输到新的 actor_summary 表中。
  4. 最后,通过 EXCHANGE TABLES 语句将新表与旧版本原子地交换。然后删除旧表和临时表。

这在下图中进行了可视化。

incremental updates dbt

此策略在非常大的模型上可能会遇到挑战。有关更多详细信息,请参阅 限制

追加策略(inserts-only 模式)

为了克服增量模型中大型数据集的限制,插件使用 dbt 配置参数 incremental_strategy。这可以设置为值 append。设置后,更新的行将直接插入到目标表(又名 imdb_dbt.actor_summary)中,并且不会创建临时表。注意:仅追加模式要求您的数据是不可变的,或者允许重复项。如果您希望增量表模型支持更改的行,请不要使用此模式!

为了说明此模式,我们将添加另一个新演员并使用 incremental_strategy='append' 重新执行 dbt run。

  1. 在 actor_summary.sql 中配置仅追加模式。

    {{ config(order_by='(updated_at, id, name)', engine='MergeTree()', materialized='incremental', unique_key='id', incremental_strategy='append') }}
  2. 让我们添加另一位著名演员 - 丹尼·德维托。

    INSERT INTO imdb.actors VALUES (845467, 'Danny', 'DeBito', 'M');
  3. 让丹尼出演 920 部随机电影。

    INSERT INTO imdb.roles
    SELECT now() as created_at, 845467 as actor_id, id as movie_id, 'Himself' as role
    FROM imdb.movies
    LIMIT 920 OFFSET 10000;
  4. 执行 dbt run 并确认丹尼已添加到 actor-summary 表中。

    clickhouse-user@clickhouse:~/imdb$ dbt run
    16:12:16 Running with dbt=1.1.0
    16:12:16 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 186 macros, 0 operations, 0 seed files, 6 sources, 0 exposures, 0 metrics
    16:12:16
    16:12:17 Concurrency: 1 threads (target='dev')
    16:12:17
    16:12:17 1 of 1 START incremental model imdb_dbt.actor_summary........................... [RUN]
    16:12:24 1 of 1 OK created incremental model imdb_dbt.actor_summary...................... [OK in 0.17s]
    16:12:24
    16:12:24 Finished running 1 incremental model in 0.19s.
    16:12:24
    16:12:24 Completed successfully
    16:12:24
    16:12:24 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
    SELECT * FROM imdb_dbt.actor_summary ORDER BY num_movies DESC LIMIT 3;
    +------+-------------------+----------+------------------+------+---------+-------------------+
    |id |name |num_movies|avg_rank |genres|directors|updated_at |
    +------+-------------------+----------+------------------+------+---------+-------------------+
    |845467|Danny DeBito |920 |1.4768987303293204|21 |670 |2022-04-26 16:22:06|
    |845466|Clicky McClickHouse|910 |1.4687938697032283|21 |662 |2022-04-26 16:20:36|
    |45332 |Mel Blanc |909 |5.7884792542982515|19 |148 |2022-04-26 16:17:42|
    +------+-------------------+----------+------------------+------+---------+-------------------+

注意与 Clicky 的插入相比,增量速度有多快。

再次检查 query_log 表,揭示了 2 次增量运行之间的差异。

insert into imdb_dbt.actor_summary ("id", "name", "num_movies", "avg_rank", "genres", "directors", "updated_at")
with actor_summary as (
SELECT id,
any(actor_name) as name,
uniqExact(movie_id) as num_movies,
avg(rank) as avg_rank,
uniqExact(genre) as genres,
uniqExact(director_name) as directors,
max(created_at) as updated_at
FROM (
SELECT imdb.actors.id as id,
concat(imdb.actors.first_name, ' ', imdb.actors.last_name) as actor_name,
imdb.movies.id as movie_id,
imdb.movies.rank as rank,
genre,
concat(imdb.directors.first_name, ' ', imdb.directors.last_name) as director_name,
created_at
FROM imdb.actors
JOIN imdb.roles ON imdb.roles.actor_id = imdb.actors.id
LEFT OUTER JOIN imdb.movies ON imdb.movies.id = imdb.roles.movie_id
LEFT OUTER JOIN imdb.genres ON imdb.genres.movie_id = imdb.movies.id
LEFT OUTER JOIN imdb.movie_directors ON imdb.movie_directors.movie_id = imdb.movies.id
LEFT OUTER JOIN imdb.directors ON imdb.directors.id = imdb.movie_directors.director_id
)
GROUP BY id
)

select *
from actor_summary
-- this filter will only be applied on an incremental run
where id > (select max(id) from imdb_dbt.actor_summary) or updated_at > (select max(updated_at) from imdb_dbt.actor_summary)

在此运行中,只有新行直接添加到 imdb_dbt.actor_summary 表中,并且不涉及表创建。

删除+插入模式(实验性)

从历史上看,ClickHouse 对更新和删除的支持有限,以异步 Mutations 的形式出现。这些操作可能非常 I/O 密集,应尽量避免。

ClickHouse 22.8 引入了 轻量级删除。这些目前处于实验阶段,但提供了更有效的删除数据方法。

可以通过 incremental_strategy 参数为模型配置此模式,即

{{ config(order_by='(updated_at, id, name)', engine='MergeTree()', materialized='incremental', unique_key='id', incremental_strategy='delete+insert') }}

此策略直接作用于目标模型的表,因此,如果操作过程中出现问题,增量模型中的数据可能处于无效状态 - 没有原子更新。

总而言之,这种方法

  1. 插件创建一个临时表 actor_sumary__dbt_tmp。已更改的行将流式传输到此表中。
  2. 对当前 actor_summary 表发出 DELETE 命令。从 actor_sumary__dbt_tmp 中按 id 删除行。
  3. 使用 INSERT INTO actor_summary SELECT * FROM actor_sumary__dbt_tmpactor_sumary__dbt_tmp 中的行插入到 actor_summary 中。

此过程如下图所示。

lightweight delete incremental

insert_overwrite 模式(实验性)

执行以下步骤

  1. 创建一个与增量模型关系具有相同结构的暂存(临时)表:CREATE TABLE {staging} AS {target}。
  2. 仅将新记录(由 SELECT 生成)插入到暂存表中。
  3. 仅将暂存表中存在的新分区替换到目标表中。

这种方法具有以下优点

  • 它比默认策略更快,因为它不会复制整个表。
  • 它比其他策略更安全,因为它在 INSERT 操作成功完成之前不会修改原始表:如果中间发生故障,则不会修改原始表。
  • 它实现了“分区不变性”数据工程最佳实践。这简化了增量和并行数据处理、回滚等。
注意

insert_overwrite 功能尚未在多节点设置上进行测试。

有关此功能实现的详细信息,请查看引入它的 PR

创建快照

dbt 快照允许随着时间的推移记录对可变模型的更改。这反过来允许对模型进行时间点查询,分析师可以在其中“回顾过去”查看模型的先前状态。这是使用 2 型缓慢变化维度 实现的,其中 from 和 to 日期列记录行有效的时间。ClickHouse 插件支持此功能,并在下面进行了演示。

此示例假设您已完成 创建增量表模型。确保您的 actor_summary.sql 未设置 inserts_only=True。您的 models/actor_summary.sql 应如下所示。

{{ config(order_by='(updated_at, id, name)', engine='MergeTree()', materialized='incremental', unique_key='id') }}

with actor_summary as (
SELECT id,
any(actor_name) as name,
uniqExact(movie_id) as num_movies,
avg(rank) as avg_rank,
uniqExact(genre) as genres,
uniqExact(director_name) as directors,
max(created_at) as updated_at
FROM (
SELECT {{ source('imdb', 'actors') }}.id as id,
concat({{ source('imdb', 'actors') }}.first_name, ' ', {{ source('imdb', 'actors') }}.last_name) as actor_name,
{{ source('imdb', 'movies') }}.id as movie_id,
{{ source('imdb', 'movies') }}.rank as rank,
genre,
concat({{ source('imdb', 'directors') }}.first_name, ' ', {{ source('imdb', 'directors') }}.last_name) as director_name,
created_at
FROM {{ source('imdb', 'actors') }}
JOIN {{ source('imdb', 'roles') }} ON {{ source('imdb', 'roles') }}.actor_id = {{ source('imdb', 'actors') }}.id
LEFT OUTER JOIN {{ source('imdb', 'movies') }} ON {{ source('imdb', 'movies') }}.id = {{ source('imdb', 'roles') }}.movie_id
LEFT OUTER JOIN {{ source('imdb', 'genres') }} ON {{ source('imdb', 'genres') }}.movie_id = {{ source('imdb', 'movies') }}.id
LEFT OUTER JOIN {{ source('imdb', 'movie_directors') }} ON {{ source('imdb', 'movie_directors') }}.movie_id = {{ source('imdb', 'movies') }}.id
LEFT OUTER JOIN {{ source('imdb', 'directors') }} ON {{ source('imdb', 'directors') }}.id = {{ source('imdb', 'movie_directors') }}.director_id
)
GROUP BY id
)
select *
from actor_summary

{% if is_incremental() %}

-- this filter will only be applied on an incremental run
where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}})

{% endif %}
  1. 在 snapshots 目录中创建一个文件 actor_summary

     touch snapshots/actor_summary.sql
  2. 使用以下内容更新 actor_summary.sql 文件的内容。

    {% snapshot actor_summary_snapshot %}

    {{
    config(
    target_schema='snapshots',
    unique_key='id',
    strategy='timestamp',
    updated_at='updated_at',
    )
    }}

    select * from {{ref('actor_summary')}}

    {% endsnapshot %}

关于此内容的一些观察结果

  • select 查询定义了您希望随时间推移快照的结果。函数 ref 用于引用我们之前创建的 actor_summary 模型。

  • 我们需要一个时间戳列来指示记录的更改。我们的 updated_at 列(参见 创建增量表模型)可以在这里使用。参数 strategy 指示我们使用时间戳来表示更新,参数 updated_at 指定要使用的列。如果您的模型中不存在此列,您可以选择使用 check 策略。这种方法效率要低得多,并且要求用户指定要比较的一系列列。dbt 会比较这些列的当前值和历史值,记录任何更改(如果相同则不做任何操作)。
  1. 运行命令 dbt snapshot

    clickhouse-user@clickhouse:~/imdb$ dbt snapshot
    13:26:23 Running with dbt=1.1.0
    13:26:23 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
    13:26:23
    13:26:25 Concurrency: 1 threads (target='dev')
    13:26:25
    13:26:25 1 of 1 START snapshot snapshots.actor_summary_snapshot...................... [RUN]
    13:26:25 1 of 1 OK snapshotted snapshots.actor_summary_snapshot...................... [OK in 0.79s]
    13:26:25
    13:26:25 Finished running 1 snapshot in 2.11s.
    13:26:25
    13:26:25 Completed successfully
    13:26:25
    13:26:25 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

请注意,在 snapshots 数据库(由 target_schema 参数确定)中创建了一个名为 actor_summary_snapshot 的表。

  1. 对这些数据进行采样,您会看到 dbt 如何包含了 dbt_valid_from 和 dbt_valid_to 列。后者设置为 null 值。后续运行将更新此值。

    SELECT id, name, num_movies, dbt_valid_from, dbt_valid_to FROM snapshots.actor_summary_snapshot ORDER BY num_movies DESC LIMIT 5;
    +------+----------+------------+----------+-------------------+------------+
    |id |first_name|last_name |num_movies|dbt_valid_from |dbt_valid_to|
    +------+----------+------------+----------+-------------------+------------+
    |845467|Danny |DeBito |920 |2022-05-25 19:33:32|NULL |
    |845466|Clicky |McClickHouse|910 |2022-05-25 19:32:34|NULL |
    |45332 |Mel |Blanc |909 |2022-05-25 19:31:47|NULL |
    |621468|Bess |Flowers |672 |2022-05-25 19:31:47|NULL |
    |283127|Tom |London |549 |2022-05-25 19:31:47|NULL |
    +------+----------+------------+----------+-------------------+------------+
  2. 使我们最喜欢的演员 Clicky McClickHouse 再出演 10 部电影。

    INSERT INTO imdb.roles
    SELECT now() as created_at, 845466 as actor_id, rand(number) % 412320 as movie_id, 'Himself' as role
    FROM system.numbers
    LIMIT 10;
  3. 从 imdb 目录重新运行 dbt run 命令。这将更新增量模型。完成后,运行 dbt snapshot 以捕获更改。

    clickhouse-user@clickhouse:~/imdb$ dbt run
    13:46:14 Running with dbt=1.1.0
    13:46:14 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
    13:46:14
    13:46:15 Concurrency: 1 threads (target='dev')
    13:46:15
    13:46:15 1 of 1 START incremental model imdb_dbt.actor_summary....................... [RUN]
    13:46:18 1 of 1 OK created incremental model imdb_dbt.actor_summary.................. [OK in 2.76s]
    13:46:18
    13:46:18 Finished running 1 incremental model in 3.73s.
    13:46:18
    13:46:18 Completed successfully
    13:46:18
    13:46:18 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

    clickhouse-user@clickhouse:~/imdb$ dbt snapshot
    13:46:26 Running with dbt=1.1.0
    13:46:26 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
    13:46:26
    13:46:27 Concurrency: 1 threads (target='dev')
    13:46:27
    13:46:27 1 of 1 START snapshot snapshots.actor_summary_snapshot...................... [RUN]
    13:46:31 1 of 1 OK snapshotted snapshots.actor_summary_snapshot...................... [OK in 4.05s]
    13:46:31
    13:46:31 Finished running 1 snapshot in 5.02s.
    13:46:31
    13:46:31 Completed successfully
    13:46:31
    13:46:31 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
  4. 如果我们现在查询快照,会注意到 Clicky McClickHouse 有两行。我们之前的条目现在具有 dbt_valid_to 值。我们的新值在 dbt_valid_from 列中记录了相同的值,并且 dbt_valid_to 值为 null。如果我们确实有新行,这些行也将追加到快照中。

    SELECT id, name, num_movies, dbt_valid_from, dbt_valid_to FROM snapshots.actor_summary_snapshot ORDER BY num_movies DESC LIMIT 5;
    +------+----------+------------+----------+-------------------+-------------------+
    |id |first_name|last_name |num_movies|dbt_valid_from |dbt_valid_to |
    +------+----------+------------+----------+-------------------+-------------------+
    |845467|Danny |DeBito |920 |2022-05-25 19:33:32|NULL |
    |845466|Clicky |McClickHouse|920 |2022-05-25 19:34:37|NULL |
    |845466|Clicky |McClickHouse|910 |2022-05-25 19:32:34|2022-05-25 19:34:37|
    |45332 |Mel |Blanc |909 |2022-05-25 19:31:47|NULL |
    |621468|Bess |Flowers |672 |2022-05-25 19:31:47|NULL |
    +------+----------+------------+----------+-------------------+-------------------+

有关 dbt 快照的更多详细信息,请参见 此处

使用 Seeds

dbt 提供了从 CSV 文件加载数据的能力。此功能不适用于加载数据库的大量导出,而是更适用于通常用于代码表和 字典的小文件,例如将国家代码映射到国家名称。举一个简单的例子,我们使用 seed 功能生成并上传一个电影类型代码列表。

  1. 我们从现有数据集中生成一个电影类型代码列表。在 dbt 目录中,使用 clickhouse-client 创建一个名为 seeds/genre_codes.csv 的文件。

    clickhouse-user@clickhouse:~/imdb$ clickhouse-client --password <password> --query
    "SELECT genre, ucase(substring(genre, 1, 3)) as code FROM imdb.genres GROUP BY genre
    LIMIT 100 FORMAT CSVWithNames" > seeds/genre_codes.csv
  2. 执行 dbt seed 命令。这将在我们的数据库 imdb_dbt(由我们的模式配置定义)中创建一个名为 genre_codes 的新表,其中包含来自 csv 文件的行。

    clickhouse-user@clickhouse:~/imdb$ dbt seed
    17:03:23 Running with dbt=1.1.0
    17:03:23 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 1 seed file, 6 sources, 0 exposures, 0 metrics
    17:03:23
    17:03:24 Concurrency: 1 threads (target='dev')
    17:03:24
    17:03:24 1 of 1 START seed file imdb_dbt.genre_codes..................................... [RUN]
    17:03:24 1 of 1 OK loaded seed file imdb_dbt.genre_codes................................. [INSERT 21 in 0.65s]
    17:03:24
    17:03:24 Finished running 1 seed in 1.62s.
    17:03:24
    17:03:24 Completed successfully
    17:03:24
    17:03:24 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
  3. 确认这些数据已加载。

    SELECT * FROM imdb_dbt.genre_codes LIMIT 10;
    +-------+----+
    |genre |code|
    +-------+----+
    |Drama |DRA |
    |Romance|ROM |
    |Short |SHO |
    |Mystery|MYS |
    |Adult |ADU |
    |Family |FAM |

    |Action |ACT |
    |Sci-Fi |SCI |
    |Horror |HOR |
    |War |WAR |
    +-------+----+=

限制

dbt 的当前 ClickHouse 插件有一些用户需要注意的限制。

  1. 该插件目前使用 INSERT TO SELECT 将模型物化成表。这实际上意味着数据重复。非常大的数据集 (PB) 会导致极长的运行时间,使得某些模型不可行。尽量减少任何查询返回的行数,尽可能使用 GROUP BY。优先选择汇总数据的模型,而不是那些仅执行转换同时保持源行数的模型。
  2. 要使用分布式表来表示模型,用户必须手动在每个节点上创建底层的复制表。然后,可以在这些表的基础上创建分布式表。该插件不管理集群创建。
  3. 当 dbt 在数据库中创建关系(表/视图)时,它通常将其创建为:{{ database }}.{{ schema }}.{{ table/view id }}。ClickHouse 没有模式的概念。因此,插件使用 {{schema}}.{{ table/view id }},其中 schema 是 ClickHouse 数据库。

更多信息

前面的指南仅触及了 dbt 功能的表面。建议用户阅读优秀的 dbt 文档

插件的其他配置说明请参见 此处