将 S3 与 ClickHouse 集成
您可以将数据从 S3 插入 ClickHouse,也可以使用 S3 作为导出目的地,从而允许与“数据湖”架构进行交互。此外,S3 可以提供“冷”存储层级并帮助分离存储和计算。在下面的部分中,我们使用纽约市出租车数据集来演示在 S3 和 ClickHouse 之间移动数据的过程,以及识别关键配置参数并提供优化性能的提示。
S3 表函数
s3
表函数允许您从 S3 兼容存储读取和写入文件。此语法的概述是
s3(path, [aws_access_key_id, aws_secret_access_key,] [format, [structure, [compression]]])
其中
- path — 带有指向文件的路径的存储桶 URL。这在只读模式下支持以下通配符:
*
、?
、{abc,def}
和{N..M}
,其中N
、M
是数字,'abc'
、'def'
是字符串。有关更多信息,请参见有关 在路径中使用通配符 的文档。 - format — 文件的 格式。
- structure — 表的结构。格式
'column1_name column1_type, column2_name column2_type, ...'
。 - compression — 参数为可选。支持的值:
none
、gzip/gz
、brotli/br
、xz/LZMA
、zstd/zst
。默认情况下,它将根据文件扩展名自动检测压缩。
在路径表达式中使用通配符允许引用多个文件,并为并行打开大门。
准备
为了与基于 S3 的数据集进行交互,我们准备了一个标准的 MergeTree
表作为我们的目标。下面的语句在默认数据库中创建了一个名为 trips
的表
CREATE TABLE trips
(
`trip_id` UInt32,
`vendor_id` Enum8('1' = 1, '2' = 2, '3' = 3, '4' = 4, 'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14, '' = 15),
`pickup_date` Date,
`pickup_datetime` DateTime,
`dropoff_date` Date,
`dropoff_datetime` DateTime,
`store_and_fwd_flag` UInt8,
`rate_code_id` UInt8,
`pickup_longitude` Float64,
`pickup_latitude` Float64,
`dropoff_longitude` Float64,
`dropoff_latitude` Float64,
`passenger_count` UInt8,
`trip_distance` Float64,
`fare_amount` Float32,
`extra` Float32,
`mta_tax` Float32,
`tip_amount` Float32,
`tolls_amount` Float32,
`ehail_fee` Float32,
`improvement_surcharge` Float32,
`total_amount` Float32,
`payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
`trip_type` UInt8,
`pickup` FixedString(25),
`dropoff` FixedString(25),
`cab_type` Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
`pickup_nyct2010_gid` Int8,
`pickup_ctlabel` Float32,
`pickup_borocode` Int8,
`pickup_ct2010` String,
`pickup_boroct2010` String,
`pickup_cdeligibil` String,
`pickup_ntacode` FixedString(4),
`pickup_ntaname` String,
`pickup_puma` UInt16,
`dropoff_nyct2010_gid` UInt8,
`dropoff_ctlabel` Float32,
`dropoff_borocode` UInt8,
`dropoff_ct2010` String,
`dropoff_boroct2010` String,
`dropoff_cdeligibil` String,
`dropoff_ntacode` FixedString(4),
`dropoff_ntaname` String,
`dropoff_puma` UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime
SETTINGS index_granularity = 8192
请注意 分区 在 pickup_date
字段上的使用。通常分区键用于数据管理,但稍后我们将使用此键来并行写入 S3。
我们出租车数据集中的每个条目都包含一次出租车行程。此匿名数据包含 2000 万条记录,压缩在 S3 存储桶 https://datasets-documentation.s3.eu-west-3.amazonaws.com/ 中,位于 nyc-taxi 文件夹下。数据采用 TSV 格式,每个文件约有 100 万行。
从 S3 读取数据
我们可以将 S3 数据作为源进行查询,而无需在 ClickHouse 中进行持久化。在下面的查询中,我们对 10 行进行了采样。请注意这里没有凭据,因为存储桶是公开可访问的
SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames')
LIMIT 10;
请注意,我们不需要列出列,因为 TabSeparatedWithNames
格式在第一行中编码了列名。其他格式(如 CSV
或 TSV
)将为此查询返回自动生成的列,例如 c1
、c2
、c3
等。
查询还支持 虚拟列,例如 _path
和 _file
,它们分别提供有关存储桶路径和文件名的信息。例如
SELECT _path, _file, trip_id
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_0.gz', 'TabSeparatedWithNames')
LIMIT 5;
┌─_path──────────────────────────────────────┬─_file──────┬────trip_id─┐
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999902 │
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999919 │
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999944 │
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999969 │
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999990 │
└────────────────────────────────────────────┴────────────┴────────────┘
确认此样本数据集中的行数。请注意使用通配符进行文件扩展,因此我们考虑所有 20 个文件。此查询将花费约 10 秒,具体取决于 ClickHouse 实例上的核心数
SELECT count() AS count
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames');
┌────count─┐
│ 20000000 │
└──────────┘
虽然它对采样数据和执行临时查询和探索性查询非常有用,但直接从 S3 读取数据并非您想定期执行的操作。当需要认真处理时,请将数据导入 ClickHouse 中的 MergeTree
表。
使用 clickhouse-local
clickhouse-local
程序使您能够在本地文件上执行快速处理,而无需部署和配置 ClickHouse 服务器。任何使用 s3
表函数的查询都可以使用此实用程序执行。例如
clickhouse-local --query "SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames') LIMIT 10"
从 S3 插入数据
为了利用 ClickHouse 的全部功能,我们接下来读取数据并将数据插入到我们的实例中。我们结合了 s3
函数和一个简单的 INSERT
语句来实现此目的。请注意,我们不需要列出我们的列,因为我们的目标表提供了所需的结构。这要求列按表 DDL 语句中指定的顺序出现:列根据它们在 SELECT
子句中的位置进行映射。所有 1000 万行的插入可能需要几分钟,具体取决于 ClickHouse 实例。下面我们插入 100 万行以确保快速响应。根据需要调整 LIMIT
子句或列选择以导入子集
INSERT INTO trips
SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames')
LIMIT 1000000;
使用 ClickHouse Local 进行远程插入
如果网络安全策略阻止您的 ClickHouse 集群建立出站连接,您可以使用 clickhouse-local
插入 S3 数据。在下面的示例中,我们从 S3 存储桶中读取数据并使用 remote
函数插入 ClickHouse
clickhouse-local --query "INSERT INTO TABLE FUNCTION remote('localhost:9000', 'default.trips', 'username', 'password') (*) SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames') LIMIT 10"
要通过安全的 SSL 连接执行此操作,请使用 remoteSecure
函数。
导出数据
您可以使用 s3
表函数写入 S3 中的文件。这将需要适当的权限。我们在请求中传递了所需的凭据,但请查看 管理凭据 页面以获取更多选项。
在下面的简单示例中,我们将表函数用作目标而不是源。这里,我们从 trips
表中流式传输 10,000 行到存储桶,指定 lz4
压缩和 CSV
的输出类型
INSERT INTO FUNCTION
s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/csv/trips.csv.lz4',
's3_key',
's3_secret',
'CSV'
)
SELECT *
FROM trips
LIMIT 10000;
请注意,文件格式是如何从扩展名推断出来的。我们也不需要在 s3
函数中指定列 - 这可以从 SELECT
中推断出来。
拆分大型文件
您可能不希望将数据导出为单个文件。包括 ClickHouse 在内的多数工具在读取和写入多个文件时将实现更高的吞吐量性能,因为存在并行的可能性。我们可以多次执行 INSERT
命令,以定位数据的子集。ClickHouse 提供了一种使用 PARTITION
键自动拆分文件的方法。
在下面的示例中,我们使用 rand()
函数的模数创建了 10 个文件。请注意,生成的 partition ID 是如何引用在文件名中的。这将导致 10 个带有数字后缀的文件,例如 trips_0.csv.lz4
、trips_1.csv.lz4
等...
INSERT INTO FUNCTION
s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/csv/trips_{_partition_id}.csv.lz4',
's3_key',
's3_secret',
'CSV'
)
PARTITION BY rand() % 10
SELECT *
FROM trips
LIMIT 100000;
或者,我们可以引用数据中的一个字段。对于此数据集,payment_type
提供了一个基数为 5 的自然分区键。
INSERT INTO FUNCTION
s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/csv/trips_{_partition_id}.csv.lz4',
's3_key',
's3_secret',
'CSV'
)
PARTITION BY payment_type
SELECT *
FROM trips
LIMIT 100000;
利用集群
上述所有功能都局限于在单个节点上执行。读取速度将随着 CPU 核心的线性增加,直到其他资源(通常是网络)饱和,从而允许用户进行垂直扩展。但是,这种方法有其局限性。虽然用户可以通过在执行 INSERT INTO SELECT
查询时插入到分布式表中来减轻一些资源压力,但这仍然使单个节点读取、解析和处理数据。为了解决这一挑战并允许我们进行水平扩展读取,我们有 s3Cluster 函数。
接收查询的节点(称为发起者)将创建与集群中每个节点的连接。确定哪些文件需要读取的 glob 模式将解析为一组文件。发起者将文件分发给集群中的节点,这些节点充当工作器。然后,这些工作器会按照它们完成读取的顺序请求要处理的文件。此过程确保我们可以进行水平扩展读取。
s3Cluster
函数采用与单节点变体相同的格式,只是需要一个目标集群来表示工作器节点
s3Cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure)
cluster_name
— 用于构建一组地址和连接参数以连接到远程和本地服务器的集群名称。source
— 文件或一组文件的 URL。在只读模式下支持以下通配符:*、?、{'abc','def'} 和 {N..M},其中 N、M 为数字,'abc'、'def' 为字符串。有关更多信息,请参见 路径中的通配符。access_key_id
和secret_access_key
— 用于指定与给定端点一起使用的凭据的密钥。可选。format
— 文件的 格式。structure
— 表的结构。格式为 'column1_name column1_type, column2_name column2_type, ...'。
与所有 s3
函数一样,如果存储桶不安全或通过环境定义了安全性(例如,IAM 角色),则凭据是可选的。但是,与 s3 函数不同的是,结构必须在请求中指定(截至 22.3.1 版),即不会推断出模式。
在大多数情况下,此函数将用作 INSERT INTO SELECT
的一部分。在这种情况下,您通常会插入一个分布式表。我们将在下面说明一个简单的示例,其中 trips_all 是一个分布式表。虽然此表使用 events 集群,但用于读写节点的一致性不是必需的。
INSERT INTO default.trips_all
SELECT *
FROM s3Cluster(
'events',
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz',
'TabSeparatedWithNames'
)
插入将针对发起节点进行。这意味着,虽然读取将在每个节点上发生,但结果行将被路由到发起节点以进行分发。在高吞吐量场景中,这可能会成为瓶颈。要解决此问题,请为 s3cluster
函数设置参数 parallel_distributed_insert_select。
S3 表引擎
虽然 s3
函数允许对存储在 S3 中的数据执行 ad-hoc 查询,但它们的语法很冗长。S3
表引擎允许您不必反复指定存储桶 URL 和凭据。为了解决这个问题,ClickHouse 提供了 S3 表引擎。
CREATE TABLE s3_engine_table (name String, value UInt32)
ENGINE = S3(path, [aws_access_key_id, aws_secret_access_key,] format, [compression])
[SETTINGS ...]
path
— 带有文件路径的存储桶 URL。在只读模式下支持以下通配符:*、?、{abc,def} 和 {N..M},其中 N、M 为数字,'abc'、'def' 为字符串。有关更多信息,请参见 此处。format
— 文件的 格式。aws_access_key_id
、aws_secret_access_key
- AWS 帐户用户的长期凭据。您可以使用它们来验证您的请求。该参数是可选的。如果未指定凭据,将使用配置文件值。有关更多信息,请参见 管理凭据。compression
— 压缩类型。支持的值:none、gzip/gz、brotli/br、xz/LZMA、zstd/zst。该参数是可选的。默认情况下,它将通过文件扩展名自动检测压缩。
读取数据
在以下示例中,我们使用位于 https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/
存储桶中的前十个 TSV 文件创建一个名为 trips_raw
的表。其中每个文件都包含 100 万行数据。
CREATE TABLE trips_raw
(
`trip_id` UInt32,
`vendor_id` Enum8('1' = 1, '2' = 2, '3' = 3, '4' = 4, 'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14, '' = 15),
`pickup_date` Date,
`pickup_datetime` DateTime,
`dropoff_date` Date,
`dropoff_datetime` DateTime,
`store_and_fwd_flag` UInt8,
`rate_code_id` UInt8,
`pickup_longitude` Float64,
`pickup_latitude` Float64,
`dropoff_longitude` Float64,
`dropoff_latitude` Float64,
`passenger_count` UInt8,
`trip_distance` Float64,
`fare_amount` Float32,
`extra` Float32,
`mta_tax` Float32,
`tip_amount` Float32,
`tolls_amount` Float32,
`ehail_fee` Float32,
`improvement_surcharge` Float32,
`total_amount` Float32,
`payment_type_` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
`trip_type` UInt8,
`pickup` FixedString(25),
`dropoff` FixedString(25),
`cab_type` Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
`pickup_nyct2010_gid` Int8,
`pickup_ctlabel` Float32,
`pickup_borocode` Int8,
`pickup_ct2010` String,
`pickup_boroct2010` FixedString(7),
`pickup_cdeligibil` String,
`pickup_ntacode` FixedString(4),
`pickup_ntaname` String,
`pickup_puma` UInt16,
`dropoff_nyct2010_gid` UInt8,
`dropoff_ctlabel` Float32,
`dropoff_borocode` UInt8,
`dropoff_ct2010` String,
`dropoff_boroct2010` FixedString(7),
`dropoff_cdeligibil` String,
`dropoff_ntacode` FixedString(4),
`dropoff_ntaname` String,
`dropoff_puma` UInt16
) ENGINE = S3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{0..9}.gz', 'TabSeparatedWithNames', 'gzip');
请注意使用 {0..9} 模式限制为前十个文件。创建后,我们可以像其他任何表一样查询此表。
SELECT DISTINCT(pickup_ntaname)
FROM trips_raw
LIMIT 10;
┌─pickup_ntaname───────────────────────────────────┐
│ Lenox Hill-Roosevelt Island │
│ Airport │
│ SoHo-TriBeCa-Civic Center-Little Italy │
│ West Village │
│ Chinatown │
│ Hudson Yards-Chelsea-Flatiron-Union Square │
│ Turtle Bay-East Midtown │
│ Upper West Side │
│ Murray Hill-Kips Bay │
│ DUMBO-Vinegar Hill-Downtown Brooklyn-Boerum Hill │
└──────────────────────────────────────────────────┘
插入数据
S3
表引擎支持并行读取。仅当表定义不包含通配符模式时才支持写入。因此,上面的表将阻止写入。
为了演示写入,请创建一个指向可写 S3 存储桶的表。
CREATE TABLE trips_dest
(
`trip_id` UInt32,
`pickup_date` Date,
`pickup_datetime` DateTime,
`dropoff_datetime` DateTime,
`tip_amount` Float32,
`total_amount` Float32
) ENGINE = S3('<bucket path>/trips.bin', 'Native');
INSERT INTO trips_dest
SELECT
trip_id,
pickup_date,
pickup_datetime,
dropoff_datetime,
tip_amount,
total_amount
FROM trips
LIMIT 10;
SELECT * FROM trips_dest LIMIT 5;
┌────trip_id─┬─pickup_date─┬─────pickup_datetime─┬────dropoff_datetime─┬─tip_amount─┬─total_amount─┐
│ 1200018648 │ 2015-07-01 │ 2015-07-01 00:00:16 │ 2015-07-01 00:02:57 │ 0 │ 7.3 │
│ 1201452450 │ 2015-07-01 │ 2015-07-01 00:00:20 │ 2015-07-01 00:11:07 │ 1.96 │ 11.76 │
│ 1202368372 │ 2015-07-01 │ 2015-07-01 00:00:40 │ 2015-07-01 00:05:46 │ 0 │ 7.3 │
│ 1200831168 │ 2015-07-01 │ 2015-07-01 00:01:06 │ 2015-07-01 00:09:23 │ 2 │ 12.3 │
│ 1201362116 │ 2015-07-01 │ 2015-07-01 00:01:07 │ 2015-07-01 00:03:31 │ 0 │ 5.3 │
└────────────┴─────────────┴─────────────────────┴─────────────────────┴────────────┴──────────────┘
请注意,只能将行插入新文件。没有合并周期或文件拆分操作。写入文件后,后续插入将失败。用户在这里有两个选项
- 指定设置
s3_create_new_file_on_insert=1
。这将导致在每次插入时创建新文件。数字后缀将附加到每个文件的末尾,对于每个插入操作,数字后缀将单调递增。对于上面的示例,后续插入将导致创建 trips_1.bin 文件。 - 指定设置
s3_truncate_on_insert=1
。这将导致文件截断,即,完成操作后,它将仅包含新插入的行。
这两个设置的默认值为 0,因此强制用户设置其中一个。如果同时设置了 s3_truncate_on_insert
,则它将优先。
关于 S3
表引擎的一些说明。
- 与传统的
MergeTree
族表不同,删除S3
表不会删除底层数据。 - 此表类型的完整设置可以在 此处找到。
- 在使用此引擎时,请注意以下注意事项。
- 不支持 ALTER 查询。
- 不支持 SAMPLE 操作。
- 没有索引的概念,即主索引或跳过索引。
管理凭据
在前面的示例中,我们在 s3
函数或 S3
表定义中传递了凭据。虽然这对于偶尔使用是可以接受的,但用户在生产中需要不太显式的身份验证机制。为了解决这个问题,ClickHouse 有几个选项。
在 config.xml 或 conf.d 下的等效配置文件中指定连接详细信息。假设使用 debian 包进行安装,下面显示了示例文件的内容。
ubuntu@single-node-clickhouse:/etc/clickhouse-server/config.d$ cat s3.xml
<clickhouse>
<s3>
<endpoint-name>
<endpoint>https://dalem-files.s3.amazonaws.com/test/</endpoint>
<access_key_id>key</access_key_id>
<secret_access_key>secret</secret_access_key>
<!-- <use_environment_credentials>false</use_environment_credentials> -->
<!-- <header>Authorization: Bearer SOME-TOKEN</header> -->
</endpoint-name>
</s3>
</clickhouse>这些凭据将用于任何请求,其中上面的端点是请求 URL 的精确前缀匹配。此外,请注意在此示例中能够声明授权标头作为访问密钥和密钥的替代方案。支持的设置的完整列表可以在 此处找到。
上面的示例强调了配置参数
use_environment_credentials
的可用性。此配置参数也可以在s3
级别全局设置。<clickhouse>
<s3>
<use_environment_credentials>true</use_environment_credentials>
</s3>
</clickhouse>此设置会开启尝试从环境中检索 S3 凭据,从而允许通过 IAM 角色进行访问。具体来说,将执行以下检索顺序。
- 查找环境变量
AWS_ACCESS_KEY_ID
、AWS_SECRET_ACCESS_KEY
和AWS_SESSION_TOKEN
。 - 在 $HOME/.aws 中执行检查。
- 通过 AWS 安全令牌服务获取临时凭据 - 即通过 AssumeRole API。
- 检查 ECS 环境变量
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
或AWS_CONTAINER_CREDENTIALS_FULL_URI
和AWS_ECS_CONTAINER_AUTHORIZATION_TOKEN
中的凭据。 - 通过 Amazon EC2 实例元数据 获取凭据,前提是 AWS_EC2_METADATA_DISABLED 未设置为 true。
- 这些相同的设置也可以使用相同的前缀匹配规则为特定端点设置。
- 查找环境变量
优化性能
此页面不适用于 ClickHouse Cloud。此处记录的步骤仅在自管理的 ClickHouse 部署中才需要。
衡量性能
在进行任何更改以提高性能之前,请确保您进行适当的衡量。由于 S3 API 调用对延迟敏感,并且可能会影响客户端计时,因此请使用查询日志作为性能指标,即 system.query_log
。
如果衡量 SELECT
查询的性能,其中大量数据被返回给客户端,则可以使用 null 格式 进行查询,或将结果直接发送到 Null 引擎。这应该可以避免客户端被数据淹没以及网络饱和。
区域本地性
确保您的存储桶位于与您的 ClickHouse 实例相同的区域中。此简单的优化可以显着提高吞吐量性能,尤其是在您将 ClickHouse 实例部署在 AWS 基础架构上时。
使用线程
S3 上的读取性能将随着内核数量线性扩展,前提是您没有受到网络带宽或本地 IO 的限制。增加线程数还具有用户应该注意的内存开销排列。可以修改以下内容以潜在地提高吞吐量性能。
- 通常,
max_threads
的默认值就足够了,即内核数量。如果查询使用的内存量很高,并且需要减少此内存量,或者结果上的 LIMIT 很低,则可以将此值设置得更低。拥有大量内存的用户可能希望尝试增加此值以获得从 S3 获得更高的读取吞吐量。通常,这仅对内核数量较少的机器才有利,即<10. 从进一步并行化中获得的收益通常会随着其他资源充当瓶颈而减少,例如网络。 - 如果执行
INSERT INTO x SELECT
请求,请注意,线程数量将设置为 1,这由设置 max_insert_threads 决定。如果max_threads
大于 1(使用SELECT * FROM system.settings WHERE name='max_threads'
确认),则增加此值将以内存为代价提高插入性能。由于内存消耗开销,请谨慎增加。此值不应与max_threads
一样高,因为资源会消耗在后台合并上。此外,并非所有目标引擎(MergeTree 除外)都支持并行插入。最后,并行插入总是会导致更多部分,从而减慢后续读取速度。请谨慎增加。 - 对于低内存场景,请考虑在插入到 S3 时降低
max_insert_delayed_streams_for_parallel_write
。 - 22.3.1 之前的 ClickHouse 版本仅在使用
s3
函数或S3
表引擎时,才会在多个文件之间并行化读取。这要求用户确保文件在 S3 上被拆分成块,并使用通配符模式进行读取,以实现最佳读取性能。更新的版本现在将在单个文件内并行化下载。 - 假设有足够的内存(测试!),增加 min_insert_block_size_rows 可以提高插入吞吐量。
- 在低线程数场景中,用户可能会受益于将
remote_filesystem_read_method
设置为 "read",以使从 S3 同步读取文件。
格式
ClickHouse 可以使用 s3
函数和 S3
引擎以 支持的格式 读取存储在 S3 存储桶中的文件。如果读取原始文件,则这些格式中的一些具有明显的优势。
- 具有编码列名的格式(如 Native、Parquet、CSVWithNames 和 TabSeparatedWithNames)在查询时将不那么冗长,因为用户不需要在
s3
函数中指定列名。列名允许推断此信息。 - 格式在读取和写入吞吐量方面性能不同。Native 和 parquet 代表读取性能最优的格式,因为它们已经是面向列的,并且更紧凑。Native 格式还受益于与 ClickHouse 在内存中存储数据的对齐方式 - 因此在数据流式传输到 ClickHouse 时减少了处理开销。
- 块大小通常会影响大型文件的读取延迟。如果您只对数据进行采样(例如,返回前 N 行),则这一点非常明显。对于 CSV 和 TSV 等格式,必须解析文件才能返回一组行。Native 和 Parquet 等格式将允许更快地采样,原因是此结果。
- 每种压缩格式都有其优缺点,通常是在压缩级别和压缩或解压缩性能之间取得平衡。如果压缩 CSV 或 TSV 等原始文件,lz4 提供最快的解压缩性能,但牺牲了压缩级别。Gzip 通常压缩效果更好,但读取速度稍慢。Xz 则更进一步,通常提供最佳压缩效果,但压缩和解压缩性能最慢。如果导出,Gz 和 lz4 提供相当的压缩速度。将此与您的连接速度进行比较。任何来自更快的解压缩或压缩的收益都将很容易被连接到 s3 存储桶的较慢速度所抵消。
- 诸如 native 或 parquet 之类的格式通常不值得压缩的开销。由于这些格式本身就非常紧凑,因此数据大小的节省可能很小。压缩和解压缩所花费的时间很少能抵消网络传输时间——尤其是因为 s3 在全球范围内可用,并具有更高的网络带宽。
在内部,ClickHouse 合并树使用两种主要的存储格式:宽表和紧凑表。虽然当前实现使用 ClickHouse 的默认行为——通过设置 min_bytes_for_wide_part
和 min_rows_for_wide_part
进行控制;但我们预计在未来的版本中,S3 的行为会有所不同,例如,min_bytes_for_wide_part
的默认值更大,从而鼓励使用更多紧凑的格式,从而减少文件数量。现在,用户可能希望在仅使用 s3 存储时调整这些设置。
节点扩展
用户通常会有不止一个 ClickHouse 节点可用。虽然用户可以垂直扩展,从而随着内核数量的增加线性提高 S3 吞吐量,但由于硬件可用性和成本效益,水平扩展通常是必要的。
在集群中使用 S3 读取需要使用 s3Cluster
函数,如 使用集群 中所述。虽然这允许读取在节点之间进行分配,但线程设置目前不会发送到所有节点。例如,如果以下查询针对某个节点执行,则只有接收方启动节点将尊重 max_insert_threads
设置
INSERT INTO default.trips_all
SELECT *
FROM s3Cluster(
'events',
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz',
'TabSeparatedWithNames'
)
SETTINGS max_insert_threads=8;
为了确保使用此设置,需要将以下内容添加到每个节点的 config.xml 文件(或 conf.d 下)
<clickhouse>
<profiles>
<default>
<max_insert_threads>8</max_insert_threads>
</default>
</profiles>
</clickhouse>
S3 支持的合并树
s3
函数和相关联的表引擎允许我们使用熟悉的 ClickHouse 语法查询 S3 中的数据。但是,在数据管理功能和性能方面,它们是有限的。不支持主键、无缓存支持,并且文件插入需要由用户管理。
ClickHouse 认识到 S3 是一种有吸引力的存储解决方案,尤其是在查询“较冷”数据的性能不太关键的情况下,用户希望将存储和计算分离。为了实现这一点,提供对使用 S3 作为合并树引擎存储的支持。这将使用户能够利用 S3 的可扩展性和成本效益,以及合并树引擎的插入和查询性能。
存储层级
ClickHouse 存储卷允许从合并树表引擎中抽象出物理磁盘。任何单个卷都可以由一个有序的磁盘集组成。虽然主要允许将多个块设备用于数据存储,但这种抽象也允许使用其他存储类型,包括 S3。ClickHouse 数据部分可以在卷之间移动,并根据存储策略填充率,从而创建存储层级的概念。
存储层级打开了热冷架构,其中最近的数据(通常也是查询最多的数据)只需要在高性能存储(例如 NVMe SSD)上占用少量空间。随着数据的陈旧,查询时间的 SLA 会增加,查询频率也会增加。这部分尾部数据可以存储在速度较慢、性能较低的存储上,例如 HDD 或对象存储,如 S3。
创建磁盘
要将 S3 存储桶用作磁盘,我们必须首先在 ClickHouse 配置文件中声明它。扩展 config.xml 或最好在 conf.d 下提供一个新文件。下面显示了 S3 磁盘声明的示例
<clickhouse>
<storage_configuration>
...
<disks>
<s3>
<type>s3</type>
<endpoint>https://sample-bucket.s3.us-east-2.amazonaws.com/tables/</endpoint>
<access_key_id>your_access_key_id</access_key_id>
<secret_access_key>your_secret_access_key</secret_access_key>
<region></region>
<metadata_path>/var/lib/clickhouse/disks/s3/</metadata_path>
</s3>
<s3_cache>
<type>cache</type>
<disk>s3</disk>
<path>/var/lib/clickhouse/disks/s3_cache/</path>
<max_size>10Gi</max_size>
</s3_cache>
</disks>
...
</storage_configuration>
</clickhouse>
与该磁盘声明相关的完整设置列表可以在 此处 找到。请注意,可以使用 管理凭据 中描述的相同方法在此处管理凭据,即,可以在上面的设置块中将 use_environment_credentials
设置为 true 以使用 IAM 角色。
创建存储策略
配置后,此“磁盘”可以由策略中声明的存储卷使用。对于下面的示例,我们假设 s3 是我们唯一的存储。这忽略了更复杂的热冷架构,其中数据可以根据 TTL 和填充率重新定位。
<clickhouse>
<storage_configuration>
<disks>
<s3>
...
</s3>
<s3_cache>
...
</s3_cache>
</disks>
<policies>
<s3_main>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3_main>
</policies>
</storage_configuration>
</clickhouse>
创建表
假设您已将磁盘配置为使用具有写入权限的存储桶,您应该能够创建如下示例中的表。为了简洁起见,我们使用了纽约出租车列的子集,并将数据直接流式传输到 s3 支持的表中
CREATE TABLE trips_s3
(
`trip_id` UInt32,
`pickup_date` Date,
`pickup_datetime` DateTime,
`dropoff_datetime` DateTime,
`pickup_longitude` Float64,
`pickup_latitude` Float64,
`dropoff_longitude` Float64,
`dropoff_latitude` Float64,
`passenger_count` UInt8,
`trip_distance` Float64,
`tip_amount` Float32,
`total_amount` Float32,
`payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4)
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime
SETTINGS index_granularity = 8192, storage_policy='s3_main'
INSERT INTO trips_s3 SELECT trip_id, pickup_date, pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, trip_distance, tip_amount, total_amount, payment_type FROM s3('https://ch-nyc-taxi.s3.eu-west-3.amazonaws.com/tsv/trips_{0..9}.tsv.gz', 'TabSeparatedWithNames') LIMIT 1000000;
根据硬件,这种 100 万行数据的最后一次插入可能需要几分钟才能执行。您可以通过 system.processes 表确认进度。您可以随意调整行数,直到达到 1000 万行的限制,并探索一些示例查询。
SELECT passenger_count, avg(tip_amount) as avg_tip, avg(total_amount) as avg_amount FROM trips_s3 GROUP BY passenger_count;
修改表
有时用户可能需要修改特定表的存储策略。虽然这可行,但它有一些限制。新的目标策略必须包含所有磁盘和前一个策略的卷,即,不会迁移数据以满足策略更改。验证这些约束时,将通过名称识别卷和磁盘,任何违反尝试都会导致错误。但是,假设您使用前面的示例,以下更改是有效的。
<policies>
<s3_main>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3_main>
<s3_tiered>
<volumes>
<hot>
<disk>default</disk>
</hot>
<main>
<disk>s3</disk>
</main>
</volumes>
<move_factor>0.2</move_factor>
</s3_tiered>
</policies>
ALTER TABLE trips_s3 MODIFY SETTING storage_policy='s3_tiered'
在这里,我们在新的 s3_tiered 策略中重用了主卷,并引入了一个新的热卷。这使用默认磁盘,该磁盘仅包含通过参数 <path>
配置的单个磁盘。请注意,我们的卷名称和磁盘没有改变。新的插入到我们的表将驻留在默认磁盘上,直到它达到 move_factor * disk_size - 此时数据将被重新定位到 S3。
处理复制
使用 ReplicatedMergeTree
表引擎可以实现使用 S3 磁盘的复制。有关详细信息,请参见 跨两个 AWS 区域使用 S3 对象存储复制单个分片 指南。
读取和写入
以下注释涵盖了 S3 与 ClickHouse 交互的实现。虽然通常只是信息性的,但在 优化性能 时,它可能对读者有所帮助
- 默认情况下,查询处理管道任何阶段使用的最大查询处理线程数等于内核数。某些阶段比其他阶段更容易并行化,因此此值提供了上限。由于数据是从磁盘流式传输的,因此多个查询阶段可以同时执行。因此,查询使用的确切线程数可能会超过此值。通过设置 max_threads 进行修改。
- 默认情况下,S3 上的读取是异步的。此行为由设置
remote_filesystem_read_method
决定,默认设置为threadpool
。在处理请求时,ClickHouse 会读取条带中的数据块。这些条带中的每一个都可能包含许多列。一个线程将逐个读取其数据块的列。与其同步执行,不如在等待数据之前对所有列进行预取。与对每个列进行同步等待相比,这提供了显着的性能改进。在大多数情况下,用户不需要更改此设置——请参见 优化性能。 - 对于 s3 函数和表,并行下载由值
max_download_threads
和max_download_buffer_size
决定。只有当文件大小大于所有线程的总缓冲区大小之和时,文件才会并行下载。这仅在版本 > 22.3.1 上可用。 - 写入是并行执行的,最多有 100 个并发文件写入线程。
max_insert_delayed_streams_for_parallel_write
的默认值为 1000,它控制并行写入的 S3 Blob 的数量。由于每个正在写入的文件都需要一个缓冲区(约 1MB),因此这有效地限制了 INSERT 的内存消耗。在服务器内存不足的情况下,降低此值可能比较合适。
将 S3 对象存储用作 ClickHouse 磁盘
如果您需要创建存储桶和 IAM 角色的分步说明,请展开 创建 S3 存储桶和 IAM 角色 并按照说明操作
创建 S3 存储桶和 IAM 用户
本文介绍了如何配置 AWS IAM 用户、创建 S3 存储桶以及配置 ClickHouse 使用存储桶作为 S3 磁盘的基本知识。您应该与安全团队合作以确定要使用的权限,并将这些权限视为起点。
创建 AWS IAM 用户
在此过程中,我们将创建服务帐户用户,而不是登录用户。
登录 AWS IAM 管理控制台。
在“用户”中,选择 添加用户
输入用户名,并将凭据类型设置为 访问密钥 - 程序化访问,然后选择 下一步:权限
不要将用户添加到任何组;选择 下一步:标签
除非您需要添加任何标签,否则选择 下一步:审查
选择 创建用户
:::note 忽略关于用户没有权限的警告消息;将在下一节中为存储桶中的用户授予权限 ::
现在已创建用户;单击 显示 并复制访问密钥和密钥。
注意将密钥保存在其他地方;这是唯一一次能够使用秘密访问密钥。
单击关闭,然后在用户屏幕中找到用户。
复制 ARN(Amazon 资源名称)并将其保存起来,以便在配置存储桶的访问策略时使用。
创建 S3 存储桶
在 S3 存储桶部分,选择 创建存储桶
输入存储桶名称,将其他选项保留为默认值
注意存储桶名称必须在整个 AWS 中是唯一的,而不仅仅是在组织内是唯一的,否则会发出错误。
将
阻止所有公共访问
保持启用状态;不需要公共访问。选择页面底部的 创建存储桶
选择链接,复制 ARN 并将其保存起来,以便在配置存储桶的访问策略时使用。
创建存储桶后,在 S3 存储桶列表中找到新的 S3 存储桶并选择链接
选择 创建文件夹
输入一个文件夹名称,该名称将成为 ClickHouse S3 磁盘的目标,然后选择 创建文件夹
该文件夹现在应该在存储桶列表中可见
选中新文件夹的复选框,然后点击 **复制 URL**,保存复制的 URL,以便在下一节的 ClickHouse 存储配置中使用。
选择 **权限** 选项卡,然后点击 **桶策略** 部分中的 **编辑** 按钮。
添加桶策略,示例如下
{
"Version": "2012-10-17",
"Id": "Policy123456",
"Statement": [
{
"Sid": "abc123",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::921234567898:user/mars-s3-user"
},
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::mars-doc-test",
"arn:aws:s3:::mars-doc-test/*"
]
}
]
}
|Parameter | Description | Example Value |
|----------|-------------|----------------|
|Version | Version of the policy interpreter, leave as-is | 2012-10-17 |
|Sid | User-defined policy id | abc123 |
|Effect | Whether user requests will be allowed or denied | Allow |
|Principal | The accounts or user that will be allowed | arn:aws:iam::921234567898:user/mars-s3-user |
|Action | What operations are allowed on the bucket| s3:*|
|Resource | Which resources in the bucket will operations be allowed in | "arn:aws:s3:::mars-doc-test", "arn:aws:s3:::mars-doc-test/*" |
您应该与安全团队合作以确定要使用的权限,将这些权限视为起点。有关策略和设置的更多信息,请参阅 AWS 文档:https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-policy-language-overview.html
- 保存策略配置。
配置 ClickHouse 以使用 S3 桶作为磁盘
以下示例基于作为服务安装的 Linux Deb 包,具有默认的 ClickHouse 目录。
- 在 ClickHouse
config.d
目录中创建一个新文件来存储存储配置。
vim /etc/clickhouse-server/config.d/storage_config.xml
- 添加以下存储配置;用之前步骤中的桶路径、访问密钥和秘密密钥替换。
<clickhouse>
<storage_configuration>
<disks>
<s3_disk>
<type>s3</type>
<endpoint>https://mars-doc-test.s3.amazonaws.com/clickhouse3/</endpoint>
<access_key_id>ABC123</access_key_id>
<secret_access_key>Abc+123</secret_access_key>
<metadata_path>/var/lib/clickhouse/disks/s3_disk/</metadata_path>
</s3_disk>
<s3_cache>
<type>cache</type>
<disk>s3_disk</disk>
<path>/var/lib/clickhouse/disks/s3_cache/</path>
<max_size>10Gi</max_size>
</s3_cache>
</disks>
<policies>
<s3_main>
<volumes>
<main>
<disk>s3_disk</disk>
</main>
</volumes>
</s3_main>
</policies>
</storage_configuration>
</clickhouse>
<disks>
标签中的 s3_disk
和 s3_cache
标签是任意标签。这些可以设置为其他内容,但相同的标签必须在 <policies>
标签下的 <disk>
标签中使用,以引用磁盘。<S3_main>
标签也是任意的,它是策略的名称,在 ClickHouse 中创建资源时将用作标识符存储目标。
以上配置适用于 ClickHouse 22.8 或更高版本,如果您使用的是较旧版本,请参阅 存储数据 文档。
有关使用 S3 的更多信息:集成指南:S3 支持的 MergeTree
- 将文件的拥有者更新为
clickhouse
用户和组。
chown clickhouse:clickhouse /etc/clickhouse-server/config.d/storage_config.xml
- 重新启动 ClickHouse 实例以使更改生效。
service clickhouse-server restart
测试
- 使用 ClickHouse 客户端登录,类似于以下内容
clickhouse-client --user default --password ClickHouse123!
- 创建一个指定新 S3 存储策略的表。
CREATE TABLE s3_table1
(
`id` UInt64,
`column1` String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS storage_policy = 's3_main';
- 显示使用正确策略创建的表。
SHOW CREATE TABLE s3_table1;
┌─statement────────────────────────────────────────────────────
│ CREATE TABLE default.s3_table1
(
`id` UInt64,
`column1` String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS storage_policy = 's3_main', index_granularity = 8192
└──────────────────────────────────────────────────────────────
- 将测试行插入表中。
INSERT INTO s3_table1
(id, column1)
VALUES
(1, 'abc'),
(2, 'xyz');
INSERT INTO s3_table1 (id, column1) FORMAT Values
Query id: 0265dd92-3890-4d56-9d12-71d4038b85d5
Ok.
2 rows in set. Elapsed: 0.337 sec.
- 查看行。
SELECT * FROM s3_table1;
┌─id─┬─column1─┐
│ 1 │ abc │
│ 2 │ xyz │
└────┴─────────┘
2 rows in set. Elapsed: 0.284 sec.
在 AWS 控制台中,导航到桶,并选择新的桶和文件夹。您应该看到以下内容
使用 S3 对象存储在两个 AWS 区域中复制单个分片
对象存储在 ClickHouse Cloud 中默认使用,如果您在 ClickHouse Cloud 中运行,则无需执行此过程。
规划部署
本教程基于在 AWS EC2 中部署两个 ClickHouse 服务器节点和三个 ClickHouse Keeper 节点。ClickHouse 服务器的数据存储是 S3。为了支持灾难恢复,使用两个 AWS 区域,每个区域包含一个 ClickHouse 服务器和一个 S3 桶。
ClickHouse 表在两台服务器之间复制,因此在两个区域之间复制。
安装软件
ClickHouse 服务器节点
在 ClickHouse 服务器节点上执行部署步骤时,请参阅 安装说明。
部署 ClickHouse
在两台主机上部署 ClickHouse,在示例配置中,这些主机分别命名为 chnode1
和 chnode2
。
将 chnode1
放置在一个 AWS 区域中,将 chnode2
放置在另一个区域中。
部署 ClickHouse Keeper
在三台主机上部署 ClickHouse Keeper,在示例配置中,这些主机分别命名为 keepernode1
、keepernode2
和 keepernode3
。keepernode1
可以部署在与 chnode1
相同的区域,keepernode2
可以部署在与 chnode2
相同的区域,keepernode3
可以部署在任何一个区域,但要与该区域中的 ClickHouse 节点位于不同的可用区。
在 ClickHouse Keeper 节点上执行部署步骤时,请参阅 安装说明。
创建 S3 桶
创建两个 S3 桶,分别位于您放置 chnode1
和 chnode2
的区域。
如果您需要创建存储桶和 IAM 角色的分步说明,请展开 创建 S3 存储桶和 IAM 角色 并按照说明操作
创建 S3 存储桶和 IAM 用户
本文介绍了如何配置 AWS IAM 用户、创建 S3 存储桶以及配置 ClickHouse 使用存储桶作为 S3 磁盘的基本知识。您应该与安全团队合作以确定要使用的权限,并将这些权限视为起点。
创建 AWS IAM 用户
在此过程中,我们将创建服务帐户用户,而不是登录用户。
登录 AWS IAM 管理控制台。
在“用户”中,选择 添加用户
输入用户名,并将凭据类型设置为 访问密钥 - 程序化访问,然后选择 下一步:权限
不要将用户添加到任何组;选择 下一步:标签
除非您需要添加任何标签,否则选择 下一步:审查
选择 创建用户
:::note 忽略关于用户没有权限的警告消息;将在下一节中为存储桶中的用户授予权限 ::
现在已创建用户;单击 显示 并复制访问密钥和密钥。
注意将密钥保存在其他地方;这是唯一一次能够使用秘密访问密钥。
单击关闭,然后在用户屏幕中找到用户。
复制 ARN(Amazon 资源名称)并将其保存起来,以便在配置存储桶的访问策略时使用。
创建 S3 存储桶
在 S3 存储桶部分,选择 创建存储桶
输入存储桶名称,将其他选项保留为默认值
注意存储桶名称必须在整个 AWS 中是唯一的,而不仅仅是在组织内是唯一的,否则会发出错误。
将
阻止所有公共访问
保持启用状态;不需要公共访问。选择页面底部的 创建存储桶
选择链接,复制 ARN 并将其保存起来,以便在配置存储桶的访问策略时使用。
创建存储桶后,在 S3 存储桶列表中找到新的 S3 存储桶并选择链接
选择 创建文件夹
输入一个文件夹名称,该名称将成为 ClickHouse S3 磁盘的目标,然后选择 创建文件夹
该文件夹现在应该在存储桶列表中可见
选中新文件夹的复选框,然后点击 **复制 URL**,保存复制的 URL,以便在下一节的 ClickHouse 存储配置中使用。
选择 **权限** 选项卡,然后点击 **桶策略** 部分中的 **编辑** 按钮。
添加桶策略,示例如下
{
"Version": "2012-10-17",
"Id": "Policy123456",
"Statement": [
{
"Sid": "abc123",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::921234567898:user/mars-s3-user"
},
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::mars-doc-test",
"arn:aws:s3:::mars-doc-test/*"
]
}
]
}
|Parameter | Description | Example Value |
|----------|-------------|----------------|
|Version | Version of the policy interpreter, leave as-is | 2012-10-17 |
|Sid | User-defined policy id | abc123 |
|Effect | Whether user requests will be allowed or denied | Allow |
|Principal | The accounts or user that will be allowed | arn:aws:iam::921234567898:user/mars-s3-user |
|Action | What operations are allowed on the bucket| s3:*|
|Resource | Which resources in the bucket will operations be allowed in | "arn:aws:s3:::mars-doc-test", "arn:aws:s3:::mars-doc-test/*" |
您应该与安全团队合作以确定要使用的权限,将这些权限视为起点。有关策略和设置的更多信息,请参阅 AWS 文档:https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-policy-language-overview.html
- 保存策略配置。
然后将配置文件放置在 /etc/clickhouse-server/config.d/
中。以下是一个桶的示例配置文件,另一个类似,三个突出显示的行不同
<clickhouse>
<storage_configuration>
<disks>
<s3_disk>
<type>s3</type>
<endpoint>https://docs-clickhouse-s3.s3.us-east-2.amazonaws.com/clickhouses3/</endpoint>
<access_key_id>ABCDEFGHIJKLMNOPQRST</access_key_id>
<secret_access_key>Tjdm4kf5snfkj303nfljnev79wkjn2l3knr81007</secret_access_key>
<metadata_path>/var/lib/clickhouse/disks/s3_disk/</metadata_path>
</s3_disk>
<s3_cache>
<type>cache</type>
<disk>s3</disk>
<path>/var/lib/clickhouse/disks/s3_cache/</path>
<max_size>10Gi</max_size>
</s3_cache>
</disks>
<policies>
<s3_main>
<volumes>
<main>
<disk>s3_disk</disk>
</main>
</volumes>
</s3_main>
</policies>
</storage_configuration>
</clickhouse>
本指南中的许多步骤都会要求您将配置文件放置在 /etc/clickhouse-server/config.d/
中。这是 Linux 系统上用于配置覆盖文件的默认位置。当您将这些文件放入该目录时,ClickHouse 将使用其内容覆盖默认配置。通过将这些文件放置在覆盖目录中,您将避免在升级期间丢失配置。
配置 ClickHouse Keeper
在独立运行 ClickHouse Keeper(与 ClickHouse 服务器分开)时,配置是一个单一的 XML 文件。在本教程中,该文件为 /etc/clickhouse-keeper/keeper_config.xml
。所有三个 Keeper 服务器使用相同的配置,只有一个设置不同;<server_id>
。
server_id
表示要分配给使用配置文件的主机的 ID。在以下示例中,server_id
为 3
,如果您在 <raft_configuration>
部分中向下查看文件,您会看到服务器 3 的主机名为 keepernode3
。ClickHouse Keeper 进程就是通过这种方式知道在选择领导者以及所有其他活动时要连接到哪些其他服务器。
<clickhouse>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-keeper/clickhouse-keeper.log</log>
<errorlog>/var/log/clickhouse-keeper/clickhouse-keeper.err.log</errorlog>
<size>1000M</size>
<count>3</count>
</logger>
<listen_host>0.0.0.0</listen_host>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<raft_logs_level>warning</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>keepernode1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>keepernode2</hostname>
<port>9234</port>
</server>
<server>
<id>3</id>
<hostname>keepernode3</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>
将 ClickHouse Keeper 的配置文件复制到适当的位置(记住设置 <server_id>
)。
sudo -u clickhouse \
cp keeper.xml /etc/clickhouse-keeper/keeper.xml
配置 ClickHouse 服务器
定义集群
ClickHouse 集群在配置的 <remote_servers>
部分中定义。在本示例中,定义了一个名为 cluster_1S_2R
的集群,它包含一个包含两个副本的分片。副本位于 chnode1
和 chnode2
主机上。
<clickhouse>
<remote_servers replace="true">
<cluster_1S_2R>
<shard>
<replica>
<host>chnode1</host>
<port>9000</port>
</replica>
<replica>
<host>chnode2</host>
<port>9000</port>
</replica>
</shard>
</cluster_1S_2R>
</remote_servers>
</clickhouse>
在使用集群时,定义宏来使用集群、分片和副本设置填充 DDL 查询非常方便。本示例允许您指定使用复制表引擎,而无需提供 shard
和 replica
详细信息。在创建表时,您可以通过查询 system.tables
来查看 shard
和 replica
宏是如何使用的。
<clickhouse>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>
<macros>
<cluster>cluster_1S_2R</cluster>
<shard>1</shard>
<replica>replica_1</replica>
</macros>
</clickhouse>
上面的宏适用于 chnode1
,在 chnode2
上将 replica
设置为 replica_2
。
禁用零拷贝复制
在 ClickHouse 22.7 及更低版本中,设置 allow_remote_fs_zero_copy_replication
默认设置为 true
,适用于 S3 和 HDFS 磁盘。对于这种灾难恢复场景,应将此设置设置为 false
,在 22.8 及更高版本中,默认情况下它设置为 false
。
此设置应为 false,原因有两个:1) 此功能尚未投入生产;2) 在灾难恢复场景中,数据和元数据都需要存储在多个区域中。将 allow_remote_fs_zero_copy_replication
设置为 false
。
<clickhouse>
<merge_tree>
<allow_remote_fs_zero_copy_replication>false</allow_remote_fs_zero_copy_replication>
</merge_tree>
</clickhouse>
ClickHouse Keeper 负责协调跨 ClickHouse 节点的数据复制。要将 ClickHouse Keeper 节点的信息告知 ClickHouse,请将配置文件添加到每个 ClickHouse 节点。
<clickhouse>
<zookeeper>
<node index="1">
<host>keepernode1</host>
<port>9181</port>
</node>
<node index="2">
<host>keepernode2</host>
<port>9181</port>
</node>
<node index="3">
<host>keepernode3</host>
<port>9181</port>
</node>
</zookeeper>
</clickhouse>
配置网络
在您配置 AWS 中的安全设置时,请参阅 网络端口 列表,以便您的服务器可以相互通信,您也可以与它们通信。
所有三个服务器都必须侦听网络连接,以便它们可以在服务器之间以及与 S3 之间进行通信。默认情况下,ClickHouse 仅在环回地址上侦听,因此必须更改此设置。这在 /etc/clickhouse-server/config.d/
中配置。以下是一个示例,该示例配置 ClickHouse 和 ClickHouse Keeper 以侦听所有 IPv4 接口。有关更多信息,请参阅文档或默认配置文件 /etc/clickhouse/config.xml
。
<clickhouse>
<listen_host>0.0.0.0</listen_host>
</clickhouse>
启动服务器
运行 ClickHouse Keeper
在每个 Keeper 服务器上运行您操作系统的命令,例如
sudo systemctl enable clickhouse-keeper
sudo systemctl start clickhouse-keeper
sudo systemctl status clickhouse-keeper
检查 ClickHouse Keeper 状态
使用 netcat
向 ClickHouse Keeper 发送命令。例如,mntr
返回 ClickHouse Keeper 集群的状态。如果您在每个 Keeper 节点上运行该命令,您会看到其中一个节点是领导者,另外两个是跟随者。
echo mntr | nc localhost 9181
zk_version v22.7.2.15-stable-f843089624e8dd3ff7927b8a125cf3a7a769c069
zk_avg_latency 0
zk_max_latency 11
zk_min_latency 0
zk_packets_received 1783
zk_packets_sent 1783
zk_num_alive_connections 2
zk_outstanding_requests 0
zk_server_state leader
zk_znode_count 135
zk_watch_count 8
zk_ephemerals_count 3
zk_approximate_data_size 42533
zk_key_arena_size 28672
zk_latest_snapshot_size 0
zk_open_file_descriptor_count 182
zk_max_file_descriptor_count 18446744073709551615
zk_followers 2
zk_synced_followers 2
运行 ClickHouse 服务器
在每个 ClickHouse 服务器上运行
sudo service clickhouse-server start
验证 ClickHouse 服务器
当您添加 集群配置 时,定义了一个跨两个 ClickHouse 节点复制的单个分片。在此验证步骤中,您将检查 ClickHouse 启动时是否构建了集群,并将使用该集群创建一个复制表。
验证集群是否存在
show clusters
┌─cluster───────┐
│ cluster_1S_2R │
└───────────────┘
1 row in set. Elapsed: 0.009 sec. `使用
ReplicatedMergeTree
表引擎在集群中创建表create table trips on cluster 'cluster_1S_2R' (
`trip_id` UInt32,
`pickup_date` Date,
`pickup_datetime` DateTime,
`dropoff_datetime` DateTime,
`pickup_longitude` Float64,
`pickup_latitude` Float64,
`dropoff_longitude` Float64,
`dropoff_latitude` Float64,
`passenger_count` UInt8,
`trip_distance` Float64,
`tip_amount` Float32,
`total_amount` Float32,
`payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4))
ENGINE = ReplicatedMergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime
SETTINGS index_granularity = 8192, storage_policy='s3_main'┌─host────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ chnode1 │ 9000 │ 0 │ │ 1 │ 0 │
│ chnode2 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘了解前面定义的宏的使用
宏
shard
和replica
已在 前面定义,在下面突出显示的行中,您可以看到这些值在每个 ClickHouse 节点上的替换位置。此外,使用值uuid
;uuid
未在宏中定义,因为它是由系统生成的。SELECT create_table_query
FROM system.tables
WHERE name = 'trips'
FORMAT VerticalQuery id: 4d326b66-0402-4c14-9c2f-212bedd282c0
Row 1:
──────
create_table_query: CREATE TABLE default.trips (`trip_id` UInt32, `pickup_date` Date, `pickup_datetime` DateTime, `dropoff_datetime` DateTime, `pickup_longitude` Float64, `pickup_latitude` Float64, `dropoff_longitude` Float64, `dropoff_latitude` Float64, `passenger_count` UInt8, `trip_distance` Float64, `tip_amount` Float32, `total_amount` Float32, `payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
PARTITION BY toYYYYMM(pickup_date) ORDER BY pickup_datetime SETTINGS index_granularity = 8192, storage_policy = 's3_main'
1 row in set. Elapsed: 0.012 sec.注意您可以通过设置
default_replica_path
和default_replica_name
来自定义上面显示的 zookeeper 路径'clickhouse/tables/{uuid}/{shard}
。文档在 这里。
测试
这些测试将验证数据是否正在两台服务器之间复制,以及数据是否存储在 S3 存储桶中而不是本地磁盘上。
从纽约市出租车数据集添加数据
INSERT INTO trips
SELECT trip_id,
pickup_date,
pickup_datetime,
dropoff_datetime,
pickup_longitude,
pickup_latitude,
dropoff_longitude,
dropoff_latitude,
passenger_count,
trip_distance,
tip_amount,
total_amount,
payment_type
FROM s3('https://ch-nyc-taxi.s3.eu-west-3.amazonaws.com/tsv/trips_{0..9}.tsv.gz', 'TabSeparatedWithNames') LIMIT 1000000;验证数据是否存储在 S3 中。
此查询显示磁盘上数据的尺寸,以及用于确定使用哪个磁盘的策略。
SELECT
engine,
data_paths,
metadata_path,
storage_policy,
formatReadableSize(total_bytes)
FROM system.tables
WHERE name = 'trips'
FORMAT VerticalQuery id: af7a3d1b-7730-49e0-9314-cc51c4cf053c
Row 1:
──────
engine: ReplicatedMergeTree
data_paths: ['/var/lib/clickhouse/disks/s3_disk/store/551/551a859d-ec2d-4512-9554-3a4e60782853/']
metadata_path: /var/lib/clickhouse/store/e18/e18d3538-4c43-43d9-b083-4d8e0f390cf7/trips.sql
storage_policy: s3_main
formatReadableSize(total_bytes): 36.42 MiB
1 row in set. Elapsed: 0.009 sec.检查本地磁盘上的数据尺寸。从上面可以看出,存储的数百万行在磁盘上的尺寸为 36.42 MiB。这应该在 S3 上,而不是本地磁盘。上面的查询还告诉我们本地磁盘上的数据和元数据存储位置。检查本地数据
root@chnode1:~# du -sh /var/lib/clickhouse/disks/s3_disk/store/551
536K /var/lib/clickhouse/disks/s3_disk/store/551检查每个 S3 存储桶中的 S3 数据(总计未显示,但在插入后两个存储桶都存储了大约 36 MiB 的数据)
S3Express
S3Express 是 Amazon S3 中一种新的高性能、单可用区存储类。
您可以参考这个 博客,了解我们使用 S3Express 测试 ClickHouse 的经验。
S3Express 将数据存储在单个可用区内。这意味着在可用区发生故障的情况下,数据将不可用。
S3 磁盘
使用 S3Express 存储桶作为后端的表创建涉及以下步骤
- 创建一个
Directory
类型的存储桶 - 安装适当的存储桶策略,以授予您的 S3 用户所有必需的权限(例如,
"Action": "s3express:*"
仅允许无限制访问) - 在配置存储策略时,请提供
region
参数
存储配置与普通 S3 相同,例如,可能如下所示
<storage_configuration>
<disks>
<s3_express>
<type>s3</type>
<endpoint>https://my-test-bucket--eun1-az1--x-s3.s3express-eun1-az1.eu-north-1.amazonaws.com/store/</endpoint>
<region>eu-north-1</region>
<access_key_id>...</access_key_id>
<secret_access_key>...</secret_access_key>
</s3_express>
</disks>
<policies>
<s3_express>
<volumes>
<main>
<disk>s3_express</disk>
</main>
</volumes>
</s3_express>
</policies>
</storage_configuration>
然后在新存储上创建一个表
CREATE TABLE t
(
a UInt64,
s String
)
ENGINE = MergeTree
ORDER BY a
SETTINGS storage_policy = 's3_express';
S3 存储
S3 存储也受支持,但仅限于 Object URL
路径。示例
select * from s3('https://test-bucket--eun1-az1--x-s3.s3express-eun1-az1.eu-north-1.amazonaws.com/file.csv', ...)
它还需要在配置中指定存储桶区域
<s3>
<perf-bucket-url>
<endpoint>https://test-bucket--eun1-az1--x-s3.s3express-eun1-az1.eu-north-1.amazonaws.com</endpoint>
<region>eu-north-1</region>
</perf-bucket-url>
</s3>
备份
可以在我们上面创建的磁盘上存储备份
BACKUP TABLE t TO Disk('s3_express', 't.zip')
┌─id───────────────────────────────────┬─status─────────┐
│ c61f65ac-0d76-4390-8317-504a30ba7595 │ BACKUP_CREATED │
└──────────────────────────────────────┴────────────────┘
RESTORE TABLE t AS t_restored FROM Disk('s3_express', 't.zip')
┌─id───────────────────────────────────┬─status───┐
│ 4870e829-8d76-4171-ae59-cffaf58dea04 │ RESTORED │
└──────────────────────────────────────┴──────────┘