分布式表引擎
要在云端创建分布式表引擎,您可以使用 remote 和 remoteSecure 表函数。Distributed(...)
语法不能在 ClickHouse Cloud 中使用。
使用 Distributed 引擎的表不存储任何自身数据,但允许在多台服务器上进行分布式查询处理。读取操作会自动并行化。在读取期间,如果存在远程服务器上的表索引,则会使用它们。
创建表
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]
从表创建
当 Distributed
表指向当前服务器上的表时,您可以采用该表的架构
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]
Distributed 参数
cluster
cluster
- 服务器配置文件中的集群名称
database
database
- 远程数据库的名称
table
table
- 远程表的名称
sharding_key
sharding_key
- (可选) 分片键
对于以下情况,指定 sharding_key
是必要的
- 对于
INSERT
到分布式表 (因为表引擎需要sharding_key
来确定如何拆分数据)。但是,如果启用了insert_distributed_one_random_shard
设置,则INSERT
不需要分片键。 - 与
optimize_skip_unused_shards
一起使用,因为sharding_key
是确定应查询哪些分片所必需的
policy_name
policy_name
- (可选) 策略名称,它将用于存储后台发送的临时文件
另请参阅
分布式设置
fsync_after_insert
fsync_after_insert
- 在后台插入到 Distributed 后,对文件数据执行 fsync
。保证操作系统将整个插入的数据刷新到启动节点磁盘上的文件中。
fsync_directories
fsync_directories
- 对目录执行 fsync
。保证操作系统在与 Distributed 表上的后台插入相关的操作(插入后、将数据发送到分片后等)后刷新目录元数据。
skip_unavailable_shards
skip_unavailable_shards
- 如果为 true,ClickHouse 将静默跳过不可用的分片。当出现以下情况时,分片被标记为不可用:1) 由于连接失败,无法访问分片。2) 无法通过 DNS 解析分片。3) 分片上不存在表。默认为 false。
bytes_to_throw_insert
bytes_to_throw_insert
- 如果待处理的后台 INSERT 的压缩字节数超过此数目,将抛出异常。0 - 不抛出。默认为 0。
bytes_to_delay_insert
bytes_to_delay_insert
- 如果待处理的后台 INSERT 的压缩字节数超过此数目,查询将被延迟。0 - 不延迟。默认为 0。
max_delay_to_insert
max_delay_to_insert
- 如果有大量待处理的字节要进行后台发送,则将数据插入到 Distributed 表中的最大延迟时间(秒)。默认为 60。
background_insert_batch
background_insert_batch
- 与 distributed_background_insert_batch 相同
background_insert_split_batch_on_failure
background_insert_split_batch_on_failure
- 与 distributed_background_insert_split_batch_on_failure 相同
background_insert_sleep_time_ms
background_insert_sleep_time_ms
- 与 distributed_background_insert_sleep_time_ms 相同
background_insert_max_sleep_time_ms
background_insert_max_sleep_time_ms
- 与 distributed_background_insert_max_sleep_time_ms 相同
flush_on_detach
flush_on_detach
- 在 DETACH/DROP/服务器关闭时将数据刷新到远程节点。默认为 true。
持久性设置 (fsync_...
)
- 仅影响后台 INSERT (即
distributed_foreground_insert=false
),当数据首先存储在启动节点磁盘上,然后在后台发送到分片时。 - 可能会显著降低插入性能
- 影响将存储在 Distributed 表文件夹内的数据写入到接受您的插入的节点。如果您需要保证将数据写入到基础 MergeTree 表中 - 请参阅
system.merge_tree_settings
中的持久性设置 (...fsync...
)
对于插入限制设置 (..._insert
),另请参阅
- distributed_foreground_insert 设置
- prefer_localhost_replica 设置
bytes_to_throw_insert
在bytes_to_delay_insert
之前处理,因此您不应将其设置为小于bytes_to_delay_insert
的值
示例
CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
fsync_after_insert=0,
fsync_directories=0;
数据将从 logs
集群中的所有服务器读取,从集群中每台服务器上的 default.hits
表中读取。数据不仅会被读取,而且会在远程服务器上进行部分处理 (在可能的范围内)。例如,对于带有 GROUP BY
的查询,数据将在远程服务器上聚合,聚合函数的中间状态将被发送到请求服务器。然后数据将被进一步聚合。
您可以将常量表达式(返回字符串)用作数据库名称,而不是直接使用数据库名称。例如:currentDatabase()
。
集群
集群在 服务器配置文件 中配置
<remote_servers>
<logs>
<!-- Inter-server per-cluster secret for Distributed queries
default: no secret (no authentication will be performed)
If set, then Distributed queries will be validated on shards, so at least:
- such cluster should exist on the shard,
- such cluster should have the same secret.
And also (and which is more important), the initial_user will
be used as current user for the query.
-->
<!-- <secret></secret> -->
<!-- Optional. Whether distributed DDL queries (ON CLUSTER clause) are allowed for this cluster. Default: true (allowed). -->
<!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
<shard>
<!-- Optional. Shard weight when writing data. Default: 1. -->
<weight>1</weight>
<!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
<internal_replication>false</internal_replication>
<replica>
<!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
<priority>1</priority>
<host>example01-01-1</host>
<port>9000</port>
</replica>
<replica>
<host>example01-01-2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<weight>2</weight>
<internal_replication>false</internal_replication>
<replica>
<host>example01-02-1</host>
<port>9000</port>
</replica>
<replica>
<host>example01-02-2</host>
<secure>1</secure>
<port>9440</port>
</replica>
</shard>
</logs>
</remote_servers>
这里定义了一个名为 logs
的集群,它由两个分片组成,每个分片包含两个副本。分片是指包含不同数据部分的服务器 (为了读取所有数据,您必须访问所有分片)。副本是重复的服务器 (为了读取所有数据,您可以访问任何一个副本上的数据)。
集群名称不能包含点号。
参数 host
、port
以及可选的 user
、password
、secure
、compression
是为每台服务器指定的
host
– 远程服务器的地址。您可以使用域名或 IPv4 或 IPv6 地址。如果您指定域名,服务器会在启动时发出 DNS 请求,结果会存储在服务器运行期间。如果 DNS 请求失败,服务器将不会启动。如果您更改 DNS 记录,请重启服务器。port
– 用于消息传递活动的 TCP 端口 (配置中的tcp_port
,通常设置为 9000)。不要与http_port
混淆。user
– 连接到远程服务器的用户名。默认值为default
用户。此用户必须有权连接到指定的服务器。访问权限在users.xml
文件中配置。有关更多信息,请参阅 访问权限 部分。password
– 连接到远程服务器的密码 (未掩码)。默认值:空字符串。secure
- 是否使用安全的 SSL/TLS 连接。通常还需要指定端口 (默认安全端口为9440
)。服务器应监听<tcp_port_secure>9440</tcp_port_secure>
并配置正确的证书。compression
- 使用数据压缩。默认值:true
。
当指定副本时,读取时将为每个分片选择一个可用的副本。您可以配置负载均衡算法 (访问哪个副本的偏好) – 请参阅 load_balancing 设置。如果与服务器的连接未建立,将尝试使用短超时时间进行连接。如果连接失败,将选择下一个副本,依此类推,直到所有副本。如果所有副本的连接尝试都失败,将以相同的方式重复尝试几次。这有利于弹性,但不提供完全的容错能力:远程服务器可能接受连接,但可能无法工作或工作不佳。
您可以仅指定一个分片 (在这种情况下,查询处理应称为远程而不是分布式) 或最多任意数量的分片。在每个分片中,您可以指定一个到任意数量的副本。您可以为每个分片指定不同数量的副本。
您可以在配置中指定任意数量的集群。
要查看您的集群,请使用 system.clusters
表。
Distributed
引擎允许像本地服务器一样使用集群。但是,集群的配置不能动态指定,必须在服务器配置文件中配置。通常,集群中的所有服务器都将具有相同的集群配置 (尽管这不是必需的)。来自配置文件的集群是动态更新的,无需重启服务器。
如果您需要每次都向未知的分片和副本集发送查询,则无需创建 Distributed
表 – 请改用 remote
表函数。请参阅 表函数 部分。
写入数据
有两种方法可以将数据写入集群
首先,您可以定义将哪些数据写入哪些服务器,并在每个分片上直接执行写入。换句话说,在 Distributed
表指向的集群中的远程表上执行直接 INSERT
语句。这是最灵活的解决方案,因为您可以使用任何分片方案,即使是由于主题领域的要求而变得不平凡的方案。这也是最优的解决方案,因为数据可以完全独立地写入不同的分片。
其次,您可以对 Distributed
表执行 INSERT
语句。在这种情况下,表本身会将插入的数据分布到各个服务器。为了写入 Distributed
表,它必须配置 sharding_key
参数 (除非只有一个分片)。
每个分片都可以在配置文件中定义 <weight>
。默认情况下,权重为 1
。数据在分片之间分布的量与分片权重成正比。所有分片权重相加,然后将每个分片的权重除以总权重以确定每个分片的比例。例如,如果有两个分片,第一个分片的权重为 1,第二个分片的权重为 2,则第一个分片将发送三分之一 (1/3) 的插入行,第二个分片将发送三分之二 (2/3) 的插入行。
每个分片都可以在配置文件中定义 internal_replication
参数。如果此参数设置为 true
,则写入操作将选择第一个健康的副本并将数据写入其中。如果 Distributed
表的基础表是复制表 (例如,任何 Replicated*MergeTree
表引擎),请使用此选项。其中一个表副本将接收写入,并会自动复制到其他副本。
如果 internal_replication
设置为 false
(默认值),则数据将写入所有副本。在这种情况下,Distributed
表本身会复制数据。这比使用复制表更糟糕,因为副本的一致性未经过检查,随着时间的推移,它们将包含略微不同的数据。
为了选择数据行发送到的分片,将分析分片表达式,并取其除以分片总权重的余数。该行被发送到与余数的半区间 prev_weights
到 prev_weights + weight
相对应的分片,其中 prev_weights
是编号最小的分片的总权重,weight
是此分片的权重。例如,如果有两个分片,第一个分片的权重为 9,第二个分片的权重为 10,则对于范围 [0, 9) 中的余数,该行将被发送到第一个分片,对于范围 [9, 19) 中的余数,该行将被发送到第二个分片。
分片表达式可以是任何由常量和表列组成的表达式,返回整数。例如,您可以使用表达式 rand()
进行数据的随机分布,或使用 UserID
按用户 ID 的除法余数进行分布 (然后单个用户的数据将驻留在单个分片上,这简化了按用户运行 IN
和 JOIN
)。如果其中一个列的分布不够均匀,您可以将其包装在哈希函数中,例如 intHash64(UserID)
。
简单的除法余数是分片的一个有限解决方案,并非总是适用。它适用于中等和大量数据 (数十台服务器),但不适用于非常大量的数据 (数百台或更多服务器)。在后一种情况下,请使用主题领域所需的分片方案,而不是使用 Distributed
表中的条目。
在以下情况下,您应该关注分片方案
- 使用的查询需要按特定键连接数据 (
IN
或JOIN
)。如果数据是按此键分片的,则可以使用本地IN
或JOIN
而不是GLOBAL IN
或GLOBAL JOIN
,这效率更高。 - 使用了大量服务器 (数百台或更多) 和大量小型查询,例如,针对单个客户的数据查询 (例如,网站、广告商或合作伙伴)。为了使小型查询不影响整个集群,将单个客户的数据定位在单个分片上是有意义的。或者,您可以设置双层分片:将整个集群划分为“层”,其中一层可以由多个分片组成。单个客户的数据位于单个层上,但可以根据需要向层添加分片,并且数据在其中随机分布。为每层创建
Distributed
表,并为全局查询创建单个共享的分布式表。
数据在后台写入。当插入到表中时,数据块仅写入本地文件系统。数据会尽快在后台发送到远程服务器。发送数据的周期由 distributed_background_insert_sleep_time_ms 和 distributed_background_insert_max_sleep_time_ms 设置管理。Distributed
引擎单独发送每个带有插入数据的文件,但您可以使用 distributed_background_insert_batch 设置启用批量发送文件。此设置通过更好地利用本地服务器和网络资源来提高集群性能。您应该通过检查表目录中的文件列表 (等待发送的数据) 来检查数据是否成功发送:/var/lib/clickhouse/data/database/table/
。执行后台任务的线程数可以通过 background_distributed_schedule_pool_size 设置设置。
如果服务器停止存在或在 INSERT
到 Distributed
表后粗暴重启 (例如,由于硬件故障),则插入的数据可能会丢失。如果在表目录中检测到损坏的数据部分,它将被传输到 broken
子目录,并且不再使用。
读取数据
当查询 Distributed
表时,SELECT
查询被发送到所有分片,并且无论数据如何在分片之间分布 (它们可以完全随机分布) 都可以工作。当您添加新分片时,您不必将旧数据传输到其中。相反,您可以通过使用更大的权重将新数据写入其中 – 数据将分布得稍微不均匀,但查询将正确且高效地工作。
当启用 max_parallel_replicas
选项时,查询处理将在单个分片内的所有副本之间并行化。有关更多信息,请参阅 max_parallel_replicas 部分。
要了解有关分布式 in
和 global in
查询如何处理的更多信息,请参阅 此 文档。
虚拟列
_shard_num
_shard_num
— 包含表 system.clusters
中的 shard_num
值。类型:UInt32。
另请参阅