跳到主要内容

分布式表引擎

危险

要在云中创建分布式表引擎,您可以使用 remote 和 remoteSecure 表函数。Distributed(...) 语法在 ClickHouse Cloud 中不可用。

使用分布式引擎的表本身不存储任何数据,但允许在多个服务器上进行分布式查询处理。读取操作会自动并行化。在读取过程中,如果存在任何远程服务器上的索引,则会使用这些索引。

创建表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

从表

Distributed 表指向当前服务器上的表时,您可以采用该表的架构

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

分布式参数

集群

cluster - 服务器配置文件中的集群名称

数据库

database - 远程数据库的名称

table - 远程表的名称

分片键

sharding_key - (可选)分片键

以下情况需要指定 sharding_key

  • 对于 INSERTs 到分布式表(因为表引擎需要 sharding_key 来确定如何拆分数据)。但是,如果启用了 insert_distributed_one_random_shard 设置,则 INSERTs 不需要分片键。
  • 用于 optimize_skip_unused_shards,因为 sharding_key 是确定要查询哪些分片所必需的

策略名称

policy_name - (可选)策略名称,它将用于存储用于后台发送的临时文件

另请参阅

分布式设置

fsync_after_insert

fsync_after_insert - 在后台插入到 Distributed 后对文件数据进行 fsync。保证操作系统将整个插入数据刷新到 启动节点 磁盘上的文件。

fsync_directories

fsync_directories - 对目录进行 fsync。保证操作系统在 Distributed 表上进行与后台插入相关的操作(插入后、将数据发送到分片后等)后刷新目录元数据。

skip_unavailable_shards

skip_unavailable_shards - 如果为 true,ClickHouse 将静默地跳过不可用的分片。当出现以下情况时,分片将被标记为不可用:1) 由于连接失败,无法连接到分片。2) 无法通过 DNS 解析分片。3) 分片上不存在表。默认值为 false。

bytes_to_throw_insert

bytes_to_throw_insert - 如果待处理的压缩字节数超过此值,则将抛出异常。0 - 不抛出。默认值为 0。

bytes_to_delay_insert

bytes_to_delay_insert - 如果待处理的压缩字节数超过此值,则查询将被延迟。0 - 不延迟。默认值为 0。

max_delay_to_insert

max_delay_to_insert - 将数据插入 Distributed 表的延迟最大时间(以秒为单位),如果待处理的后台发送字节数很多。默认值为 60。

background_insert_batch

background_insert_batch - 与 distributed_background_insert_batch 相同

background_insert_split_batch_on_failure

background_insert_split_batch_on_failure - 与 distributed_background_insert_split_batch_on_failure 相同

background_insert_sleep_time_ms

background_insert_sleep_time_ms - 与 distributed_background_insert_sleep_time_ms 相同

background_insert_max_sleep_time_ms

background_insert_max_sleep_time_ms - 与 distributed_background_insert_max_sleep_time_ms 相同

flush_on_detach

flush_on_detach - 在 DETACH/DROP/服务器关闭时将数据刷新到远程节点。默认值为 true。

注意

持久性设置fsync_...

  • 仅影响后台 INSERT(即 distributed_foreground_insert=false),当数据首先存储在启动节点磁盘上,然后在后台发送到分片时。
  • 可能会显着降低插入性能
  • 影响将存储在 Distributed 表文件夹内的數據寫入 接受您的插入的节点。如果您需要保证將数据写入底层的 MergeTree 表 - 请参阅 system.merge_tree_settings 中的持久性设置(...fsync...

对于 插入限制设置..._insert)另请参阅

示例

CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
fsync_after_insert=0,
fsync_directories=0;

数据将从 logs 集群中的所有服务器读取,从集群中每个服务器上的 default.hits 表读取。数据不仅会被读取,还会在远程服务器上进行部分处理(在可能的范围内)。例如,对于包含 GROUP BY 的查询,数据将在远程服务器上聚合,并将聚合函数的中间状态发送到请求服务器。然后对数据进行进一步聚合。

您可以使用返回字符串的常量表达式来代替数据库名称。例如:currentDatabase()

集群

集群是在 服务器配置文件 中配置的

<remote_servers>
<logs>
<!-- Inter-server per-cluster secret for Distributed queries
default: no secret (no authentication will be performed)

If set, then Distributed queries will be validated on shards, so at least:
- such cluster should exist on the shard,
- such cluster should have the same secret.

And also (and which is more important), the initial_user will
be used as current user for the query.
-->
<!-- <secret></secret> -->

<!-- Optional. Whether distributed DDL queries (ON CLUSTER clause) are allowed for this cluster. Default: true (allowed). -->
<!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->

<shard>
<!-- Optional. Shard weight when writing data. Default: 1. -->
<weight>1</weight>
<!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
<internal_replication>false</internal_replication>
<replica>
<!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
<priority>1</priority>
<host>example01-01-1</host>
<port>9000</port>
</replica>
<replica>
<host>example01-01-2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<weight>2</weight>
<internal_replication>false</internal_replication>
<replica>
<host>example01-02-1</host>
<port>9000</port>
</replica>
<replica>
<host>example01-02-2</host>
<secure>1</secure>
<port>9440</port>
</replica>
</shard>
</logs>
</remote_servers>

这里定义了一个名为 logs 的集群,它包含两个分片,每个分片包含两个副本。分片是指包含不同部分数据的服务器(为了读取所有数据,您必须访问所有分片)。副本是指重复的服务器(为了读取所有数据,您可以访问任何一个副本上的数据)。

集群名称不能包含点。

为每个服务器指定了 hostport 和可选的 userpasswordsecurecompression 参数

  • host – 远程服务器的地址。您可以使用域名或 IPv4 或 IPv6 地址。如果您指定了域名,服务器将在启动时发出 DNS 请求,并将结果存储在服务器运行期间。如果 DNS 请求失败,服务器将不会启动。如果您更改了 DNS 记录,请重新启动服务器。
  • port – 用于信使活动的 TCP 端口(配置文件中的 tcp_port,通常设置为 9000)。不要与 http_port 混淆。
  • user – 连接到远程服务器的用户名。默认值为 default 用户。此用户必须具有连接到指定服务器的权限。权限是在 users.xml 文件中配置的。有关详细信息,请参阅部分 访问权限.
  • password – 连接到远程服务器的密码(未屏蔽)。默认值:空字符串。
  • secure - 是否使用安全的 SSL/TLS 连接。通常还需要指定端口(默认安全端口为 9440)。服务器应监听 <tcp_port_secure>9440</tcp_port_secure> 并配置正确的证书。
  • compression - 使用数据压缩。默认值:true

在指定副本时,将在读取时为每个分片选择一个可用的副本。您可以配置负载均衡算法(访问哪个副本的偏好) - 请参阅 load_balancing 设置。如果与服务器的连接未建立,将尝试连接并设置一个短超时时间。如果连接失败,将选择下一个副本,依此类推,直到所有副本都尝试过。如果所有副本的连接尝试都失败,将以相同方式重复尝试几次。这有利于弹性,但不提供完整的容错:远程服务器可能接受连接,但可能无法正常工作或工作效率低下。

您可以只指定一个分片(在这种情况下,查询处理应被称为远程,而不是分布式),或者最多指定任意数量的分片。在每个分片中,您可以指定一个到任意数量的副本。您可以为每个分片指定不同的副本数量。

您可以在配置中指定任意数量的集群。

要查看您的集群,请使用 system.clusters 表。

Distributed 引擎允许像本地服务器一样使用集群。但是,集群的配置不能动态指定,必须在服务器配置文件中配置。通常,集群中的所有服务器都具有相同的集群配置(尽管这不是必需的)。配置文件中的集群会即时更新,无需重启服务器。

如果您需要每次向一组未知的分片和副本发送查询,则无需创建 Distributed 表 - 请改用 remote 表函数。请参阅 表函数 部分。

写入数据

有两种方法可以将数据写入集群

首先,您可以定义将哪些数据写入哪些服务器,并直接对每个分片执行写入操作。换句话说,对集群中 Distributed 表指向的远程表执行直接 INSERT 语句。这是最灵活的解决方案,因为您可以使用任何分片方案,即使是由于主题区域的要求而变得不平凡的方案。这也是最优的解决方案,因为可以完全独立地将数据写入不同的分片。

其次,您可以对 Distributed 表执行 INSERT 语句。在这种情况下,该表将自行将插入的数据分发到各个服务器。为了写入 Distributed 表,它必须配置 sharding_key 参数(除非只有一个分片)。

每个分片可以在配置文件中定义一个 <weight>。默认情况下,权重为 1。数据按与分片权重成比例的量分发到各个分片。所有分片权重都加起来,然后将每个分片的权重除以总权重,以确定每个分片的比例。例如,如果有两个分片,第一个分片的权重为 1,而第二个分片的权重为 2,则第一个分片将收到插入行的三分之一 (1 / 3),而第二个分片将收到三分之二 (2 / 3)。

每个分片可以在配置文件中定义 internal_replication 参数。如果此参数设置为 true,则写入操作会选择第一个健康的副本并将数据写入其中。如果您底层的 Distributed 表的表是复制表(例如,任何 Replicated*MergeTree 表引擎),请使用此方法。其中一个表副本将接收写入,它将自动复制到其他副本。

如果 internal_replication 设置为 false(默认值),则将数据写入所有副本。在这种情况下,Distributed 表会自行复制数据。这比使用复制表要差,因为不会检查副本的一致性,并且随着时间的推移,它们将包含略有不同的数据。

要选择将一行数据发送到的分片,会分析分片表达式,并取其除以分片总权重的余数。该行将被发送到与余数的半间隔相对应的分片,该间隔从 prev_weightsprev_weights + weight,其中 prev_weights 是权重最小的分片的总权重,weight 是此分片的权重。例如,如果有两个分片,第一个分片的权重为 9,而第二个分片的权重为 10,则该行将被发送到第一个分片,用于范围内的余数[0, 9),对于范围内的余数,则被发送到第二个分片[9, 19).

分片表达式可以是常量和表列的任何表达式,该表达式返回一个整数。例如,您可以使用表达式 rand() 来随机分发数据,或者使用 UserID 来按用户 ID 的余数进行分发(然后单个用户的数据将驻留在单个分片上,这简化了按用户运行 INJOIN)。如果其中一个列的分布不够均匀,则可以将其包装在哈希函数中,例如 intHash64(UserID)

除法产生的简单余数是分片的一种有限的解决方案,并不总是合适。它适用于中等和大量数据(几十台服务器),但不适用于超大量数据(数百台服务器或更多)。在后一种情况下,请使用主题区域所需的分片方案,而不是使用 Distributed 表中的条目。

在以下情况下,您应该注意分片方案

  • 使用需要按特定键连接数据(INJOIN)的查询。如果数据按此键进行分片,则可以使用本地 INJOIN 而不是 GLOBAL INGLOBAL JOIN,这要高效得多。
  • 使用大量服务器(数百台或更多),并使用大量的小查询,例如,针对单个客户端数据的查询(例如,网站、广告商或合作伙伴)。为了使小查询不影响整个集群,将单个客户端的数据放在单个分片上是有意义的。或者,您可以设置双层分片:将整个集群划分为“层”,其中一层可以包含多个分片。单个客户端的数据位于单个层上,但可以根据需要向层添加分片,并且数据在其中随机分发。为每一层创建 Distributed 表,并为全局查询创建一个共享的分布式表。

数据在后台写入。当数据插入表中时,数据块只会被写入本地文件系统。数据将在尽可能快地以后台方式发送到远程服务器。发送数据的周期性由 distributed_background_insert_sleep_time_msdistributed_background_insert_max_sleep_time_ms 设置管理。Distributed 引擎会分别发送每个包含插入数据的文件,但是您可以使用 distributed_background_insert_batch 设置启用文件的批量发送。此设置通过更好地利用本地服务器和网络资源来提高集群性能。您应该检查数据是否已成功发送,方法是检查表目录中的文件列表(待发送的数据):/var/lib/clickhouse/data/database/table/。执行后台任务的线程数量可以通过 background_distributed_schedule_pool_size 设置来设置。

如果服务器在向 Distributed 表执行 INSERT 后停止存在或进行了粗暴的重启(例如,由于硬件故障),则插入的数据可能会丢失。如果在表目录中检测到损坏的数据部分,它将被转移到 broken 子目录中,不再使用。

读取数据

查询 Distributed 表时,SELECT 查询将被发送到所有分片,并且无论数据如何在分片之间分发(它们可以完全随机地分发),这些查询都可以正常工作。添加新分片时,您无需将旧数据转移到其中。相反,您可以通过使用更大的权重来将新数据写入其中 - 数据的分布将略微不均匀,但查询将正常有效地工作。

启用 max_parallel_replicas 选项时,查询处理将在单个分片内的所有副本之间并行化。有关更多信息,请参阅 max_parallel_replicas 部分。

要详细了解如何处理分布式 inglobal in 查询,请参考 文档。

虚拟列

_shard_num

_shard_num - 包含来自表 system.clustersshard_num 值。类型:UInt32.

注意

由于 remotecluster 表函数在内部创建临时 Distributed 表,因此 _shard_num 也在其中可用。

另请参阅