跳至主要内容
跳至主要内容

分布式表引擎

云环境中的分布式引擎

要在 ClickHouse Cloud 中创建分布式表引擎,可以使用 remoteremoteSecure 表函数。Distributed(...) 语法不能在 ClickHouse Cloud 中使用。

使用分布式引擎的表不存储自己的任何数据,但允许在多个服务器上进行分布式查询处理。读取会自动并行化。在读取期间,如果存在,将使用远程服务器上的表索引。

创建表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

从表

Distributed 表指向当前服务器上的表时,可以采用该表的模式

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

分布式参数

参数描述
cluster服务器配置文件中的集群名称
数据库远程数据库的名称
table远程表的名称
sharding_key (可选)分片键。
指定 sharding_key 对于以下情况是必要的
  • 对于向分布式表进行 INSERT (因为表引擎需要 sharding_key 来确定如何拆分数据)。但是,如果启用了 insert_distributed_one_random_shard 设置,则 INSERT 不需要分片键。
  • 用于 optimize_skip_unused_shards,因为 sharding_key 是确定应该查询哪些分片所必需的
policy_name (可选)策略名称,它将用于存储后台发送的临时文件

参见

分布式设置

设置描述默认值
fsync_after_insert在后台将数据插入到分布式后,对文件数据执行 fsync。保证操作系统将所有插入的数据刷新到发起节点磁盘上的文件。false
fsync_directories对目录执行 fsync。保证操作系统在分布式表上的后台插入相关操作后刷新目录元数据 (插入后、将数据发送到分片后等)。false
skip_unavailable_shards如果为 true,ClickHouse 会静默跳过不可用的分片。当出现以下情况时,分片会被标记为不可用:1) 由于连接故障而无法访问分片。 2) 无法通过 DNS 解析分片。 3) 分片上不存在表。false
bytes_to_throw_insert如果待处理的后台 INSERT 的压缩字节数超过此值,将抛出异常。0 - 不抛出。0
bytes_to_delay_insert如果待处理的后台 INSERT 的压缩字节数超过此值,查询将被延迟。0 - 不延迟。0
max_delay_to_insert如果后台发送有大量待处理字节,分布式表插入数据的最大延迟时间(秒)。60
background_insert_batchdistributed_background_insert_batch 相同0
background_insert_split_batch_on_failuredistributed_background_insert_split_batch_on_failure 相同0
background_insert_sleep_time_msdistributed_background_insert_sleep_time_ms 相同0
background_insert_max_sleep_time_msdistributed_background_insert_max_sleep_time_ms 相同0
flush_on_detachDETACH/DROP/服务器关闭时,将数据刷新到远程节点。true
注意

持久性设置 (fsync_...)

  • 仅影响后台 INSERT (即 distributed_foreground_insert=false),当数据首先存储在发起节点磁盘上,然后在后台发送到分片时。
  • 可能会显著降低 INSERT 性能
  • 影响写入存储在分布式表文件夹内的的数据到 接受您的插入的节点。如果您需要对写入底层 MergeTree 表的数据具有保证,请参阅 system.merge_tree_settings 中的持久性设置 (...fsync...)

有关 插入限制设置 (..._insert) 请参阅

示例

CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;

数据将从 logs 集群中的所有服务器读取,从位于集群中每个服务器上的 default.hits 表读取。数据不仅被读取,而且在远程服务器上被部分处理 (在可能的范围内)。例如,对于带有 GROUP BY 的查询,数据将在远程服务器上聚合,聚合函数的中间状态将被发送到请求服务器。然后数据将被进一步聚合。

您可以将常量表达式(返回字符串)代替数据库名称。例如:currentDatabase()

集群

集群在 服务器配置文件 中配置

<remote_servers>
    <logs>
        <!-- Inter-server per-cluster secret for Distributed queries
             default: no secret (no authentication will be performed)

             If set, then Distributed queries will be validated on shards, so at least:
             - such cluster should exist on the shard,
             - such cluster should have the same secret.

             And also (and which is more important), the initial_user will
             be used as current user for the query.
        -->
        <!-- <secret></secret> -->
        
        <!-- Optional. Whether distributed DDL queries (ON CLUSTER clause) are allowed for this cluster. Default: true (allowed). -->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- Optional. Shard weight when writing data. Default: 1. -->
            <weight>1</weight>
            <!-- Optional. The shard name.  Must be non-empty and unique among shards in the cluster. If not specified, will be empty. -->
            <name>shard_01</name>
            <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>

这里定义了一个名为 logs 的集群,它由两个分片组成,每个分片包含两个副本。分片引用包含数据不同部分(为了读取所有数据,您必须访问所有分片)的服务器。副本是复制服务器(为了读取所有数据,您可以访问其中任何一个副本)。

集群名称不能包含点。

对于每个服务器,指定参数 hostport,以及可选的 userpasswordsecurecompressionbind_host

参数描述默认值
host远程服务器的地址。您可以使用域名或 IPv4 或 IPv6 地址。如果您指定域名,服务器将在启动时进行 DNS 请求,并在服务器运行期间存储结果。如果 DNS 请求失败,服务器将不会启动。如果您更改 DNS 记录,请重新启动服务器。-
port用于 messenger 活动的 TCP 端口(配置中的 tcp_port,通常设置为 9000)。不要与 http_port 混淆。-
用户用于连接到远程服务器的用户名。该用户必须有权连接到指定的服务器。访问在 users.xml 文件中配置。有关更多信息,请参阅 访问权限 部分。默认
password用于连接到远程服务器的密码 (未屏蔽)。''
secure是否使用安全的 SSL/TLS 连接。通常还需要指定端口 (默认安全端口为 9440)。服务器应监听 <tcp_port_secure>9440</tcp_port_secure> 并配置正确的证书。false
压缩使用数据压缩。true
bind_host从该节点连接到远程服务器时使用的源地址。仅支持 IPv4 地址。用于高级部署用例,需要设置 ClickHouse 分布式查询使用的源 IP 地址。-

在指定副本时,将为每个分片选择一个可用的副本进行读取。您可以配置负载均衡算法 (访问哪个副本的偏好) – 请参阅 load_balancing 设置。如果与服务器的连接未建立,将尝试使用短超时进行连接。如果连接失败,将选择下一个副本,依此类推,直到所有副本。如果对所有副本的连接尝试都失败,将以相同的方式重复尝试多次。这有利于弹性,但不能提供完全的容错能力:远程服务器可能会接受连接,但可能无法正常工作,或者工作效率低下。

您可以指定仅一个分片 (在这种情况下,查询处理应称为远程,而不是分布式),或最多任意数量的分片。在每个分片中,您可以指定一个到任意数量的副本。您可以为每个分片指定不同数量的副本。

您可以在配置中指定任意数量的集群。

要查看您的集群,请使用 system.clusters 表。

Distributed 引擎允许像本地服务器一样使用集群。但是,集群的配置不能动态指定,必须在服务器配置文件中配置。通常,集群中的所有服务器将具有相同的集群配置 (尽管这不是必需的)。配置文件中的集群会在不重新启动服务器的情况下动态更新。

如果您需要将查询发送到每次都不同的分片和副本集,则不需要创建 Distributed 表 – 请使用 remote 表函数。请参阅 表函数 部分。

写入数据

有两种方法可以写入集群数据

首先,您可以定义将哪些数据写入哪些服务器,并直接在每个分片上执行写入。换句话说,直接对 Distributed 表指向的集群中的远程表执行 INSERT 语句。这是最灵活的解决方案,因为您可以使用任何分片方案,即使由于主题领域的需要而分片方案非常复杂。这也是最优的解决方案,因为数据可以完全独立地写入不同的分片。

其次,您可以对 Distributed 表执行 INSERT 语句。在这种情况下,表会将插入的数据自动分发到各个服务器。为了写入 Distributed 表,它必须配置 sharding_key 参数(除非只有一个分片)。

每个分片可以在配置文件中定义一个 <weight>(权重)。默认情况下,权重为 1。数据根据分片权重按比例分发到各个分片。所有分片权重加总后,每个分片权重除以总权重,以确定每个分片的比例。例如,如果有两个分片,第一个权重为 1,第二个权重为 2,则第一个分片将接收插入行数的 1/3,第二个分片将接收 2/3。

每个分片可以在配置文件中定义 internal_replication(内部复制)参数。如果此参数设置为 true,则写入操作会选择第一个健康的副本并将数据写入它。如果 Distributed 表底层的表是复制表(例如,任何 Replicated*MergeTree 表引擎),请使用此设置。表的其中一个副本将接收写入,并自动复制到其他副本。

如果 internal_replication 设置为 false(默认值),则数据将写入所有副本。在这种情况下,Distributed 表本身会复制数据。这比使用复制表更差,因为副本的一致性不会被检查,并且随着时间的推移,它们将包含略有不同的数据。

为了选择将数据行发送到哪个分片,会分析分片表达式,并获取其除以分片总权重后的余数。该行将被发送到对应于从 prev_weightsprev_weights + weight 半区间的余数的分片,其中 prev_weights 是权重最小的分片的总权重,而 weight 是该分片的权重。例如,如果有两个分片,第一个权重为 9,第二个权重为 10,则该行将被发送到第一个分片,余数范围为 [0, 9),发送到第二个分片,余数范围为 [9, 19)。

分片表达式可以是任何由常量和表列组成的表达式,该表达式返回一个整数。例如,您可以使用表达式 rand() 进行随机数据分发,或者使用 UserID 进行按用户 ID 除法的余数分发(然后单个用户的数据将驻留在单个分片上,从而简化按用户运行 INJOIN)。如果其中一列没有足够均匀地分布,您可以将其包装在哈希函数中,例如 intHash64(UserID)

除法的简单余数是分片的一种有限解决方案,并不总是合适的。它适用于中等和大量的数据量(数十个服务器),但不适用于非常大的数据量(数百个服务器或更多)。在后一种情况下,请使用主题领域所需的的分片方案,而不是使用 Distributed 表中的条目。

在以下情况下,您应该关注分片方案

  • 使用的查询需要通过特定键连接数据(INJOIN)。如果数据按此键分片,则可以使用本地 INJOIN,而不是 GLOBAL INGLOBAL JOIN,这效率更高。
  • 使用了大量服务器(数百个或更多)和大量小查询,例如,针对单个客户端的数据的查询(例如,网站、广告商或合作伙伴)。为了防止小查询影响整个集群,将单个客户端的数据定位到单个分片是有意义的。或者,您可以设置双层分片:将整个集群划分为“层”,其中一层可能由多个分片组成。单个客户端的数据位于单个层上,但可以根据需要向层添加分片,并且数据在其中随机分布。为每个层创建 Distributed 表,并为全局查询创建一个共享的分布式表。

数据以后台方式写入。在表中插入时,数据块只是写入本地文件系统。数据会尽快以后台方式发送到远程服务器。发送数据的周期性由 distributed_background_insert_sleep_time_msdistributed_background_insert_max_sleep_time_ms 设置管理。Distributed 引擎单独发送包含插入数据的每个文件,但您可以使用 distributed_background_insert_batch 设置启用文件批量发送。此设置通过更好地利用本地服务器和网络资源来提高集群性能。您应该通过检查表目录中的文件列表(等待发送的数据)来检查数据是否已成功发送:/var/lib/clickhouse/data/database/table/。执行后台任务的线程数可以通过 background_distributed_schedule_pool_size 设置来设置。

如果服务器在向 DistributedINSERT 之后停止存在或经历了粗暴的重启(例如,由于硬件故障),则插入的数据可能会丢失。如果在表目录中检测到损坏的数据部分,则将其转移到 broken 子目录,并且不再使用。

读取数据

查询 Distributed 表时,SELECT 查询会发送到所有分片,并且无论数据如何在分片之间分发(它们可以完全随机地分发)都可以工作。当您添加新的分片时,不必将旧数据传输到其中。相反,您可以使用更大的权重写入新数据——数据将略有不均匀地分发,但查询将正确有效地工作。

当启用 max_parallel_replicas 选项时,查询处理将在单个分片内的所有副本之间并行化。有关更多信息,请参阅 max_parallel_replicas 部分。

要了解有关分布式 inglobal in 查询的处理方式,请参阅 此处 的文档。

虚拟列

_Shard_num

_shard_num — 包含表 system.clusters 中的 shard_num 值。类型:UInt32

注意

由于 remote 和 [cluster](../../../sql-reference/table-functions/cluster.md) 表函数在内部创建临时 Distributed 表,因此 _shard_num` 在那里也可以使用。

参见

    © . This site is unofficial and not affiliated with ClickHouse, Inc.