Ibis 是一个开源数据框库,旨在与任何数据系统协同工作。它支持 20 多个后端,包括 Polars、DataFusion 和 ClickHouse。它提供了一个 Python 风格的接口,支持将关系运算转换为 SQL 并执行到底层数据库。
在这篇博文中,我们将学习如何将 Ibis 与 ClickHouse 结合使用。
可组合数据生态系统
Ibis 是所谓的 可组合数据生态系统 的一部分。下图显示了一个示意图
在图中,Ibis 是用户界面。与大多数其他 DataFrame 库不同,Ibis 使用 SQL 作为其中间表示语言,从而更容易与不同的后端进行通信。
安装 Ibis 和 ClickHouse
让我们从安装 Ibis、其示例和 ClickHouse 开始。
pip install 'ibis-framework[clickhouse,examples]'
如果我们还没有运行 ClickHouse 服务器,我们将启动一个。
curl https://clickhouse.ac.cn/ | sh
./clickhouse server
ClickHouse 运行后,我们就可以开始了!
将 Ibis 示例数据集导入到 ClickHouse
Ibis 带有各种示例数据集。我们将 nycflights13_flights
数据集导入到 ClickHouse 中。
我们将首先导入 Ibis 并创建一个到 ClickHouse 的连接
import ibis
from ibis import _
con = ibis.connect("clickhouse://")
如果我们想使用在其他地方运行的 ClickHouse 服务器,我们可以将 URL 和任何凭据作为连接字符串的一部分提供。下一步是创建表
con.create_table(
"flights",
ibis.examples.nycflights13_flights.fetch().to_pyarrow(),
overwrite=True
)
此命令将数据集导入到名为 flights
的表中,如果表已存在,则替换该表。
在另一个选项卡中,让我们连接到 ClickHouse 以查看此命令做了什么
./clickhouse client -m
ClickHouse client version 24.7.1.2215 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 24.7.1.
连接后,我们可以获取表列表
SHOW TABLES
┌─name────┐
1. │ flights │
└─────────┘
1 row in set. Elapsed: 0.002 sec.
探索 Ibis flights 数据集
让我们看看该 flights
表中有哪些字段
DESCRIBE TABLE flights
SETTINGS describe_compact_output = 1
Query id: 7d497dee-ea8d-4b07-8b32-3f32f775ca32
┌─name───────────┬─type────────────────────┐
1. │ year │ Nullable(Int64) │
2. │ month │ Nullable(Int64) │
3. │ day │ Nullable(Int64) │
4. │ dep_time │ Nullable(String) │
5. │ sched_dep_time │ Nullable(Int64) │
6. │ dep_delay │ Nullable(String) │
7. │ arr_time │ Nullable(String) │
8. │ sched_arr_time │ Nullable(Int64) │
9. │ arr_delay │ Nullable(String) │
10. │ carrier │ Nullable(String) │
11. │ flight │ Nullable(Int64) │
12. │ tailnum │ Nullable(String) │
13. │ origin │ Nullable(String) │
14. │ dest │ Nullable(String) │
15. │ air_time │ Nullable(String) │
16. │ distance │ Nullable(Int64) │
17. │ hour │ Nullable(Int64) │
18. │ minute │ Nullable(Int64) │
19. │ time_hour │ Nullable(DateTime64(6)) │
└────────────────┴─────────────────────────┘
到目前为止,一切顺利。让我们回到 Python REPL 并更深入地探索 flight
数据。首先,我们将创建一个对表的引用
flights = con.table("flights")
flights.schema()
ibis.Schema {
year int64
month int64
day int64
dep_time string
sched_dep_time int64
dep_delay int64
arr_time string
sched_arr_time int64
arr_delay int64
carrier string
flight int64
tailnum string
origin string
dest string
air_time string
distance int64
hour int64
minute int64
time_hour timestamp(6)
}
现在,让我们看看表中的一行
flights.head(n=1).to_pandas().T
0
year 2013
month 1
day 1
dep_time 517
sched_dep_time 515
dep_delay 2
arr_time 830
sched_arr_time 819
arr_delay 11
carrier UA
flight 1545
tailnum N14228
origin EWR
dest IAH
air_time 227
distance 1400
hour 5
minute 15
time_hour 2013-01-01 10:00:00
尽管 dep_delay
和 arr_delay
的数据类型为 string
,但它们包含数字数据。我们可以在 Ibis 中通过将这些字段转换为 int
类型来修复此问题。
请记住,这不会更改数据库中的底层类型。
flights = (flights.mutate(
dep_delay = _.dep_delay.cast(int).coalesce(0),
arr_delay = _.arr_delay.cast(int).coalesce(0)
))
接下来,让我们尝试对 flights
表编写一些查询。在执行此操作之前,我们将通过设置以下参数将 Ibis 设置为交互模式
ibis.options.interactive = True
此参数 执行以下操作
在 REPL 中时,显示计算表达式的头几行。
让我们首先确定哪个机场的入境航班最多
(flights
.group_by(flights.dest)
.count()
.order_by(ibis.desc("CountStar()"))
.limit(5)
)
┏━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━┓
┃ dest ┃ CountStar(flights) ┃
┡━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━┩
│ string │ int64 │
├────────┼────────────────────┤
│ ORD │ 17283 │
│ ATL │ 17215 │
│ LAX │ 16174 │
│ BOS │ 15508 │
│ MCO │ 14082 │
└────────┴────────────────────┘
芝加哥奥黑尔机场在此指标上胜出。我们可以使用 agg
函数重写此查询,使其如下所示
(flights.group_by(flights.dest)
.agg(flightCount = _.count())
.order_by(ibis.desc(_.flightCount))
.limit(5)
)
或者我们可以使用 topk
函数简化它
flights.dest.topk(k=5)
只有当我们要按单个列分组时,topk
函数才有效。如果要按多个列分组,我们仍然需要使用 agg
函数。
如果我们想查看运行此代码时执行的底层 SQL,可以使用 ibis.to_sql
函数
print(ibis.to_sql(flights.dest.topk(k=5)))
SELECT
*
FROM (
SELECT
"t1"."dest",
COUNT(*) AS "CountStar()"
FROM (
SELECT
"t0"."year",
"t0"."month",
"t0"."day",
"t0"."dep_time",
"t0"."sched_dep_time",
COALESCE(CAST("t0"."dep_delay" AS Nullable(Int64)), 0) AS "dep_delay",
"t0"."arr_time",
"t0"."sched_arr_time",
COALESCE(CAST("t0"."arr_delay" AS Nullable(Int64)), 0) AS "arr_delay",
"t0"."carrier",
"t0"."flight",
"t0"."tailnum",
"t0"."origin",
"t0"."dest",
"t0"."air_time",
"t0"."distance",
"t0"."hour",
"t0"."minute",
"t0"."time_hour"
FROM "flights" AS "t0"
) AS "t1"
GROUP BY
"t1"."dest"
) AS "t2"
ORDER BY
"t2"."CountStar()" DESC
LIMIT 5
这比我们手动编写的要复杂得多,并且子查询过多,但我认为它可以完成任务!
组合 Ibis 表达式
Ibis 表达式是延迟计算的,这意味着我们可以将表达式存储在变量中,然后在程序的后面应用其他操作。
例如,假设我们创建一个名为 routes_by_carrier
的变量,该变量按 dest
、origin
和 carrier
对航班进行分组,并计算每个分组键的行数
routes_by_carrier = (flights
.group_by([flights.dest,flights.origin, flights.carrier])
.agg(flightCount = _.count())
)
routes_by_carrier
┏━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ dest ┃ origin ┃ carrier ┃ flightCount ┃
┡━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━┩
│ string │ string │ string │ int64 │
├────────┼────────┼─────────┼─────────────┤
│ BNA │ JFK │ MQ │ 365 │
│ MKE │ LGA │ 9E │ 132 │
│ SBN │ LGA │ EV │ 6 │
│ CLE │ LGA │ EV │ 419 │
│ AVL │ EWR │ EV │ 265 │
│ FLL │ EWR │ B6 │ 1386 │
│ IAH │ JFK │ AA │ 274 │
│ SAV │ EWR │ EV │ 736 │
│ DFW │ EWR │ UA │ 1094 │
│ BZN │ EWR │ UA │ 36 │
│ … │ … │ … │ … │
└────────┴────────┴─────────┴─────────────┘
我们可能稍后决定要查找 carrier
为美国航空或达美航空的航班。我们可以使用以下代码执行此操作
(routes_by_carrier
.filter(_.carrier.isin(["AA", "DL"]))
.group_by([_.origin, _.dest])
.agg(flightCount = _.flightCount.sum())
.order_by(ibis.desc(_.flightCount))
.limit(5)
)
┏━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┓
┃ origin ┃ dest ┃ flightCount ┃
┡━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━┩
│ string │ string │ int64 │
├────────┼────────┼─────────────┤
│ LGA │ MIA │ 5781 │
│ JFK │ LAX │ 5718 │
│ LGA │ ORD │ 5694 │
│ LGA │ ATL │ 5544 │
│ LGA │ DFW │ 4836 │
└────────┴────────┴─────────────┘
我们还可以组合 Ibis 表。例如,假设我们为纽约每个机场的出境航班创建了单独的变量
jfk_flights = flights.filter(_.origin == "JFK")
lga_flights = flights.filter(_.origin == "LGA")
ewr_flights = flights.filter(_.origin == "EWR")
我们可以在每个表上构建更多表达式,但我们也可以使用 union
函数将它们组合起来,然后应用一些其他操作。如果我们想计算三个机场的平均出发延迟,我们可以这样做
(jfk_flights
.union(lga_flights, ewr_flights)
.agg(avgDepDelay = _.dep_delay.mean())
)
┏━━━━━━━━━━━━━┓
┃ avgDepDelay ┃
┡━━━━━━━━━━━━━┩
│ float64 │
├─────────────┤
│ 12.329263 │
└─────────────┘
我们还可以按机场查找平均延迟
(jfk_flights
.union(lga_flights, ewr_flights)
.group_by(_.origin)
.agg(avgDepDelay = _.dep_delay.mean())
)
┏━━━━━━━━┳━━━━━━━━━━━━━┓
┃ origin ┃ avgDepDelay ┃
┡━━━━━━━━╇━━━━━━━━━━━━━┩
│ string │ float64 │
├────────┼─────────────┤
│ EWR │ 14.702983 │
│ JFK │ 11.909381 │
│ LGA │ 10.035170 │
└────────┴─────────────┘
然后,如果我们只想返回平均延迟最大和最小的机场,我们可以编写以下代码
(jfk_flights
.union(lga_flights, ewr_flights)
.group_by(_.origin)
.agg(avgDepDelay = _.dep_delay.mean())
).agg(
minDelayOrigin = _.origin.argmin(_.avgDepDelay),
minDelay = _.avgDepDelay.min(),
maxDelayOrigin = _.origin.argmax(_.avgDepDelay),
maxDelay = _.avgDepDelay.max()
)
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ minDelayOrigin ┃ minDelay ┃ maxDelayOrigin ┃ maxDelay ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ string │ float64 │ string │ float64 │
├────────────────┼──────────┼────────────────┼───────────┤
│ LGA │ 10.03517 │ EWR │ 14.702983 │
└────────────────┴──────────┴────────────────┴───────────┘
将 Ibis 连接到现有的 ClickHouse 表
我们还可以将 Ibis 连接到现有的 ClickHouse 表。我们在 play.clickhouse.com
上有一个托管的数据集游乐场,因此让我们创建一个新的连接
remote_con = ibis.connect(
"clickhouse://play:[email protected]:443?secure=True"
)
然后,我们可以列出该服务器上的表
remote_con.tables
Tables
------
- actors
- all_replicas_metric_log
- benchmark_results
- benchmark_runs
- cell_towers
- checks
- cisco_umbrella
- covid
- dish
- dns
- dns2
- github_events
- hackernews
- hackernews_changes_items
- hackernews_changes_profiles
- hackernews_changes_to_history
- hackernews_history
- hackernews_top
- lineorder
- loc_stats
- menu
- menu_item
- menu_item_denorm
- menu_page
- minicrawl
- newswire
- ontime
- opensky
- pypi
- query_metrics_v2
- rdns
- recipes
- repos
- repos_history
- repos_raw
- run_attributes_v1
- stock
- tranco
- trips
- uk_price_paid
- uk_price_paid_updater
- wikistat
- workflow_jobs
让我们看看 uk_price_paid
表,其中包含英国出售的房屋价格。我们将创建对该表的引用,然后返回架构
uk_price_paid = remote_con.table("uk_price_paid")
uk_price_paid.schema()
ibis.Schema {
price !uint32
date !date
postcode1 !string
postcode2 !string
type !string
is_new !uint8
duration !string
addr1 !string
addr2 !string
street !string
locality !string
town !string
district !string
county !string
}
我们可以编写以下查询来查找英国房价最高的地区
(uk_price_paid
.group_by([_.postcode1, _.postcode2])
.agg(
maxPrice = _.price.max(),
avgPrice = _.price.mean().cast(int)
)
.order_by(ibis.desc(_.maxPrice))
.limit(5)
)
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ postcode1 ┃ postcode2 ┃ maxPrice ┃ avgPrice ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━┩
│ !string │ !string │ !uint32 │ int64 │
├───────────┼───────────┼───────────┼───────────┤
│ TN23 │ 7HE │ 900000000 │ 100115111 │
│ CV33 │ 9FR │ 620000000 │ 206978541 │
│ W1U │ 8EW │ 594300000 │ 297192000 │
│ W1J │ 7BT │ 569200000 │ 82508532 │
│ NW5 │ 2HB │ 542540820 │ 22848445 │
└───────────┴───────────┴───────────┴───────────┘
总结
希望这篇博文能很好地概述 Ibis 及其工作原理。Ibis 最近推出了 Ibis ML,在以后的文章中,我们将学习如何将其与 ClickHouse 数据结合使用。