跳至主要内容

将 dlt 连接到 ClickHouse

dlt 是一个开源库,您可以将其添加到您的 Python 脚本中,以将来自各种(通常是杂乱无章的)数据源的数据加载到结构良好的实时数据集中。

使用 ClickHouse 安装 dlt

要使用 ClickHouse 依赖项安装 dlt 库:

pip install "dlt[clickhouse]" 

设置指南

1. 初始化 dlt 项目

首先,按照以下步骤初始化一个新的 dlt 项目

dlt init chess clickhouse
注意

此命令将使用 chess 作为源,ClickHouse 作为目标来初始化您的管道。

上述命令生成多个文件和目录,包括 .dlt/secrets.toml 和 ClickHouse 的 requirements 文件。您可以通过执行以下命令来安装 requirements 文件中指定的必要依赖项

pip install -r requirements.txt

或者使用 pip install dlt[clickhouse],它会安装 dlt 库以及使用 ClickHouse 作为目标所需的必要依赖项。

2. 设置 ClickHouse 数据库

要将数据加载到 ClickHouse 中,您需要创建一个 ClickHouse 数据库。以下是您应该执行的操作的大致概述

  1. 您可以使用现有的 ClickHouse 数据库或创建一个新的数据库。

  2. 要创建一个新的数据库,请使用 clickhouse-client 命令行工具或您选择的 SQL 客户端连接到您的 ClickHouse 服务器。

  3. 运行以下 SQL 命令以创建新的数据库、用户并授予必要的权限

CREATE DATABASE IF NOT EXISTS dlt;
CREATE USER dlt IDENTIFIED WITH sha256_password BY 'Dlt*12345789234567';
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON dlt.* TO dlt;
GRANT SELECT ON INFORMATION_SCHEMA.COLUMNS TO dlt;
GRANT CREATE TEMPORARY TABLE, S3 ON *.* TO dlt;

3. 添加凭据

接下来,在 .dlt/secrets.toml 文件中设置 ClickHouse 凭据,如下所示

[destination.clickhouse.credentials]
database = "dlt" # The database name you created
username = "dlt" # ClickHouse username, default is usually "default"
password = "Dlt*12345789234567" # ClickHouse password if any
host = "localhost" # ClickHouse server host
port = 9000 # ClickHouse HTTP port, default is 9000
http_port = 8443 # HTTP Port to connect to ClickHouse server's HTTP interface. Defaults to 8443.
secure = 1 # Set to 1 if using HTTPS, else 0.
dataset_table_separator = "___" # Separator for dataset table names from dataset.
注意

HTTP_PORT http_port 参数指定连接到 ClickHouse 服务器的 HTTP 接口时要使用的端口号。这与用于本地 TCP 协议的默认端口 9000 不同。

如果您没有使用外部暂存(即,您没有在管道中设置暂存参数),则必须设置 http_port。这是因为 dlt 的内置 ClickHouse 本地存储暂存使用 clickhouse connect 库,该库通过 HTTP 与 ClickHouse 通信。

确保您的 ClickHouse 服务器已配置为在 http_port 指定的端口上接受 HTTP 连接。例如,如果您设置 http_port = 8443,则 ClickHouse 应该在端口 8443 上侦听 HTTP 请求。如果您使用外部暂存,则可以省略 http_port 参数,因为在这种情况下不会使用 clickhouse-connect。

您可以传递类似于 clickhouse-driver 库使用的数据库连接字符串。上面的凭据将如下所示

# keep it at the top of your toml file, before any section starts.
destination.clickhouse.credentials="clickhouse://dlt:Dlt*12345789234567@localhost:9000/dlt?secure=1"

写入方式

所有 写入方式 都受支持。

dlt 库中的写入方式定义了如何将数据写入目标。写入方式有三种类型

替换:此方式会将目标中的数据替换为来自资源的数据。它会删除所有类和对象,并在加载数据之前重新创建架构。您可以在 此处 了解更多信息。

合并:此写入方式会将来自资源的数据与目标中的数据合并。对于 合并 方式,您需要为资源指定一个 主键。您可以在 此处 了解更多信息。

追加:这是默认方式。它会将数据追加到目标中的现有数据,忽略 主键 字段。

数据加载

数据使用最有效的方法加载到 ClickHouse 中,具体取决于数据源

  • 对于本地文件,clickhouse-connect 库用于使用 INSERT 命令将文件直接加载到 ClickHouse 表中。
  • 对于远程存储(如 S3Google Cloud StorageAzure Blob Storage)中的文件,ClickHouse 表函数(如 s3、gcs 和 azureBlobStorage)用于读取文件并将数据插入表中。

数据集

Clickhouse 不支持在一个数据库中使用多个数据集,而 dlt 由于多种原因依赖于数据集。为了使 Clickhousedlt 一起工作,dlt 在您的 Clickhouse 数据库中生成的表将以数据集名称作为前缀,并由可配置的 dataset_table_separator 分隔。此外,将创建一个不包含任何数据的特殊哨兵表,允许 dlt 识别哪些虚拟数据集已存在于 Clickhouse 目标中。

支持的文件格式

  • jsonl 是直接加载和暂存的首选格式。
  • parquet 支持直接加载和暂存。

clickhouse 目标与默认的 sql 目标有一些特定的偏差

  1. Clickhouse 具有实验性的 object 数据类型,但我们发现它有点不可预测,因此 dlt clickhouse 目标会将复杂数据类型加载到文本列中。如果您需要此功能,请联系我们的 Slack 社区,我们会考虑添加它。
  2. Clickhouse 不支持 time 数据类型。时间将加载到 text 列中。
  3. Clickhouse 不支持 binary 数据类型。相反,二进制数据将加载到 text 列中。从 jsonl 加载时,二进制数据将是 base64 字符串,从 parquet 加载时,binary 对象将转换为 text
  4. Clickhouse 接受向已填充的表中添加非空列。
  5. 在某些情况下,使用 float 或 double 数据类型时,Clickhouse 会产生舍入误差。如果您无法承受舍入误差,请确保使用 decimal 数据类型。例如,将值 12.7001 加载到使用 jsonl 设置加载器文件格式的 double 列中,将可预测地产生舍入误差。

支持的列提示

ClickHouse 支持以下 列提示

  • primary_key - 将列标记为主键的一部分。多个列可以具有此提示以创建复合主键。

表引擎

默认情况下,表使用 ClickHouse 中的 ReplicatedMergeTree 表引擎创建。您可以使用 clickhouse 适配器的 table_engine_type 指定备用表引擎

from dlt.destinations.adapters import clickhouse_adapter


@dlt.resource()
def my_resource():
...


clickhouse_adapter(my_resource, table_engine_type="merge_tree")

支持的值为

  • merge_tree - 使用 MergeTree 引擎创建表
  • replicated_merge_tree (默认) - 使用 ReplicatedMergeTree 引擎创建表

暂存支持

ClickHouse 支持 Amazon S3、Google Cloud Storage 和 Azure Blob Storage 作为文件暂存目标。

dlt 将将 Parquet 或 JSONL 文件上传到暂存位置,并使用 ClickHouse 表函数直接从暂存文件加载数据。

请参阅文件系统文档以了解如何配置暂存目标的凭据

要运行启用暂存的管道

pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='clickhouse',
staging='filesystem', # add this to activate staging
dataset_name='chess_data'
)

使用 Google Cloud Storage 作为暂存区域

dlt 支持在将数据加载到 ClickHouse 时使用 Google Cloud Storage (GCS) 作为暂存区域。这由 ClickHouse 的 GCS 表函数 自动处理,dlt 在后台使用该函数。

Clickhouse GCS 表函数仅支持使用基于哈希的消息认证码 (HMAC) 密钥进行身份验证。为此,GCS 提供了模拟 Amazon S3 API 的 S3 兼容模式。ClickHouse 利用这一点允许通过其 S3 集成访问 GCS 存储桶。

在 dlt 中设置使用 HMAC 身份验证的 GCS 暂存

  1. 按照 Google Cloud 指南 为您的 GCS 服务帐户创建 HMAC 密钥。

  2. 在您的 dlt 项目的 config.toml 中的 ClickHouse 目标设置中配置 HMAC 密钥以及您的服务帐户的 client_emailproject_idprivate_key

[destination.filesystem]
bucket_url = "gs://dlt-ci"

[destination.filesystem.credentials]
project_id = "a-cool-project"
client_email = "[email protected]"
private_key = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkaslkdjflasjnkdcopauihj...wEiEx7y+mx\nNffxQBqVVej2n/D93xY99pM=\n-----END PRIVATE KEY-----\n"

[destination.clickhouse.credentials]
database = "dlt"
username = "dlt"
password = "Dlt*12345789234567"
host = "localhost"
port = 9440
secure = 1
gcp_access_key_id = "JFJ$$*f2058024835jFffsadf"
gcp_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi"

注意:除了 HMAC 密钥 gcp_access_key_idgcp_secret_access_key 之外,您现在还需要在 [destination.filesystem.credentials] 下提供服务帐户的 client_emailproject_idprivate_key。这是因为 GCS 暂存支持目前作为临时解决方法实现,并且尚未优化。

dlt 将将这些凭据传递给 ClickHouse,ClickHouse 将处理身份验证和 GCS 访问。

目前正在积极开展工作,以简化和改进未来 ClickHouse dlt 目标的 GCS 暂存设置。这些 GitHub 问题中跟踪了正确的 GCS 暂存支持

  • 使文件系统目标 以 s3 兼容模式运行的 gcs 配合使用
  • Google Cloud Storage 暂存区域支持

dbt 支持

通过 dbt-clickhouse 通常支持与 dbt 集成。

dlt 状态同步

此目标完全支持 dlt 状态同步。