跳至主要内容

复制

该引擎基于 原子 引擎。它支持通过写入 ZooKeeper 的 DDL 日志并对给定数据库的所有副本执行来复制元数据。

一个 ClickHouse 服务器可以同时运行和更新多个复制的数据库。但不能有多个相同复制数据库的副本。

创建数据库

CREATE DATABASE testdb ENGINE = Replicated('zoo_path', 'shard_name', 'replica_name') [SETTINGS ...]

引擎参数

  • zoo_path — ZooKeeper 路径。相同的 ZooKeeper 路径对应于相同的数据库。
  • shard_name — 分片名称。数据库副本通过 shard_name 分组到分片中。
  • replica_name — 副本名称。对于同一分片的所有副本,副本名称必须不同。

对于 ReplicatedMergeTree 表,如果未提供参数,则使用默认参数:/clickhouse/tables/{uuid}/{shard}{replica}。这些可以在服务器设置 default_replica_pathdefault_replica_name 中更改。宏 {uuid} 展开为表的 uuid,{shard}{replica} 展开为来自服务器配置的值,而不是来自数据库引擎参数的值。但在将来,可以使用复制数据库的 shard_namereplica_name

细节和建议

带有 Replicated 数据库的 DDL 查询的工作方式类似于 ON CLUSTER 查询,但有一些细微差别。

首先,DDL 请求尝试在发起者(最初从用户接收请求的主机)上执行。如果请求未满足,则用户会立即收到错误,其他主机不会尝试满足它。如果请求在发起者上已成功完成,则所有其他主机将自动重试,直到它们完成它。发起者将尝试等待查询在其他主机上完成(不超过 distributed_ddl_task_timeout),并将返回一个包含每个主机上查询执行状态的表。

错误情况下的行为受 distributed_ddl_output_mode 设置的控制,对于 Replicated 数据库,最好将其设置为 null_status_on_timeout — 即,如果某些主机没有时间在 distributed_ddl_task_timeout 内执行请求,则不要抛出异常,而是在表中显示它们的 NULL 状态。

系统表 system.clusters 包含一个名为复制数据库的集群,该集群由数据库的所有副本组成。此集群在创建/删除副本时自动更新,可用于 分布式 表。

创建数据库的新副本时,该副本会自行创建表。如果副本长时间不可用并且落后于复制日志 — 它会检查其本地元数据与 ZooKeeper 中的当前元数据,将具有数据的额外表移动到单独的非复制数据库(以免意外删除任何多余的东西),创建缺少的表,如果表名已重命名,则更新表名。数据在 ReplicatedMergeTree 级别复制,即如果表未复制,则数据不会复制(数据库仅负责元数据)。

ALTER TABLE FREEZE|ATTACH|FETCH|DROP|DROP DETACHED|DETACH PARTITION|PART 查询是允许的,但不会复制。数据库引擎只会将分区/部分添加到当前副本或从中删除。但是,如果表本身使用复制表引擎,则在使用 ATTACH 后数据将被复制。

如果您只需要配置集群而无需维护表复制,请参阅 集群发现 功能。

使用示例

创建具有三个主机的集群

node1 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','replica1');
node2 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','other_replica');
node3 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','{replica}');

运行 DDL 查询

CREATE TABLE r.rmt (n UInt64) ENGINE=ReplicatedMergeTree ORDER BY n;
┌─────hosts────────────┬──status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ shard1|replica1 │ 0 │ │ 2 │ 0 │
│ shard1|other_replica │ 0 │ │ 1 │ 0 │
│ other_shard|r1 │ 0 │ │ 0 │ 0 │
└──────────────────────┴─────────┴───────┴─────────────────────┴──────────────────┘

显示系统表

SELECT cluster, shard_num, replica_num, host_name, host_address, port, is_local
FROM system.clusters WHERE cluster='r';
┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r │ 1 │ 1 │ node3 │ 127.0.0.1 │ 9002 │ 0 │
│ r │ 2 │ 1 │ node2 │ 127.0.0.1 │ 9001 │ 0 │
│ r │ 2 │ 2 │ node1 │ 127.0.0.1 │ 9000 │ 1 │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘

创建分布式表并插入数据

node2 :) CREATE TABLE r.d (n UInt64) ENGINE=Distributed('r','r','rmt', n % 2);
node3 :) INSERT INTO r.d SELECT * FROM numbers(10);
node1 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node3 │ [1,3,5,7,9] │
│ node2 │ [0,2,4,6,8] │
└───────┴───────────────┘

在另一个主机上添加副本

node4 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','r2');

集群配置如下所示

┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r │ 1 │ 1 │ node3 │ 127.0.0.1 │ 9002 │ 0 │
│ r │ 1 │ 2 │ node4 │ 127.0.0.1 │ 9003 │ 0 │
│ r │ 2 │ 1 │ node2 │ 127.0.0.1 │ 9001 │ 0 │
│ r │ 2 │ 2 │ node1 │ 127.0.0.1 │ 9000 │ 1 │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘

分布式表也将从新主机获取数据

node2 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node2 │ [1,3,5,7,9] │
│ node4 │ [0,2,4,6,8] │
└───────┴───────────────┘