跳到主要内容
跳到主要内容

复制

该引擎基于 Atomic 引擎。它支持通过写入 ZooKeeper 的 DDL 日志进行元数据复制,并在给定数据库的所有副本上执行。

一个 ClickHouse 服务器可以同时运行和更新多个复制数据库。但是,同一个复制数据库不能有多个副本。

创建数据库

CREATE DATABASE testdb ENGINE = Replicated('zoo_path', 'shard_name', 'replica_name') [SETTINGS ...]

引擎参数

  • zoo_path — ZooKeeper 路径。相同的 ZooKeeper 路径对应于相同的数据库。
  • shard_name — 分片名称。数据库副本按 shard_name 分组到分片中。
  • replica_name — 副本名称。同一分片的所有副本的副本名称必须不同。

对于 ReplicatedMergeTree 表,如果没有提供参数,则使用默认参数: /clickhouse/tables/{uuid}/{shard}{replica}。 这些可以在服务器设置 default_replica_pathdefault_replica_name 中更改。宏 {uuid} 展开为表的 uuid,{shard}{replica} 展开为来自服务器配置的值,而不是来自数据库引擎参数的值。但在未来,可以使用 Replicated 数据库的 shard_namereplica_name

特性和建议

Replicated 数据库的 DDL 查询的工作方式类似于 ON CLUSTER 查询,但存在细微差异。

首先,DDL 请求尝试在发起者(最初从用户接收请求的主机)上执行。如果请求未完成,则用户立即收到错误,其他主机不会尝试完成它。如果请求已在发起者上成功完成,则所有其他主机将自动重试,直到完成为止。发起者将尝试等待查询在其他主机上完成(不超过 distributed_ddl_task_timeout),并将返回一个表,其中包含每个主机上的查询执行状态。

错误情况下的行为由 distributed_ddl_output_mode 设置控制,对于 Replicated 数据库,最好将其设置为 null_status_on_timeout — 即,如果某些主机在 distributed_ddl_task_timeout 的时间内没有时间执行请求,则不抛出异常,而是在表中显示它们的 NULL 状态。

system.clusters 系统表包含一个以复制数据库命名的集群,该集群由数据库的所有副本组成。此集群在创建/删除副本时自动更新,可用于 Distributed 表。

当创建数据库的新副本时,此副本自行创建表。如果副本长时间不可用并落后于复制日志 - 它会使用 ZooKeeper 中的当前元数据检查其本地元数据,将带有数据的额外表移动到单独的非复制数据库(以免意外删除任何多余的内容),创建缺少的表,如果表名已重命名,则更新表名。数据在 ReplicatedMergeTree 级别复制,即如果表未复制,则数据不会被复制(数据库仅负责元数据)。

ALTER TABLE FREEZE|ATTACH|FETCH|DROP|DROP DETACHED|DETACH PARTITION|PART 查询是允许的,但不会被复制。数据库引擎只会将分区/部分添加到当前副本。但是,如果表本身使用 Replicated 表引擎,则在使用 ATTACH 后数据将被复制。

如果您只需要配置集群而无需维护表复制,请参阅 集群发现 功能。

使用示例

创建具有三个主机的集群

node1 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','replica1');
node2 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','other_replica');
node3 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','{replica}');

运行 DDL 查询

CREATE TABLE r.rmt (n UInt64) ENGINE=ReplicatedMergeTree ORDER BY n;
┌─────hosts────────────┬──status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ shard1|replica1 │ 0 │ │ 2 │ 0 │
│ shard1|other_replica │ 0 │ │ 1 │ 0 │
│ other_shard|r1 │ 0 │ │ 0 │ 0 │
└──────────────────────┴─────────┴───────┴─────────────────────┴──────────────────┘

显示系统表

SELECT cluster, shard_num, replica_num, host_name, host_address, port, is_local
FROM system.clusters WHERE cluster='r';
┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r │ 1 │ 1 │ node3 │ 127.0.0.1 │ 9002 │ 0 │
│ r │ 2 │ 1 │ node2 │ 127.0.0.1 │ 9001 │ 0 │
│ r │ 2 │ 2 │ node1 │ 127.0.0.1 │ 9000 │ 1 │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘

创建分布式表并插入数据

node2 :) CREATE TABLE r.d (n UInt64) ENGINE=Distributed('r','r','rmt', n % 2);
node3 :) INSERT INTO r.d SELECT * FROM numbers(10);
node1 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node3 │ [1,3,5,7,9] │
│ node2 │ [0,2,4,6,8] │
└───────┴───────────────┘

在一个或多个主机上添加副本

node4 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','r2');

集群配置将如下所示

┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r │ 1 │ 1 │ node3 │ 127.0.0.1 │ 9002 │ 0 │
│ r │ 1 │ 2 │ node4 │ 127.0.0.1 │ 9003 │ 0 │
│ r │ 2 │ 1 │ node2 │ 127.0.0.1 │ 9001 │ 0 │
│ r │ 2 │ 2 │ node1 │ 127.0.0.1 │ 9000 │ 1 │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘

分布式表也将从新主机获取数据

node2 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node2 │ [1,3,5,7,9] │
│ node4 │ [0,2,4,6,8] │
└───────┴───────────────┘