跳至主要内容

postgresql

允许对存储在远程 PostgreSQL 服务器上的数据执行 SELECTINSERT 查询。

语法

postgresql({host:port, database, table, user, password[, schema, [, on_conflict]] | named_collection[, option=value [,..]]})

参数

  • host:port — PostgreSQL 服务器地址。
  • database — 远程数据库名称。
  • table — 远程表名称。
  • user — PostgreSQL 用户。
  • password — 用户密码。
  • schema — 非默认表模式。可选。
  • on_conflict — 冲突解决策略。例如:ON CONFLICT DO NOTHING。可选。

也可以使用命名集合传递参数。在这种情况下,应分别指定 hostport。建议在生产环境中使用此方法。

返回值

一个与原始 PostgreSQL 表具有相同列的表对象。

注意

INSERT 查询中,要区分表函数 postgresql(...) 和具有列名称列表的表名称,必须使用关键字 FUNCTIONTABLE FUNCTION。请参阅下面的示例。

实现细节

PostgreSQL 端的 SELECT 查询在只读 PostgreSQL 事务中作为 COPY (SELECT ...) TO STDOUT 运行,并在每个 SELECT 查询后提交。

简单的 WHERE 子句(如 =!=>>=<<=IN)在 PostgreSQL 服务器上执行。

所有联接、聚合、排序、IN [ array ] 条件和 LIMIT 采样约束仅在对 PostgreSQL 的查询完成后才在 ClickHouse 中执行。

PostgreSQL 端的 INSERT 查询在 PostgreSQL 事务中作为 COPY "table_name" (field1, field2, ... fieldN) FROM STDIN 运行,并在每个 INSERT 语句后自动提交。

PostgreSQL 数组类型转换为 ClickHouse 数组。

注意

请注意,在 PostgreSQL 中,像 Integer[] 这样的数组数据类型列可能在不同的行中包含不同维度的数组,但在 ClickHouse 中,仅允许在所有行中都具有相同维度的多维数组。

支持必须用 | 列出的多个副本。例如

SELECT name FROM postgresql(`postgres{1|2|3}:5432`, 'postgres_database', 'postgres_table', 'user', 'password');

SELECT name FROM postgresql(`postgres1:5431|postgres2:5432`, 'postgres_database', 'postgres_table', 'user', 'password');

支持 PostgreSQL 字典源的副本优先级。映射中数字越大,优先级越低。最高优先级为 0

示例

PostgreSQL 中的表

postgres=# CREATE TABLE "public"."test" (
"int_id" SERIAL,
"int_nullable" INT NULL DEFAULT NULL,
"float" FLOAT NOT NULL,
"str" VARCHAR(100) NOT NULL DEFAULT '',
"float_nullable" FLOAT NULL DEFAULT NULL,
PRIMARY KEY (int_id));

CREATE TABLE

postgres=# INSERT INTO test (int_id, str, "float") VALUES (1,'test',2);
INSERT 0 1

postgresql> SELECT * FROM test;
int_id | int_nullable | float | str | float_nullable
--------+--------------+-------+------+----------------
1 | | 2 | test |
(1 row)

使用普通参数从 ClickHouse 中选择数据

SELECT * FROM postgresql('localhost:5432', 'test', 'test', 'postgresql_user', 'password') WHERE str IN ('test');

或使用命名集合

CREATE NAMED COLLECTION mypg AS
host = 'localhost',
port = 5432,
database = 'test',
user = 'postgresql_user',
password = 'password';
SELECT * FROM postgresql(mypg, table='test') WHERE str IN ('test');
┌─int_id─┬─int_nullable─┬─float─┬─str──┬─float_nullable─┐
│ 1 │ ᴺᵁᴸᴸ │ 2 │ test │ ᴺᵁᴸᴸ │
└────────┴──────────────┴───────┴──────┴────────────────┘

插入

INSERT INTO TABLE FUNCTION postgresql('localhost:5432', 'test', 'test', 'postgrsql_user', 'password') (int_id, float) VALUES (2, 3);
SELECT * FROM postgresql('localhost:5432', 'test', 'test', 'postgresql_user', 'password');
┌─int_id─┬─int_nullable─┬─float─┬─str──┬─float_nullable─┐
│ 1 │ ᴺᵁᴸᴸ │ 2 │ test │ ᴺᵁᴸᴸ │
│ 2 │ ᴺᵁᴸᴸ │ 3 │ │ ᴺᵁᴸᴸ │
└────────┴──────────────┴───────┴──────┴────────────────┘

使用非默认模式

postgres=# CREATE SCHEMA "nice.schema";

postgres=# CREATE TABLE "nice.schema"."nice.table" (a integer);

postgres=# INSERT INTO "nice.schema"."nice.table" SELECT i FROM generate_series(0, 99) as t(i)
CREATE TABLE pg_table_schema_with_dots (a UInt32)
ENGINE PostgreSQL('localhost:5432', 'clickhouse', 'nice.table', 'postgrsql_user', 'password', 'nice.schema');

另请参阅

使用 PeerDB 复制或迁移 Postgres 数据

除了表函数之外,您还可以始终使用 ClickHouse 的PeerDB 来设置从 Postgres 到 ClickHouse 的连续数据管道。PeerDB 是一个专门为使用更改数据捕获 (CDC) 从 Postgres 复制数据到 ClickHouse 而设计的工具。