分布式表引擎
要在 ClickHouse Cloud 中创建分布式表引擎,可以使用 remote 和 remoteSecure 表函数。Distributed(...) 语法不能在 ClickHouse Cloud 中使用。
使用分布式引擎的表不存储自己的任何数据,但允许在多个服务器上进行分布式查询处理。读取会自动并行化。在读取期间,如果存在,将使用远程服务器上的表索引。
创建表
从表
当 Distributed 表指向当前服务器上的表时,可以采用该表的模式
分布式参数
| 参数 | 描述 |
|---|---|
cluster | 服务器配置文件中的集群名称 |
数据库 | 远程数据库的名称 |
table | 远程表的名称 |
sharding_key (可选) | 分片键。 指定 sharding_key 对于以下情况是必要的
|
policy_name (可选) | 策略名称,它将用于存储后台发送的临时文件 |
参见
分布式设置
| 设置 | 描述 | 默认值 |
|---|---|---|
fsync_after_insert | 在后台将数据插入到分布式后,对文件数据执行 fsync。保证操作系统将所有插入的数据刷新到发起节点磁盘上的文件。 | false |
fsync_directories | 对目录执行 fsync。保证操作系统在分布式表上的后台插入相关操作后刷新目录元数据 (插入后、将数据发送到分片后等)。 | false |
skip_unavailable_shards | 如果为 true,ClickHouse 会静默跳过不可用的分片。当出现以下情况时,分片会被标记为不可用:1) 由于连接故障而无法访问分片。 2) 无法通过 DNS 解析分片。 3) 分片上不存在表。 | false |
bytes_to_throw_insert | 如果待处理的后台 INSERT 的压缩字节数超过此值,将抛出异常。0 - 不抛出。 | 0 |
bytes_to_delay_insert | 如果待处理的后台 INSERT 的压缩字节数超过此值,查询将被延迟。0 - 不延迟。 | 0 |
max_delay_to_insert | 如果后台发送有大量待处理字节,分布式表插入数据的最大延迟时间(秒)。 | 60 |
background_insert_batch | 与 distributed_background_insert_batch 相同 | 0 |
background_insert_split_batch_on_failure | 与 distributed_background_insert_split_batch_on_failure 相同 | 0 |
background_insert_sleep_time_ms | 与 distributed_background_insert_sleep_time_ms 相同 | 0 |
background_insert_max_sleep_time_ms | 与 distributed_background_insert_max_sleep_time_ms 相同 | 0 |
flush_on_detach | 在 DETACH/DROP/服务器关闭时,将数据刷新到远程节点。 | true |
持久性设置 (fsync_...)
- 仅影响后台
INSERT(即distributed_foreground_insert=false),当数据首先存储在发起节点磁盘上,然后在后台发送到分片时。 - 可能会显著降低
INSERT性能 - 影响写入存储在分布式表文件夹内的的数据到 接受您的插入的节点。如果您需要对写入底层 MergeTree 表的数据具有保证,请参阅
system.merge_tree_settings中的持久性设置 (...fsync...)
有关 插入限制设置 (..._insert) 请参阅
distributed_foreground_insert设置prefer_localhost_replica设置bytes_to_throw_insert在bytes_to_delay_insert之前处理,因此您不应将其设置为小于bytes_to_delay_insert的值
示例
数据将从 logs 集群中的所有服务器读取,从位于集群中每个服务器上的 default.hits 表读取。数据不仅被读取,而且在远程服务器上被部分处理 (在可能的范围内)。例如,对于带有 GROUP BY 的查询,数据将在远程服务器上聚合,聚合函数的中间状态将被发送到请求服务器。然后数据将被进一步聚合。
您可以将常量表达式(返回字符串)代替数据库名称。例如:currentDatabase()。
集群
集群在 服务器配置文件 中配置
这里定义了一个名为 logs 的集群,它由两个分片组成,每个分片包含两个副本。分片引用包含数据不同部分(为了读取所有数据,您必须访问所有分片)的服务器。副本是复制服务器(为了读取所有数据,您可以访问其中任何一个副本)。
集群名称不能包含点。
对于每个服务器,指定参数 host、port,以及可选的 user、password、secure、compression、bind_host
| 参数 | 描述 | 默认值 |
|---|---|---|
host | 远程服务器的地址。您可以使用域名或 IPv4 或 IPv6 地址。如果您指定域名,服务器将在启动时进行 DNS 请求,并在服务器运行期间存储结果。如果 DNS 请求失败,服务器将不会启动。如果您更改 DNS 记录,请重新启动服务器。 | - |
port | 用于 messenger 活动的 TCP 端口(配置中的 tcp_port,通常设置为 9000)。不要与 http_port 混淆。 | - |
用户 | 用于连接到远程服务器的用户名。该用户必须有权连接到指定的服务器。访问在 users.xml 文件中配置。有关更多信息,请参阅 访问权限 部分。 | 默认 |
password | 用于连接到远程服务器的密码 (未屏蔽)。 | '' |
secure | 是否使用安全的 SSL/TLS 连接。通常还需要指定端口 (默认安全端口为 9440)。服务器应监听 <tcp_port_secure>9440</tcp_port_secure> 并配置正确的证书。 | false |
压缩 | 使用数据压缩。 | true |
bind_host | 从该节点连接到远程服务器时使用的源地址。仅支持 IPv4 地址。用于高级部署用例,需要设置 ClickHouse 分布式查询使用的源 IP 地址。 | - |
在指定副本时,将为每个分片选择一个可用的副本进行读取。您可以配置负载均衡算法 (访问哪个副本的偏好) – 请参阅 load_balancing 设置。如果与服务器的连接未建立,将尝试使用短超时进行连接。如果连接失败,将选择下一个副本,依此类推,直到所有副本。如果对所有副本的连接尝试都失败,将以相同的方式重复尝试多次。这有利于弹性,但不能提供完全的容错能力:远程服务器可能会接受连接,但可能无法正常工作,或者工作效率低下。
您可以指定仅一个分片 (在这种情况下,查询处理应称为远程,而不是分布式),或最多任意数量的分片。在每个分片中,您可以指定一个到任意数量的副本。您可以为每个分片指定不同数量的副本。
您可以在配置中指定任意数量的集群。
要查看您的集群,请使用 system.clusters 表。
Distributed 引擎允许像本地服务器一样使用集群。但是,集群的配置不能动态指定,必须在服务器配置文件中配置。通常,集群中的所有服务器将具有相同的集群配置 (尽管这不是必需的)。配置文件中的集群会在不重新启动服务器的情况下动态更新。
如果您需要将查询发送到每次都不同的分片和副本集,则不需要创建 Distributed 表 – 请使用 remote 表函数。请参阅 表函数 部分。
写入数据
有两种方法可以写入集群数据
首先,您可以定义将哪些数据写入哪些服务器,并直接在每个分片上执行写入。换句话说,直接对 Distributed 表指向的集群中的远程表执行 INSERT 语句。这是最灵活的解决方案,因为您可以使用任何分片方案,即使由于主题领域的需要而分片方案非常复杂。这也是最优的解决方案,因为数据可以完全独立地写入不同的分片。
其次,您可以对 Distributed 表执行 INSERT 语句。在这种情况下,表会将插入的数据自动分发到各个服务器。为了写入 Distributed 表,它必须配置 sharding_key 参数(除非只有一个分片)。
每个分片可以在配置文件中定义一个 <weight>(权重)。默认情况下,权重为 1。数据根据分片权重按比例分发到各个分片。所有分片权重加总后,每个分片权重除以总权重,以确定每个分片的比例。例如,如果有两个分片,第一个权重为 1,第二个权重为 2,则第一个分片将接收插入行数的 1/3,第二个分片将接收 2/3。
每个分片可以在配置文件中定义 internal_replication(内部复制)参数。如果此参数设置为 true,则写入操作会选择第一个健康的副本并将数据写入它。如果 Distributed 表底层的表是复制表(例如,任何 Replicated*MergeTree 表引擎),请使用此设置。表的其中一个副本将接收写入,并自动复制到其他副本。
如果 internal_replication 设置为 false(默认值),则数据将写入所有副本。在这种情况下,Distributed 表本身会复制数据。这比使用复制表更差,因为副本的一致性不会被检查,并且随着时间的推移,它们将包含略有不同的数据。
为了选择将数据行发送到哪个分片,会分析分片表达式,并获取其除以分片总权重后的余数。该行将被发送到对应于从 prev_weights 到 prev_weights + weight 半区间的余数的分片,其中 prev_weights 是权重最小的分片的总权重,而 weight 是该分片的权重。例如,如果有两个分片,第一个权重为 9,第二个权重为 10,则该行将被发送到第一个分片,余数范围为 [0, 9),发送到第二个分片,余数范围为 [9, 19)。
分片表达式可以是任何由常量和表列组成的表达式,该表达式返回一个整数。例如,您可以使用表达式 rand() 进行随机数据分发,或者使用 UserID 进行按用户 ID 除法的余数分发(然后单个用户的数据将驻留在单个分片上,从而简化按用户运行 IN 和 JOIN)。如果其中一列没有足够均匀地分布,您可以将其包装在哈希函数中,例如 intHash64(UserID)。
除法的简单余数是分片的一种有限解决方案,并不总是合适的。它适用于中等和大量的数据量(数十个服务器),但不适用于非常大的数据量(数百个服务器或更多)。在后一种情况下,请使用主题领域所需的的分片方案,而不是使用 Distributed 表中的条目。
在以下情况下,您应该关注分片方案
- 使用的查询需要通过特定键连接数据(
IN或JOIN)。如果数据按此键分片,则可以使用本地IN或JOIN,而不是GLOBAL IN或GLOBAL JOIN,这效率更高。 - 使用了大量服务器(数百个或更多)和大量小查询,例如,针对单个客户端的数据的查询(例如,网站、广告商或合作伙伴)。为了防止小查询影响整个集群,将单个客户端的数据定位到单个分片是有意义的。或者,您可以设置双层分片:将整个集群划分为“层”,其中一层可能由多个分片组成。单个客户端的数据位于单个层上,但可以根据需要向层添加分片,并且数据在其中随机分布。为每个层创建
Distributed表,并为全局查询创建一个共享的分布式表。
数据以后台方式写入。在表中插入时,数据块只是写入本地文件系统。数据会尽快以后台方式发送到远程服务器。发送数据的周期性由 distributed_background_insert_sleep_time_ms 和 distributed_background_insert_max_sleep_time_ms 设置管理。Distributed 引擎单独发送包含插入数据的每个文件,但您可以使用 distributed_background_insert_batch 设置启用文件批量发送。此设置通过更好地利用本地服务器和网络资源来提高集群性能。您应该通过检查表目录中的文件列表(等待发送的数据)来检查数据是否已成功发送:/var/lib/clickhouse/data/database/table/。执行后台任务的线程数可以通过 background_distributed_schedule_pool_size 设置来设置。
如果服务器在向 Distributed 表 INSERT 之后停止存在或经历了粗暴的重启(例如,由于硬件故障),则插入的数据可能会丢失。如果在表目录中检测到损坏的数据部分,则将其转移到 broken 子目录,并且不再使用。
读取数据
查询 Distributed 表时,SELECT 查询会发送到所有分片,并且无论数据如何在分片之间分发(它们可以完全随机地分发)都可以工作。当您添加新的分片时,不必将旧数据传输到其中。相反,您可以使用更大的权重写入新数据——数据将略有不均匀地分发,但查询将正确有效地工作。
当启用 max_parallel_replicas 选项时,查询处理将在单个分片内的所有副本之间并行化。有关更多信息,请参阅 max_parallel_replicas 部分。
要了解有关分布式 in 和 global in 查询的处理方式,请参阅 此处 的文档。
虚拟列
_Shard_num
_shard_num — 包含表 system.clusters 中的 shard_num 值。类型:UInt32。
由于 remote 和 [cluster](../../../sql-reference/table-functions/cluster.md) 表函数在内部创建临时 Distributed 表,因此 _shard_num` 在那里也可以使用。
参见