复制
该引擎基于 原子 引擎。它支持通过写入 ZooKeeper 的 DDL 日志并对给定数据库的所有副本执行来复制元数据。
一个 ClickHouse 服务器可以同时运行和更新多个复制的数据库。但不能有多个相同复制数据库的副本。
创建数据库
CREATE DATABASE testdb ENGINE = Replicated('zoo_path', 'shard_name', 'replica_name') [SETTINGS ...]
引擎参数
zoo_path
— ZooKeeper 路径。相同的 ZooKeeper 路径对应于相同的数据库。shard_name
— 分片名称。数据库副本通过shard_name
分组到分片中。replica_name
— 副本名称。对于同一分片的所有副本,副本名称必须不同。
对于 ReplicatedMergeTree 表,如果未提供参数,则使用默认参数:/clickhouse/tables/{uuid}/{shard}
和 {replica}
。这些可以在服务器设置 default_replica_path 和 default_replica_name 中更改。宏 {uuid}
展开为表的 uuid,{shard}
和 {replica}
展开为来自服务器配置的值,而不是来自数据库引擎参数的值。但在将来,可以使用复制数据库的 shard_name
和 replica_name
。
细节和建议
带有 Replicated
数据库的 DDL 查询的工作方式类似于 ON CLUSTER 查询,但有一些细微差别。
首先,DDL 请求尝试在发起者(最初从用户接收请求的主机)上执行。如果请求未满足,则用户会立即收到错误,其他主机不会尝试满足它。如果请求在发起者上已成功完成,则所有其他主机将自动重试,直到它们完成它。发起者将尝试等待查询在其他主机上完成(不超过 distributed_ddl_task_timeout),并将返回一个包含每个主机上查询执行状态的表。
错误情况下的行为受 distributed_ddl_output_mode 设置的控制,对于 Replicated
数据库,最好将其设置为 null_status_on_timeout
— 即,如果某些主机没有时间在 distributed_ddl_task_timeout 内执行请求,则不要抛出异常,而是在表中显示它们的 NULL
状态。
系统表 system.clusters 包含一个名为复制数据库的集群,该集群由数据库的所有副本组成。此集群在创建/删除副本时自动更新,可用于 分布式 表。
创建数据库的新副本时,该副本会自行创建表。如果副本长时间不可用并且落后于复制日志 — 它会检查其本地元数据与 ZooKeeper 中的当前元数据,将具有数据的额外表移动到单独的非复制数据库(以免意外删除任何多余的东西),创建缺少的表,如果表名已重命名,则更新表名。数据在 ReplicatedMergeTree
级别复制,即如果表未复制,则数据不会复制(数据库仅负责元数据)。
ALTER TABLE FREEZE|ATTACH|FETCH|DROP|DROP DETACHED|DETACH PARTITION|PART
查询是允许的,但不会复制。数据库引擎只会将分区/部分添加到当前副本或从中删除。但是,如果表本身使用复制表引擎,则在使用 ATTACH
后数据将被复制。
如果您只需要配置集群而无需维护表复制,请参阅 集群发现 功能。
使用示例
创建具有三个主机的集群
node1 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','replica1');
node2 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','other_replica');
node3 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','{replica}');
运行 DDL 查询
CREATE TABLE r.rmt (n UInt64) ENGINE=ReplicatedMergeTree ORDER BY n;
┌─────hosts────────────┬──status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ shard1|replica1 │ 0 │ │ 2 │ 0 │
│ shard1|other_replica │ 0 │ │ 1 │ 0 │
│ other_shard|r1 │ 0 │ │ 0 │ 0 │
└──────────────────────┴─────────┴───────┴─────────────────────┴──────────────────┘
显示系统表
SELECT cluster, shard_num, replica_num, host_name, host_address, port, is_local
FROM system.clusters WHERE cluster='r';
┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r │ 1 │ 1 │ node3 │ 127.0.0.1 │ 9002 │ 0 │
│ r │ 2 │ 1 │ node2 │ 127.0.0.1 │ 9001 │ 0 │
│ r │ 2 │ 2 │ node1 │ 127.0.0.1 │ 9000 │ 1 │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘
创建分布式表并插入数据
node2 :) CREATE TABLE r.d (n UInt64) ENGINE=Distributed('r','r','rmt', n % 2);
node3 :) INSERT INTO r.d SELECT * FROM numbers(10);
node1 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node3 │ [1,3,5,7,9] │
│ node2 │ [0,2,4,6,8] │
└───────┴───────────────┘
在另一个主机上添加副本
node4 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','r2');
集群配置如下所示
┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r │ 1 │ 1 │ node3 │ 127.0.0.1 │ 9002 │ 0 │
│ r │ 1 │ 2 │ node4 │ 127.0.0.1 │ 9003 │ 0 │
│ r │ 2 │ 1 │ node2 │ 127.0.0.1 │ 9001 │ 0 │
│ r │ 2 │ 2 │ node1 │ 127.0.0.1 │ 9000 │ 1 │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘
分布式表也将从新主机获取数据
node2 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node2 │ [1,3,5,7,9] │
│ node4 │ [0,2,4,6,8] │
└───────┴───────────────┘