将 dlt 连接到 ClickHouse
dlt 是一个开源库,您可以将其添加到您的 Python 脚本中,以将来自各种且通常混乱的数据源的数据加载到结构良好、实时的数据集中。
使用 ClickHouse 安装 dlt
要安装带有 ClickHouse 依赖项的 dlt 库:
pip install "dlt[clickhouse]"
设置指南
1. 初始化 dlt 项目
首先,按如下方式初始化一个新的 dlt 项目
dlt init chess clickhouse
此命令将使用 chess 作为源,ClickHouse 作为目标来初始化您的管道。
上面的命令生成了几个文件和目录,包括 .dlt/secrets.toml 和 ClickHouse 的 requirements 文件。您可以通过执行 requirements 文件来安装文件中指定的必要依赖项,如下所示
pip install -r requirements.txt
或使用 pip install dlt[clickhouse],这将安装 dlt 库以及使用 ClickHouse 作为目标所需的必要依赖项。
2. 设置 ClickHouse 数据库
要将数据加载到 ClickHouse 中,您需要创建一个 ClickHouse 数据库。以下是您应该执行的操作的粗略概述
-
您可以使用现有的 ClickHouse 数据库或创建一个新的数据库。
-
要创建新数据库,请使用
clickhouse-client命令行工具或您选择的 SQL 客户端连接到您的 ClickHouse 服务器。 -
运行以下 SQL 命令以创建新数据库、用户并授予必要的权限
CREATE DATABASE IF NOT EXISTS dlt;
CREATE USER dlt IDENTIFIED WITH sha256_password BY 'Dlt*12345789234567';
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON dlt.* TO dlt;
GRANT SELECT ON INFORMATION_SCHEMA.COLUMNS TO dlt;
GRANT CREATE TEMPORARY TABLE, S3 ON *.* TO dlt;
3. 添加凭据
接下来,在 .dlt/secrets.toml 文件中设置 ClickHouse 凭据,如下所示
[destination.clickhouse.credentials]
database = "dlt" # The database name you created
username = "dlt" # ClickHouse username, default is usually "default"
password = "Dlt*12345789234567" # ClickHouse password if any
host = "localhost" # ClickHouse server host
port = 9000 # ClickHouse HTTP port, default is 9000
http_port = 8443 # HTTP Port to connect to ClickHouse server's HTTP interface. Defaults to 8443.
secure = 1 # Set to 1 if using HTTPS, else 0.
dataset_table_separator = "___" # Separator for dataset table names from dataset.
HTTP_PORT http_port 参数指定连接到 ClickHouse 服务器的 HTTP 接口时要使用的端口号。这与用于原生 TCP 协议的默认端口 9000 不同。
如果您不使用外部暂存(即,您不在管道中设置暂存参数),则必须设置 http_port。这是因为内置的 ClickHouse 本地存储暂存使用 clickhouse content 库,该库通过 HTTP 与 ClickHouse 通信。
确保您的 ClickHouse 服务器配置为接受 http_port 指定的端口上的 HTTP 连接。例如,如果您设置 http_port = 8443,则 ClickHouse 应在端口 8443 上监听 HTTP 请求。如果您使用外部暂存,则可以省略 http_port 参数,因为在这种情况下不会使用 clickhouse-connect。
您可以传递类似于 clickhouse-driver 库使用的数据库连接字符串。上面的凭据将如下所示
# keep it at the top of your toml file, before any section starts.
destination.clickhouse.credentials="clickhouse://dlt:Dlt*12345789234567@localhost:9000/dlt?secure=1"
写入处置
支持所有 写入处置。
dlt 库中的写入处置定义了数据应如何写入目标。有三种类型的写入处置
替换:此处置将目标中的数据替换为来自资源的数据。它删除所有类和对象,并在加载数据之前重新创建架构。您可以在此处了解更多信息。
合并:此写入处置将来自资源的数据与目标位置的数据合并。对于 merge 处置,您需要为资源指定 primary_key。您可以在此处了解更多信息。
追加:这是默认处置。它会将数据追加到目标位置的现有数据中,忽略 primary_key 字段。
数据加载
数据使用最有效的方法加载到 ClickHouse 中,具体取决于数据源
- 对于本地文件,使用
clickhouse-connect库将文件直接加载到使用INSERT命令的 ClickHouse 表中。 - 对于远程存储(如
S3、Google Cloud Storage或Azure Blob Storage)中的文件,ClickHouse 表函数(如 s3、gcs 和 azureBlobStorage)用于读取文件并将数据插入表中。
数据集
Clickhouse 不支持一个数据库中的多个数据集,而 dlt 由于多种原因依赖于数据集。为了使 Clickhouse 与 dlt 一起工作,dlt 在您的 Clickhouse 数据库中生成的表的名称将以数据集名称为前缀,并以可配置的 dataset_table_separator 分隔。此外,将创建一个不包含任何数据的特殊 sentinel 表,允许 dlt 识别 Clickhouse 目标中已存在的虚拟数据集。
支持的文件格式
clickhouse 目标与默认 sql 目标有一些特定的偏差
Clickhouse具有实验性的object数据类型,但我们发现它有点不可预测,因此 dlt clickhouse 目标会将复杂数据类型加载到文本列中。如果您需要此功能,请联系我们的 Slack 社区,我们将考虑添加它。Clickhouse不支持time数据类型。时间将加载到text列。Clickhouse不支持binary数据类型。相反,二进制数据将加载到text列中。从jsonl加载时,二进制数据将是 base64 字符串,从 parquet 加载时,binary对象将转换为text。Clickhouse接受向已填充的表中添加非空列。- 当使用 float 或 double 数据类型时,
Clickhouse在某些条件下可能会产生舍入误差。如果您无法承受舍入误差,请确保使用 decimal 数据类型。例如,将值 12.7001 加载到加载器文件格式设置为jsonl的 double 列中将可预测地产生舍入误差。
支持的列提示
ClickHouse 支持以下列提示
primary_key- 将列标记为主键的一部分。多个列可以具有此提示以创建复合主键。
表引擎
默认情况下,表是使用 ClickHouse 中的 ReplicatedMergeTree 表引擎创建的。您可以使用带有 clickhouse 适配器的 table_engine_type 指定备用表引擎
from dlt.destinations.adapters import clickhouse_adapter
@dlt.resource()
def my_resource():
...
clickhouse_adapter(my_resource, table_engine_type="merge_tree")
支持的值有
merge_tree- 使用MergeTree引擎创建表replicated_merge_tree(默认) - 使用ReplicatedMergeTree引擎创建表
暂存支持
ClickHouse 支持 Amazon S3、Google Cloud Storage 和 Azure Blob Storage 作为文件暂存目标。
dlt 会将 Parquet 或 jsonl 文件上传到暂存位置,并使用 ClickHouse 表函数直接从暂存文件加载数据。
请参阅文件系统文档以了解如何配置暂存目标的凭据
要运行启用暂存的管道
pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='clickhouse',
staging='filesystem', # add this to activate staging
dataset_name='chess_data'
)
使用 Google Cloud Storage 作为暂存区
当将数据加载到 ClickHouse 中时,dlt 支持使用 Google Cloud Storage (GCS) 作为暂存区。这由 ClickHouse 的 GCS 表函数自动处理,dlt 在底层使用该函数。
clickhouse GCS 表函数仅支持使用基于哈希的消息身份验证代码 (HMAC) 密钥进行身份验证。为了启用此功能,GCS 提供了 S3 兼容模式,该模式模拟 Amazon S3 API。ClickHouse 利用此功能允许通过其 S3 集成访问 GCS 存储桶。
要在 dlt 中设置具有 HMAC 身份验证的 GCS 暂存
-
按照Google Cloud 指南,为您的 GCS 服务帐户创建 HMAC 密钥。
-
在您的 dlt 项目的 ClickHouse 目标设置
config.toml中配置 HMAC 密钥以及您的服务帐户的client_email、project_id和private_key
[destination.filesystem]
bucket_url = "gs://dlt-ci"
[destination.filesystem.credentials]
project_id = "a-cool-project"
client_email = "[email protected]"
private_key = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkaslkdjflasjnkdcopauihj...wEiEx7y+mx\nNffxQBqVVej2n/D93xY99pM=\n-----END PRIVATE KEY-----\n"
[destination.clickhouse.credentials]
database = "dlt"
username = "dlt"
password = "Dlt*12345789234567"
host = "localhost"
port = 9440
secure = 1
gcp_access_key_id = "JFJ$$*f2058024835jFffsadf"
gcp_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi"
注意:除了 HMAC 密钥 bashgcp_access_key_id 和 gcp_secret_access_key),您现在还需要在 [destination.filesystem.credentials] 下提供您的服务帐户的 client_email、project_id 和 private_key。这是因为 GCS 暂存支持现在作为临时解决方法实现,并且仍未优化。
dlt 会将这些凭据传递给 ClickHouse,后者将处理身份验证和 GCS 访问。
目前正在积极努力简化和改进未来 ClickHouse dlt 目标的 GCS 暂存设置。正在这些 GitHub 问题中跟踪正确的 GCS 暂存支持
dbt 支持
通常通过 dbt-clickhouse 支持与 dbt 集成。
同步 dlt 状态
此目标完全支持 dlt 状态同步。