将 dlt 连接到 ClickHouse
dlt 是一个开源库,您可以将其添加到您的 Python 脚本中,以将来自各种且通常混乱的数据源的数据加载到结构良好、实时的数据集中。
使用 ClickHouse 安装 dlt
要安装带有 ClickHouse 依赖项的 dlt
库:
pip install "dlt[clickhouse]"
设置指南
1. 初始化 dlt 项目
首先,按如下方式初始化一个新的 dlt
项目
dlt init chess clickhouse
此命令将使用 chess 作为源,ClickHouse 作为目标来初始化您的管道。
上面的命令生成了几个文件和目录,包括 .dlt/secrets.toml
和 ClickHouse 的 requirements 文件。您可以通过执行 requirements 文件来安装文件中指定的必要依赖项,如下所示
pip install -r requirements.txt
或使用 pip install dlt[clickhouse]
,这将安装 dlt
库以及使用 ClickHouse 作为目标所需的必要依赖项。
2. 设置 ClickHouse 数据库
要将数据加载到 ClickHouse 中,您需要创建一个 ClickHouse 数据库。以下是您应该执行的操作的粗略概述
-
您可以使用现有的 ClickHouse 数据库或创建一个新的数据库。
-
要创建新数据库,请使用
clickhouse-client
命令行工具或您选择的 SQL 客户端连接到您的 ClickHouse 服务器。 -
运行以下 SQL 命令以创建新数据库、用户并授予必要的权限
CREATE DATABASE IF NOT EXISTS dlt;
CREATE USER dlt IDENTIFIED WITH sha256_password BY 'Dlt*12345789234567';
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON dlt.* TO dlt;
GRANT SELECT ON INFORMATION_SCHEMA.COLUMNS TO dlt;
GRANT CREATE TEMPORARY TABLE, S3 ON *.* TO dlt;
3. 添加凭据
接下来,在 .dlt/secrets.toml
文件中设置 ClickHouse 凭据,如下所示
[destination.clickhouse.credentials]
database = "dlt" # The database name you created
username = "dlt" # ClickHouse username, default is usually "default"
password = "Dlt*12345789234567" # ClickHouse password if any
host = "localhost" # ClickHouse server host
port = 9000 # ClickHouse HTTP port, default is 9000
http_port = 8443 # HTTP Port to connect to ClickHouse server's HTTP interface. Defaults to 8443.
secure = 1 # Set to 1 if using HTTPS, else 0.
dataset_table_separator = "___" # Separator for dataset table names from dataset.
HTTP_PORT http_port
参数指定连接到 ClickHouse 服务器的 HTTP 接口时要使用的端口号。这与用于原生 TCP 协议的默认端口 9000 不同。
如果您不使用外部暂存(即,您不在管道中设置暂存参数),则必须设置 http_port
。这是因为内置的 ClickHouse 本地存储暂存使用 clickhouse content 库,该库通过 HTTP 与 ClickHouse 通信。
确保您的 ClickHouse 服务器配置为接受 http_port
指定的端口上的 HTTP 连接。例如,如果您设置 http_port = 8443
,则 ClickHouse 应在端口 8443 上监听 HTTP 请求。如果您使用外部暂存,则可以省略 http_port
参数,因为在这种情况下不会使用 clickhouse-connect。
您可以传递类似于 clickhouse-driver
库使用的数据库连接字符串。上面的凭据将如下所示
# keep it at the top of your toml file, before any section starts.
destination.clickhouse.credentials="clickhouse://dlt:Dlt*12345789234567@localhost:9000/dlt?secure=1"
写入处置
支持所有 写入处置。
dlt 库中的写入处置定义了数据应如何写入目标。有三种类型的写入处置
替换:此处置将目标中的数据替换为来自资源的数据。它删除所有类和对象,并在加载数据之前重新创建架构。您可以在此处了解更多信息。
合并:此写入处置将来自资源的数据与目标位置的数据合并。对于 merge
处置,您需要为资源指定 primary_key
。您可以在此处了解更多信息。
追加:这是默认处置。它会将数据追加到目标位置的现有数据中,忽略 primary_key
字段。
数据加载
数据使用最有效的方法加载到 ClickHouse 中,具体取决于数据源
- 对于本地文件,使用
clickhouse-connect
库将文件直接加载到使用INSERT
命令的 ClickHouse 表中。 - 对于远程存储(如
S3
、Google Cloud Storage
或Azure Blob Storage
)中的文件,ClickHouse 表函数(如 s3、gcs 和 azureBlobStorage)用于读取文件并将数据插入表中。
数据集
Clickhouse
不支持一个数据库中的多个数据集,而 dlt
由于多种原因依赖于数据集。为了使 Clickhouse
与 dlt
一起工作,dlt
在您的 Clickhouse
数据库中生成的表的名称将以数据集名称为前缀,并以可配置的 dataset_table_separator
分隔。此外,将创建一个不包含任何数据的特殊 sentinel 表,允许 dlt
识别 Clickhouse
目标中已存在的虚拟数据集。
支持的文件格式
clickhouse
目标与默认 sql 目标有一些特定的偏差
Clickhouse
具有实验性的object
数据类型,但我们发现它有点不可预测,因此 dlt clickhouse 目标会将复杂数据类型加载到文本列中。如果您需要此功能,请联系我们的 Slack 社区,我们将考虑添加它。Clickhouse
不支持time
数据类型。时间将加载到text
列。Clickhouse
不支持binary
数据类型。相反,二进制数据将加载到text
列中。从jsonl
加载时,二进制数据将是 base64 字符串,从 parquet 加载时,binary
对象将转换为text
。Clickhouse
接受向已填充的表中添加非空列。- 当使用 float 或 double 数据类型时,
Clickhouse
在某些条件下可能会产生舍入误差。如果您无法承受舍入误差,请确保使用 decimal 数据类型。例如,将值 12.7001 加载到加载器文件格式设置为jsonl
的 double 列中将可预测地产生舍入误差。
支持的列提示
ClickHouse 支持以下列提示
primary_key
- 将列标记为主键的一部分。多个列可以具有此提示以创建复合主键。
表引擎
默认情况下,表是使用 ClickHouse 中的 ReplicatedMergeTree
表引擎创建的。您可以使用带有 clickhouse 适配器的 table_engine_type
指定备用表引擎
from dlt.destinations.adapters import clickhouse_adapter
@dlt.resource()
def my_resource():
...
clickhouse_adapter(my_resource, table_engine_type="merge_tree")
支持的值有
merge_tree
- 使用MergeTree
引擎创建表replicated_merge_tree
(默认) - 使用ReplicatedMergeTree
引擎创建表
暂存支持
ClickHouse 支持 Amazon S3、Google Cloud Storage 和 Azure Blob Storage 作为文件暂存目标。
dlt
会将 Parquet 或 jsonl 文件上传到暂存位置,并使用 ClickHouse 表函数直接从暂存文件加载数据。
请参阅文件系统文档以了解如何配置暂存目标的凭据
要运行启用暂存的管道
pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='clickhouse',
staging='filesystem', # add this to activate staging
dataset_name='chess_data'
)
使用 Google Cloud Storage 作为暂存区
当将数据加载到 ClickHouse 中时,dlt 支持使用 Google Cloud Storage (GCS) 作为暂存区。这由 ClickHouse 的 GCS 表函数自动处理,dlt 在底层使用该函数。
clickhouse GCS 表函数仅支持使用基于哈希的消息身份验证代码 (HMAC) 密钥进行身份验证。为了启用此功能,GCS 提供了 S3 兼容模式,该模式模拟 Amazon S3 API。ClickHouse 利用此功能允许通过其 S3 集成访问 GCS 存储桶。
要在 dlt 中设置具有 HMAC 身份验证的 GCS 暂存
-
按照Google Cloud 指南,为您的 GCS 服务帐户创建 HMAC 密钥。
-
在您的 dlt 项目的 ClickHouse 目标设置
config.toml
中配置 HMAC 密钥以及您的服务帐户的client_email
、project_id
和private_key
[destination.filesystem]
bucket_url = "gs://dlt-ci"
[destination.filesystem.credentials]
project_id = "a-cool-project"
client_email = "[email protected]"
private_key = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkaslkdjflasjnkdcopauihj...wEiEx7y+mx\nNffxQBqVVej2n/D93xY99pM=\n-----END PRIVATE KEY-----\n"
[destination.clickhouse.credentials]
database = "dlt"
username = "dlt"
password = "Dlt*12345789234567"
host = "localhost"
port = 9440
secure = 1
gcp_access_key_id = "JFJ$$*f2058024835jFffsadf"
gcp_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi"
注意:除了 HMAC 密钥 bashgcp_access_key_id
和 gcp_secret_access_key
),您现在还需要在 [destination.filesystem.credentials]
下提供您的服务帐户的 client_email
、project_id
和 private_key
。这是因为 GCS 暂存支持现在作为临时解决方法实现,并且仍未优化。
dlt 会将这些凭据传递给 ClickHouse,后者将处理身份验证和 GCS 访问。
目前正在积极努力简化和改进未来 ClickHouse dlt 目标的 GCS 暂存设置。正在这些 GitHub 问题中跟踪正确的 GCS 暂存支持
dbt 支持
通常通过 dbt-clickhouse 支持与 dbt 集成。
同步 dlt
状态
此目标完全支持 dlt 状态同步。