引言
这篇文章继续我们关于 ClickHouse 中可用的 Postgres 集成的系列文章。在我们之前的文章中,我们探索了 Postgres 函数和表引擎,展示了用户如何将他们的事务数据从 Postgres 迁移到 ClickHouse 用于分析工作负载。在这篇文章中,我们将展示如何将 Postgres 数据与流行的 ClickHouse 字典功能结合使用,以加速查询,特别是联接。最后,我们将展示如何使用 Postgres 表引擎将分析查询的结果从 ClickHouse 推送回 Postgres。这种“反向 ELT”过程可用于用户需要在最终用户应用程序中显示汇总数据但希望将这些统计数据的繁重计算卸载到 ClickHouse 的情况。
如果您想深入了解这些示例并重现它们,ClickHouse Cloud 是一个很好的起点 - 启动集群并获得 300 美元的免费信用额度,加载数据,让我们处理基础设施,并开始查询!
我们继续在这篇文章的示例中仅使用 ClickHouse Cloud 中的开发实例。对于我们的 Postgres 实例,我们也继续使用Supabase,它提供了足够的免费层级,足以满足我们的示例。我们假设用户已将英国房价数据集加载到 ClickHouse 中,作为之前博客文章中的一个步骤。此数据集也可以在不使用 Postgres 的情况下加载,使用此处概述的步骤。
使用 Postgres 增强字典
正如我们在之前的博客文章中强调的那样,字典可用于加速 ClickHouse 查询,尤其是那些涉及联接的查询。考虑以下示例,我们旨在找到英国(根据 ISO 3166-2)在过去 20 年中经历最大价格变化的地区。请注意,ISO 3166-2 代码不同于邮政编码,表示更大的区域,但更重要的是,它可用于在 Superset 等工具中可视化此数据。
JOIN 需要我们使用邮政编码到区域代码映射列表,该列表可以下载并加载到 codes
表中,如下所示。超过 100 万行,这大约需要一分钟才能加载到我们的 Supabase 免费层级实例中。让我们假设此数据目前仅在 Postgres 中,因此我们将通过 Postgres 联接此数据来回答查询。
注意:我们的邮政编码到 iso 3166-2 代码列表是从房价数据集生成的,并使用我们在 play.clickhouse.com 环境中存在的区域代码列表。此数据集虽然满足我们的需求,但并非完整或详尽,仅涵盖房价数据集中存在的邮政编码。用于生成我们文件的查询可以在此处找到。
wget https://datasets-documentation.s3.amazonaws.com/uk-house-prices/postgres/uk_postcode_to_iso.sql psql -c "CREATE TABLE uk_postcode_to_iso ( id serial, postcode varchar(8) primary key, iso_code char(6) );" psql -c "CREATE INDEX ON uk_postcode_to_iso (iso_code);" psql < uk_postcode_to_iso.sql psql -c "select count(*) from uk_postcode_to_iso;" count --------- 1272836 (1 row) psql -c "\timing" -c "SELECT iso_code, round(avg(((median_2022 - median_2002)/median_2002) * 100)) AS percent_change FROM ( SELECT postcode1 || ' ' || postcode2 AS postcode, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price) AS median_2002 FROM uk_price_paid WHERE extract(year from date) = '2002' GROUP BY postcode ) med_2002 INNER JOIN ( SELECT postcode1 || ' ' || postcode2 AS postcode, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price) AS median_2022 FROM uk_price_paid WHERE extract(year from date) = '2022' GROUP BY postcode ) med_2022 ON med_2002.postcode=med_2022.postcode INNER JOIN ( SELECT iso_code, postcode FROM uk_postcode_to_iso ) postcode_to_iso ON med_2022.postcode=postcode_to_iso.postcode GROUP BY iso_code ORDER BY percent_change DESC LIMIT 10;" Timing is on. iso_code | percent_change ----------+---------------- GB-TOF | 403 GB-KEC | 380 GB-MAN | 360 GB-SLF | 330 GB-BGW | 321 GB-HCK | 313 GB-MTY | 306 GB-AGY | 302 GB-RCT | 293 GB-BOL | 292 (10 rows) Time: 48523.927 ms (00:48.524)
此处的查询非常复杂,并且比我们之前文章中的查询更昂贵,该查询仅计算了伦敦变化最大的邮政编码。没有机会利用城镇索引,尽管我们可以利用 EXTRACT(year FROM date
索引,如EXPLAIN 所示。
我们也可以将此 iso 代码数据加载到 ClickHouse 表中并重复联接,在需要的地方调整语法。或者,我们可能会倾向于将映射保留在 Postgres 中,因为它会受到相当频繁的变化的影响。如果在 ClickHouse 中执行联接,则会产生以下查询。注意我们如何使用PostgreSQL 表引擎来创建 uk_postcode_to_iso
,以简化查询语法,而不是使用postgres 函数。
CREATE TABLE uk_postcode_to_iso AS postgresql('db.zcxfcrchxescrtxsnxuc.supabase.co', 'postgres', 'uk_postcode_to_iso', 'postgres', '') SELECT iso_code, round(avg(percent_change)) AS avg_percent_change FROM ( SELECT postcode, medianIf(price, toYear(date) = 2002) AS median_2002, medianIf(price, toYear(date) = 2022) AS median_2022, ((median_2022 - median_2002) / median_2002) * 100 AS percent_change FROM uk_price_paid GROUP BY concat(postcode1, ' ', postcode2) AS postcode HAVING isNaN(percent_change) = 0 ) AS med_by_postcode INNER JOIN uk_postcode_to_iso ON uk_postcode_to_iso.postcode = med_by_postcode.postcode GROUP BY iso_code ORDER BY avg_percent_change DESC LIMIT 10 ┌─iso_code─┬─avg_percent_change─┐ │ GB-TOF │ 403 │ │ GB-KEC │ 380 │ │ GB-MAN │ 360 │ │ GB-SLF │ 330 │ │ GB-BGW │ 321 │ │ GB-HCK │ 313 │ │ GB-MTY │ 306 │ │ GB-AGY │ 302 │ │ GB-RCT │ 293 │ │ GB-BOL │ 292 │ └──────────┴────────────────────┘ 10 rows in set. Elapsed: 4.131 sec. Processed 29.01 million rows, 305.27 MB (7.02 million rows/s., 73.90 MB/s.)
这没有达到我们想要的性能。与其为映射创建 ClickHouse 表,不如我们创建一个以 PostgreSQL 为后端的字典,如下所示
CREATE DICTIONARY uk_postcode_to_iso_dict ( `postcode` String, `iso_code` String ) PRIMARY KEY postcode SOURCE(POSTGRESQL( port 5432 host 'db.ebsmckuuiwnvyiniuvdt.supabase.co' user 'postgres' password '' db 'postgres' table 'uk_postcode_to_iso' invalidate_query 'SELECT max(id) as mid FROM uk_postcode_to_iso' )) LIFETIME(300) LAYOUT(complex_key_hashed()) //force loading of dictionary SELECT dictGet('uk_postcode_to_iso_dict', 'iso_code', 'BA5 1PD') ┌─dictGet('uk_postcode_to_iso_dict', 'iso_code', 'BA5 1PD')─┐ │ GB-SOM │ └───────────────────────────────────────────────────────────┘ 1 row in set. Elapsed: 0.885 sec.
此字典将根据 LIFETIME 子句定期更新,自动同步任何更改。在本例中,我们还定义了一个invalidate_query
子句,该子句通过返回单个值来控制何时需要从源重新加载数据集。如果发生更改,则会重新加载字典 - 在这种情况下,当最大 ID 发生更改时。在生产环境中,我们可能需要一个能够通过修改时间字段检测更新的查询。
使用此字典,我们现在可以修改查询并利用该表保存在内存中的事实,以便进行快速查找。注意我们还可以避免联接
SELECT iso_code, round(avg(percent_change)) AS avg_percent_change FROM ( SELECT dictGet('uk_postcode_to_iso_dict', 'iso_code', postcode) AS iso_code, medianIf(price, toYear(date) = 2002) AS median_2002, medianIf(price, toYear(date) = 2022) AS median_2022, ((median_2022 - median_2002) / median_2002) * 100 AS percent_change FROM uk_price_paid GROUP BY concat(postcode1, ' ', postcode2) AS postcode HAVING isNaN(percent_change) = 0 ) GROUP BY iso_code ORDER BY avg_percent_change DESC LIMIT 10 ┌─iso_code─┬─avg_percent_change─┐ │ GB-TOF │ 403 │ │ GB-KEC │ 380 │ │ GB-MAN │ 360 │ │ GB-SLF │ 330 │ │ GB-BGW │ 321 │ │ GB-HCK │ 313 │ │ GB-MTY │ 306 │ │ GB-AGY │ 302 │ │ GB-RCT │ 293 │ │ GB-BOL │ 292 │ └──────────┴────────────────────┘ 10 rows in set. Elapsed: 0.444 sec. Processed 27.73 million rows, 319.84 MB (62.47 million rows/s., 720.45 MB/s.)
这样更好。对于有兴趣的人来说,此数据可以在 Superset 等工具中绘制,这些工具可以解释这些 iso 代码 - 有关类似示例,请参见我们之前关于 Superset 的博客文章。
将结果推送到 Postgres
到目前为止,我们已经展示了将数据从 Postgres 迁移到 ClickHouse 进行分析工作负载的价值。如果我们将其视为 ETL 过程,则很可能在某个时候,我们将需要反转此工作流并将分析结果加载回 Postgres。我们可以使用我们在本系列文章前面突出显示的相同表引擎来实现这一点。
假设我们要将每月销售额的汇总统计信息推送到 Postgres,按邮政编码、类型、房屋是否为新建房屋以及是否为永久产权或租赁产权进行汇总。我们的假设网站将在每个列表页面上显示这些统计数据,以帮助其用户了解一个地区的市场历史状况。此外,他们希望能够随时间推移显示这些统计数据。为了降低其生产 Postgres 实例的负载*,他们将此计算卸载到 ClickHouse,并定期将这些结果推送到汇总表中。
实际上,这不是一个特别繁重的查询,很可能可以在 Postgres 中进行计划。
下面我们在创建表并插入分析查询结果之前,创建一个以 Postgres 为后端的 ClickHouse 数据库。
CREATE TABLE summary_prices( postcode1 varchar(8), type varchar(13), is_new SMALLINT, duration varchar(9), sold integer, month Date, avg_price integer, quantile_prices integer[]); // create Postgres engine table in ClickHouse CREATE TABLE summary_prices AS postgresql('db.zcxfcrchxescrtxsnxuc.supabase.co', 'postgres', 'summary_prices', 'postgres', '') //check connectivity SELECT count() FROM summary_prices ┌─count()─┐ │ 0 │ └─────────┘ 1 row in set. Elapsed: 0.337 sec. // insert the result of our query to Postgres INSERT INTO summary_prices SELECT postcode1, type, is_new, duration, count() AS sold, month, avg(price) AS avg_price, quantilesExactExclusive(0.25, 0.5, 0.75, 0.9, 0.95, 0.99)(price) AS quantile_prices FROM uk_price_paid WHERE postcode1 != '' GROUP BY toStartOfMonth(date) AS month, postcode1, type, is_new, duration ORDER BY postcode1 ASC, type ASC, is_new ASC, duration ASC, month ASC 0 rows in set. Elapsed: 25.714 sec. Processed 27.69 million rows, 276.98 MB (775.43 thousand rows/s., 7.76 MB/s.)
我们的网站现在有一个简单的查询可以运行,以获取一个区域和相同类型房屋的历史价格统计数据。
postgres=> SELECT postcode1, month, avg_price, quantile_prices FROM summary_prices WHERE postcode1='BA5' AND type='detached' AND is_new=0 and duration='freehold' LIMIT 10; postcode1 | month | avg_price | quantile_prices -----------+------------+-----------+-------------------------------------------- BA5 | 1995-01-01 | 108000 | {64000,100000,160000,160000,160000,160000} BA5 | 1995-02-01 | 95142 | {86500,100000,115000,130000,130000,130000} BA5 | 1995-03-01 | 138991 | {89487,95500,174750,354000,354000,354000} BA5 | 1995-04-01 | 91400 | {63750,69500,130000,165000,165000,165000} BA5 | 1995-05-01 | 110625 | {83500,94500,149750,170000,170000,170000} BA5 | 1995-06-01 | 124583 | {79375,118500,173750,185000,185000,185000} BA5 | 1995-07-01 | 126375 | {88250,95500,185375,272500,272500,272500} BA5 | 1995-08-01 | 104416 | {67500,95000,129750,200000,200000,200000} BA5 | 1995-09-01 | 103000 | {70000,97000,143500,146000,146000,146000} BA5 | 1995-10-01 | 90800 | {58375,72250,111250,213700,223000,223000} (10 rows)
结论
在本系列文章中,我们展示了 ClickHouse 和 Postgres 如何互补,并通过示例演示了如何使用 ClickHouse 原生函数和表引擎在两个数据库之间轻松地移动数据。在本篇文章中,我们介绍了以 Postgres 为后端的字典,以及它如何用于加速涉及频繁更改数据集的查询的联接操作。最后,我们执行了“反向 ETL”操作,将分析查询的结果推送到 Postgres,以便用户界面应用程序使用。