跳到主要内容
跳到主要内容
编辑此页

MaterializedPostgreSQL

实验性功能。 了解更多。
ClickHouse Cloud 中不支持
注意

建议 ClickHouse Cloud 用户使用 ClickPipes 进行 PostgreSQL 复制到 ClickHouse。这原生支持用于 PostgreSQL 的高性能变更数据捕获 (CDC)。

创建一个包含来自 PostgreSQL 数据库的表的 ClickHouse 数据库。首先,使用引擎 MaterializedPostgreSQL 的数据库创建 PostgreSQL 数据库的快照并加载所需的表。所需的表可以包括来自指定数据库的任何模式子集的任何表子集。除了快照数据库引擎获取 LSN 之外,一旦执行表的初始转储,它就开始从 WAL 中拉取更新。创建数据库后,新添加到 PostgreSQL 数据库的表不会自动添加到复制中。它们必须使用 ATTACH TABLE db.table 查询手动添加。

复制是使用 PostgreSQL 逻辑复制协议实现的,该协议不允许复制 DDL,但允许知道是否发生了破坏复制的更改(列类型更改、添加/删除列)。检测到此类更改,并且相应的表停止接收更新。在这种情况下,您应该使用 ATTACH/ DETACH PERMANENTLY 查询来完全重新加载表。如果 DDL 不会破坏复制(例如,重命名列),表仍将接收更新(插入是通过位置完成的)。

注意

此数据库引擎是实验性的。要使用它,请在您的配置文件中或使用 SET 命令将 allow_experimental_database_materialized_postgresql 设置为 1

SET allow_experimental_database_materialized_postgresql=1

创建数据库

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializedPostgreSQL('host:port', 'database', 'user', 'password') [SETTINGS ...]

引擎参数

  • host:port — PostgreSQL 服务器端点。
  • database — PostgreSQL 数据库名称。
  • user — PostgreSQL 用户。
  • password — 用户密码。

使用示例

CREATE DATABASE postgres_db
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password');

SHOW TABLES FROM postgres_db;

┌─name───┐
│ table1 │
└────────┘

SELECT * FROM postgresql_db.postgres_table;

动态添加新表到复制

创建 MaterializedPostgreSQL 数据库后,它不会自动检测到相应的 PostgreSQL 数据库中的新表。此类表可以手动添加

ATTACH TABLE postgres_database.new_table;
危险

在 22.1 版本之前,将表添加到复制会留下一个未删除的临时复制槽(命名为 {db_name}_ch_replication_slot_tmp)。如果在 22.1 之前的 ClickHouse 版本中附加表,请确保手动删除它 (SELECT pg_drop_replication_slot('{db_name}_ch_replication_slot_tmp'))。否则,磁盘使用量将增长。此问题在 22.1 中已修复。

动态从复制中删除表

可以从复制中删除特定表

DETACH TABLE postgres_database.table_to_remove PERMANENTLY;

PostgreSQL 模式

PostgreSQL 模式 可以通过 3 种方式配置(从 21.12 版本开始)。

  1. 一个 MaterializedPostgreSQL 数据库引擎对应一个模式。需要使用设置 materialized_postgresql_schema。表仅通过表名访问
CREATE DATABASE postgres_database
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_schema = 'postgres_schema';

SELECT * FROM postgres_database.table1;
  1. 一个 MaterializedPostgreSQL 数据库引擎对应任意数量的模式和指定的表集。需要使用设置 materialized_postgresql_tables_list。每个表都与其模式一起写入。表同时通过模式名称和表名称访问
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_tables_list = 'schema1.table1,schema2.table2,schema1.table3',
materialized_postgresql_tables_list_with_schema = 1;

SELECT * FROM database1.`schema1.table1`;
SELECT * FROM database1.`schema2.table2`;

但在这种情况下,materialized_postgresql_tables_list 中的所有表都必须与其模式名称一起写入。需要 materialized_postgresql_tables_list_with_schema = 1

警告:在这种情况下,表名中不允许使用点。

  1. 一个 MaterializedPostgreSQL 数据库引擎对应任意数量的模式和完整的表集。需要使用设置 materialized_postgresql_schema_list
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_schema_list = 'schema1,schema2,schema3';

SELECT * FROM database1.`schema1.table1`;
SELECT * FROM database1.`schema1.table2`;
SELECT * FROM database1.`schema2.table2`;

警告:在这种情况下,表名中不允许使用点。

要求

  1. wal_level 设置的值必须为 logical,并且 max_replication_slots 参数的值在 PostgreSQL 配置文件中必须至少为 2

  2. 每个复制的表都必须具有以下 副本标识 之一

  • 主键(默认)

  • 索引

postgres# CREATE TABLE postgres_table (a Integer NOT NULL, b Integer, c Integer NOT NULL, d Integer, e Integer NOT NULL);
postgres# CREATE unique INDEX postgres_table_index on postgres_table(a, c, e);
postgres# ALTER TABLE postgres_table REPLICA IDENTITY USING INDEX postgres_table_index;

始终首先检查主键。如果不存在,则检查定义为副本标识索引的索引。如果索引用作副本标识,则表中必须只有一个此类索引。您可以使用以下命令检查特定表使用的类型

postgres# SELECT CASE relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'
WHEN 'f' THEN 'full'
WHEN 'i' THEN 'index'
END AS replica_identity
FROM pg_class
WHERE oid = 'postgres_table'::regclass;
注意

不支持复制 TOAST 值。将使用数据类型的默认值。

设置

materialized_postgresql_tables_list

设置以逗号分隔的 PostgreSQL 数据库表列表,这些表将通过 MaterializedPostgreSQL 数据库引擎复制。

每个表都可以在括号中包含复制列的子集。如果省略了列子集,则将复制该表的所有列。

materialized_postgresql_tables_list = 'table1(co1, col2),table2,table3(co3, col5, col7)

默认值:空列表 — 表示将复制整个 PostgreSQL 数据库。

materialized_postgresql_schema

默认值:空字符串。(使用默认模式)

materialized_postgresql_schema_list

默认值:空列表。(使用默认模式)

materialized_postgresql_max_block_size

设置在将数据刷新到 PostgreSQL 数据库表之前内存中收集的行数。

可能的值

  • 正整数。

默认值:65536

materialized_postgresql_replication_slot

用户创建的复制槽。必须与 materialized_postgresql_snapshot 一起使用。

materialized_postgresql_snapshot

一个文本字符串,用于标识将从中执行 PostgreSQL 表的初始转储 的快照。必须与 materialized_postgresql_replication_slot 一起使用。

CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_tables_list = 'table1,table2,table3';

SELECT * FROM database1.table1;

如有必要,可以使用 DDL 查询更改设置。但是无法更改设置 materialized_postgresql_tables_list。要更新此设置中的表列表,请使用 ATTACH TABLE 查询。

ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_max_block_size = <new_size>;

materialized_postgresql_use_unique_replication_consumer_identifier

使用唯一的复制使用者标识符进行复制。默认值:0。如果设置为 1,则允许设置多个指向同一 PostgreSQL 表的 MaterializedPostgreSQL 表。

注意

逻辑复制槽的故障转移

主节点上存在的逻辑复制槽在备用副本上不可用。因此,如果发生故障转移,新的主节点(旧的物理备用节点)将不知道旧主节点上存在的任何槽。这将导致来自 PostgreSQL 的复制中断。解决此问题的方法是自行管理复制槽并定义永久复制槽(一些信息可以在 此处 找到)。您需要通过 materialized_postgresql_replication_slot 设置传递槽名称,并且必须使用 EXPORT SNAPSHOT 选项导出它。快照标识符需要通过 materialized_postgresql_snapshot 设置传递。

请注意,这仅应在实际需要时使用。如果没有真正的需要或完全理解原因,那么最好允许表引擎创建和管理自己的复制槽。

示例(来自 @bchrobot

  1. 在 PostgreSQL 中配置复制槽。

    apiVersion: "acid.zalan.do/v1"
    kind: postgresql
    metadata:
    name: acid-demo-cluster
    spec:
    numberOfInstances: 2
    postgresql:
    parameters:
    wal_level: logical
    patroni:
    slots:
    clickhouse_sync:
    type: logical
    database: demodb
    plugin: pgoutput
  2. 等待复制槽准备就绪,然后开始事务并导出事务快照标识符

    BEGIN;
    SELECT pg_export_snapshot();
  3. 在 ClickHouse 中创建数据库

    CREATE DATABASE demodb
    ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
    SETTINGS
    materialized_postgresql_replication_slot = 'clickhouse_sync',
    materialized_postgresql_snapshot = '0000000A-0000023F-3',
    materialized_postgresql_tables_list = 'table1,table2,table3';
  4. 一旦确认复制到 ClickHouse DB,就结束 PostgreSQL 事务。验证故障转移后复制是否继续

    kubectl exec acid-demo-cluster-0 -c postgres -- su postgres -c 'patronictl failover --candidate acid-demo-cluster-1 --force'

所需权限

  1. CREATE PUBLICATION -- 创建查询权限。

  2. CREATE_REPLICATION_SLOT -- 复制权限。

  3. pg_drop_replication_slot -- 复制权限或超级用户。

  4. DROP PUBLICATION -- 发布的所有者(MaterializedPostgreSQL 引擎本身的 username)。

可以避免执行 23 命令并拥有这些权限。使用设置 materialized_postgresql_replication_slotmaterialized_postgresql_snapshot。但要非常小心。

访问表

  1. pg_publication

  2. pg_replication_slots

  3. pg_publication_tables