跳到主要内容
跳到主要内容

将 dlt 连接到 ClickHouse

dlt 是一个开源库,您可以将其添加到您的 Python 脚本中,以将来自各种且通常混乱的数据源的数据加载到结构良好、实时的数据集中。

使用 ClickHouse 安装 dlt

要安装带有 ClickHouse 依赖项的 dlt 库:

pip install "dlt[clickhouse]" 

设置指南

1. 初始化 dlt 项目

首先,按如下方式初始化一个新的 dlt 项目

dlt init chess clickhouse
注意

此命令将使用 chess 作为源,ClickHouse 作为目标来初始化您的管道。

上面的命令生成了几个文件和目录,包括 .dlt/secrets.toml 和 ClickHouse 的 requirements 文件。您可以通过执行 requirements 文件来安装文件中指定的必要依赖项,如下所示

pip install -r requirements.txt

或使用 pip install dlt[clickhouse],这将安装 dlt 库以及使用 ClickHouse 作为目标所需的必要依赖项。

2. 设置 ClickHouse 数据库

要将数据加载到 ClickHouse 中,您需要创建一个 ClickHouse 数据库。以下是您应该执行的操作的粗略概述

  1. 您可以使用现有的 ClickHouse 数据库或创建一个新的数据库。

  2. 要创建新数据库,请使用 clickhouse-client 命令行工具或您选择的 SQL 客户端连接到您的 ClickHouse 服务器。

  3. 运行以下 SQL 命令以创建新数据库、用户并授予必要的权限

CREATE DATABASE IF NOT EXISTS dlt;
CREATE USER dlt IDENTIFIED WITH sha256_password BY 'Dlt*12345789234567';
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON dlt.* TO dlt;
GRANT SELECT ON INFORMATION_SCHEMA.COLUMNS TO dlt;
GRANT CREATE TEMPORARY TABLE, S3 ON *.* TO dlt;

3. 添加凭据

接下来,在 .dlt/secrets.toml 文件中设置 ClickHouse 凭据,如下所示

[destination.clickhouse.credentials]
database = "dlt" # The database name you created
username = "dlt" # ClickHouse username, default is usually "default"
password = "Dlt*12345789234567" # ClickHouse password if any
host = "localhost" # ClickHouse server host
port = 9000 # ClickHouse HTTP port, default is 9000
http_port = 8443 # HTTP Port to connect to ClickHouse server's HTTP interface. Defaults to 8443.
secure = 1 # Set to 1 if using HTTPS, else 0.
dataset_table_separator = "___" # Separator for dataset table names from dataset.
注意

HTTP_PORT http_port 参数指定连接到 ClickHouse 服务器的 HTTP 接口时要使用的端口号。这与用于原生 TCP 协议的默认端口 9000 不同。

如果您不使用外部暂存(即,您不在管道中设置暂存参数),则必须设置 http_port。这是因为内置的 ClickHouse 本地存储暂存使用 clickhouse content 库,该库通过 HTTP 与 ClickHouse 通信。

确保您的 ClickHouse 服务器配置为接受 http_port 指定的端口上的 HTTP 连接。例如,如果您设置 http_port = 8443,则 ClickHouse 应在端口 8443 上监听 HTTP 请求。如果您使用外部暂存,则可以省略 http_port 参数,因为在这种情况下不会使用 clickhouse-connect。

您可以传递类似于 clickhouse-driver 库使用的数据库连接字符串。上面的凭据将如下所示

# keep it at the top of your toml file, before any section starts.
destination.clickhouse.credentials="clickhouse://dlt:Dlt*12345789234567@localhost:9000/dlt?secure=1"

写入处置

支持所有 写入处置

dlt 库中的写入处置定义了数据应如何写入目标。有三种类型的写入处置

替换:此处置将目标中的数据替换为来自资源的数据。它删除所有类和对象,并在加载数据之前重新创建架构。您可以在此处了解更多信息。

合并:此写入处置将来自资源的数据与目标位置的数据合并。对于 merge 处置,您需要为资源指定 primary_key。您可以在此处了解更多信息。

追加:这是默认处置。它会将数据追加到目标位置的现有数据中,忽略 primary_key 字段。

数据加载

数据使用最有效的方法加载到 ClickHouse 中,具体取决于数据源

  • 对于本地文件,使用 clickhouse-connect 库将文件直接加载到使用 INSERT 命令的 ClickHouse 表中。
  • 对于远程存储(如 S3 Google Cloud StorageAzure Blob Storage)中的文件,ClickHouse 表函数(如 s3、gcs 和 azureBlobStorage)用于读取文件并将数据插入表中。

数据集

Clickhouse 不支持一个数据库中的多个数据集,而 dlt 由于多种原因依赖于数据集。为了使 Clickhousedlt 一起工作,dlt 在您的 Clickhouse 数据库中生成的表的名称将以数据集名称为前缀,并以可配置的 dataset_table_separator 分隔。此外,将创建一个不包含任何数据的特殊 sentinel 表,允许 dlt 识别 Clickhouse 目标中已存在的虚拟数据集。

支持的文件格式

  • jsonl 是直接加载和暂存的首选格式。
  • parquet 支持直接加载和暂存。

clickhouse 目标与默认 sql 目标有一些特定的偏差

  1. Clickhouse 具有实验性的 object 数据类型,但我们发现它有点不可预测,因此 dlt clickhouse 目标会将复杂数据类型加载到文本列中。如果您需要此功能,请联系我们的 Slack 社区,我们将考虑添加它。
  2. Clickhouse 不支持 time 数据类型。时间将加载到 text 列。
  3. Clickhouse 不支持 binary 数据类型。相反,二进制数据将加载到 text 列中。从 jsonl 加载时,二进制数据将是 base64 字符串,从 parquet 加载时,binary 对象将转换为 text
  4. Clickhouse 接受向已填充的表中添加非空列。
  5. 当使用 float 或 double 数据类型时,Clickhouse 在某些条件下可能会产生舍入误差。如果您无法承受舍入误差,请确保使用 decimal 数据类型。例如,将值 12.7001 加载到加载器文件格式设置为 jsonl 的 double 列中将可预测地产生舍入误差。

支持的列提示

ClickHouse 支持以下列提示

  • primary_key - 将列标记为主键的一部分。多个列可以具有此提示以创建复合主键。

表引擎

默认情况下,表是使用 ClickHouse 中的 ReplicatedMergeTree 表引擎创建的。您可以使用带有 clickhouse 适配器的 table_engine_type 指定备用表引擎

from dlt.destinations.adapters import clickhouse_adapter


@dlt.resource()
def my_resource():
...


clickhouse_adapter(my_resource, table_engine_type="merge_tree")

支持的值有

  • merge_tree - 使用 MergeTree 引擎创建表
  • replicated_merge_tree (默认) - 使用 ReplicatedMergeTree 引擎创建表

暂存支持

ClickHouse 支持 Amazon S3、Google Cloud Storage 和 Azure Blob Storage 作为文件暂存目标。

dlt 会将 Parquet 或 jsonl 文件上传到暂存位置,并使用 ClickHouse 表函数直接从暂存文件加载数据。

请参阅文件系统文档以了解如何配置暂存目标的凭据

要运行启用暂存的管道

pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='clickhouse',
staging='filesystem', # add this to activate staging
dataset_name='chess_data'
)

使用 Google Cloud Storage 作为暂存区

当将数据加载到 ClickHouse 中时,dlt 支持使用 Google Cloud Storage (GCS) 作为暂存区。这由 ClickHouse 的 GCS 表函数自动处理,dlt 在底层使用该函数。

clickhouse GCS 表函数仅支持使用基于哈希的消息身份验证代码 (HMAC) 密钥进行身份验证。为了启用此功能,GCS 提供了 S3 兼容模式,该模式模拟 Amazon S3 API。ClickHouse 利用此功能允许通过其 S3 集成访问 GCS 存储桶。

要在 dlt 中设置具有 HMAC 身份验证的 GCS 暂存

  1. 按照Google Cloud 指南,为您的 GCS 服务帐户创建 HMAC 密钥。

  2. 在您的 dlt 项目的 ClickHouse 目标设置 config.toml 中配置 HMAC 密钥以及您的服务帐户的 client_emailproject_idprivate_key

[destination.filesystem]
bucket_url = "gs://dlt-ci"

[destination.filesystem.credentials]
project_id = "a-cool-project"
client_email = "[email protected]"
private_key = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkaslkdjflasjnkdcopauihj...wEiEx7y+mx\nNffxQBqVVej2n/D93xY99pM=\n-----END PRIVATE KEY-----\n"

[destination.clickhouse.credentials]
database = "dlt"
username = "dlt"
password = "Dlt*12345789234567"
host = "localhost"
port = 9440
secure = 1
gcp_access_key_id = "JFJ$$*f2058024835jFffsadf"
gcp_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi"

注意:除了 HMAC 密钥 bashgcp_access_key_idgcp_secret_access_key),您现在还需要在 [destination.filesystem.credentials] 下提供您的服务帐户的 client_emailproject_idprivate_key。这是因为 GCS 暂存支持现在作为临时解决方法实现,并且仍未优化。

dlt 会将这些凭据传递给 ClickHouse,后者将处理身份验证和 GCS 访问。

目前正在积极努力简化和改进未来 ClickHouse dlt 目标的 GCS 暂存设置。正在这些 GitHub 问题中跟踪正确的 GCS 暂存支持

  • 使文件系统目标 工作 在 s3 兼容模式下使用 gcs
  • Google Cloud Storage 暂存区支持

dbt 支持

通常通过 dbt-clickhouse 支持与 dbt 集成。

同步 dlt 状态

此目标完全支持 dlt 状态同步。