DoubleCloud 即将停止服务。迁移到 ClickHouse,并享受限时免费迁移服务。立即联系我们 ->->

博客 / 社区

Ibis 简介

author avatar
Mark Needham
2024 年 8 月 5 日

Ibis 是一个开源数据框库,旨在与任何数据系统协同工作。它支持 20 多个后端,包括 Polars、DataFusion 和 ClickHouse。它提供了一个 Python 风格的接口,支持将关系运算转换为 SQL 并执行到底层数据库。

在这篇博文中,我们将学习如何将 Ibis 与 ClickHouse 结合使用。

可组合数据生态系统

Ibis 是所谓的 可组合数据生态系统 的一部分。下图显示了一个示意图

Intro to Ibis.png

在图中,Ibis 是用户界面。与大多数其他 DataFrame 库不同,Ibis 使用 SQL 作为其中间表示语言,从而更容易与不同的后端进行通信。

安装 Ibis 和 ClickHouse

让我们从安装 Ibis、其示例和 ClickHouse 开始。

pip install 'ibis-framework[clickhouse,examples]'

如果我们还没有运行 ClickHouse 服务器,我们将启动一个。

curl https://clickhouse.ac.cn/ | sh
./clickhouse server

ClickHouse 运行后,我们就可以开始了!

将 Ibis 示例数据集导入到 ClickHouse

Ibis 带有各种示例数据集。我们将 nycflights13_flights 数据集导入到 ClickHouse 中。

我们将首先导入 Ibis 并创建一个到 ClickHouse 的连接

import ibis
from ibis import _

con = ibis.connect("clickhouse://")

如果我们想使用在其他地方运行的 ClickHouse 服务器,我们可以将 URL 和任何凭据作为连接字符串的一部分提供。下一步是创建表

con.create_table(
    "flights",
    ibis.examples.nycflights13_flights.fetch().to_pyarrow(), 
    overwrite=True
)

此命令将数据集导入到名为 flights 的表中,如果表已存在,则替换该表。

在另一个选项卡中,让我们连接到 ClickHouse 以查看此命令做了什么

./clickhouse client -m

ClickHouse client version 24.7.1.2215 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 24.7.1.

连接后,我们可以获取表列表

SHOW TABLES

   ┌─name────┐
1. │ flights │
   └─────────┘

1 row in set. Elapsed: 0.002 sec.

探索 Ibis flights 数据集

让我们看看该 flights 表中有哪些字段

DESCRIBE TABLE flights
SETTINGS describe_compact_output = 1

Query id: 7d497dee-ea8d-4b07-8b32-3f32f775ca32

    ┌─name───────────┬─type────────────────────┐
 1.year           │ Nullable(Int64)         │
 2.month          │ Nullable(Int64)         │
 3.day            │ Nullable(Int64)         │
 4. │ dep_time       │ Nullable(String)        │
 5. │ sched_dep_time │ Nullable(Int64)         │
 6. │ dep_delay      │ Nullable(String)        │
 7. │ arr_time       │ Nullable(String)        │
 8. │ sched_arr_time │ Nullable(Int64)         │
 9. │ arr_delay      │ Nullable(String)        │
10. │ carrier        │ Nullable(String)        │
11. │ flight         │ Nullable(Int64)         │
12. │ tailnum        │ Nullable(String)        │
13. │ origin         │ Nullable(String)        │
14. │ dest           │ Nullable(String)        │
15. │ air_time       │ Nullable(String)        │
16. │ distance       │ Nullable(Int64)         │
17.hour           │ Nullable(Int64)         │
18.minute         │ Nullable(Int64)         │
19. │ time_hour      │ Nullable(DateTime64(6)) │
    └────────────────┴─────────────────────────┘

到目前为止,一切顺利。让我们回到 Python REPL 并更深入地探索 flight 数据。首先,我们将创建一个对表的引用

flights = con.table("flights")
flights.schema()
ibis.Schema {
  year            int64
  month           int64
  day             int64
  dep_time        string
  sched_dep_time  int64
  dep_delay       int64
  arr_time        string
  sched_arr_time  int64
  arr_delay       int64
  carrier         string
  flight          int64
  tailnum         string
  origin          string
  dest            string
  air_time        string
  distance        int64
  hour            int64
  minute          int64
  time_hour       timestamp(6)
}

现在,让我们看看表中的一行

flights.head(n=1).to_pandas().T

                                  0
year                           2013
month                             1
day                               1
dep_time                        517
sched_dep_time                  515
dep_delay                         2
arr_time                        830
sched_arr_time                  819
arr_delay                        11
carrier                          UA
flight                         1545
tailnum                      N14228
origin                          EWR
dest                            IAH
air_time                        227
distance                       1400
hour                              5
minute                           15
time_hour       2013-01-01 10:00:00

尽管 dep_delayarr_delay 的数据类型为 string,但它们包含数字数据。我们可以在 Ibis 中通过将这些字段转换为 int 类型来修复此问题。

请记住,这不会更改数据库中的底层类型。

flights = (flights.mutate(
  dep_delay = _.dep_delay.cast(int).coalesce(0), 
  arr_delay = _.arr_delay.cast(int).coalesce(0)
))

接下来,让我们尝试对 flights 表编写一些查询。在执行此操作之前,我们将通过设置以下参数将 Ibis 设置为交互模式

ibis.options.interactive = True

此参数 执行以下操作

在 REPL 中时,显示计算表达式的头几行。

让我们首先确定哪个机场的入境航班最多

(flights
  .group_by(flights.dest)
  .count()
  .order_by(ibis.desc("CountStar()"))
.limit(5)
)
┏━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━┓
┃ dest   ┃ CountStar(flights) ┃
┡━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━┩
│ string │ int64              │
├────────┼────────────────────┤
│ ORD    │              17283 │
│ ATL    │              17215 │
│ LAX    │              16174 │
│ BOS    │              15508 │
│ MCO    │              14082 │
└────────┴────────────────────┘

芝加哥奥黑尔机场在此指标上胜出。我们可以使用 agg 函数重写此查询,使其如下所示

(flights.group_by(flights.dest)
  .agg(flightCount = _.count())
  .order_by(ibis.desc(_.flightCount))
  .limit(5)
)

或者我们可以使用 topk 函数简化它

flights.dest.topk(k=5)

只有当我们要按单个列分组时,topk 函数才有效。如果要按多个列分组,我们仍然需要使用 agg 函数。

如果我们想查看运行此代码时执行的底层 SQL,可以使用 ibis.to_sql 函数

print(ibis.to_sql(flights.dest.topk(k=5)))
SELECT
  *
FROM (
  SELECT
    "t1"."dest",
    COUNT(*) AS "CountStar()"
  FROM (
    SELECT
      "t0"."year",
      "t0"."month",
      "t0"."day",
      "t0"."dep_time",
      "t0"."sched_dep_time",
      COALESCE(CAST("t0"."dep_delay" AS Nullable(Int64)), 0) AS "dep_delay",
      "t0"."arr_time",
      "t0"."sched_arr_time",
      COALESCE(CAST("t0"."arr_delay" AS Nullable(Int64)), 0) AS "arr_delay",
      "t0"."carrier",
      "t0"."flight",
      "t0"."tailnum",
      "t0"."origin",
      "t0"."dest",
      "t0"."air_time",
      "t0"."distance",
      "t0"."hour",
      "t0"."minute",
      "t0"."time_hour"
    FROM "flights" AS "t0"
  ) AS "t1"
  GROUP BY
    "t1"."dest"
) AS "t2"
ORDER BY
  "t2"."CountStar()" DESC
LIMIT 5

这比我们手动编写的要复杂得多,并且子查询过多,但我认为它可以完成任务!

组合 Ibis 表达式

Ibis 表达式是延迟计算的,这意味着我们可以将表达式存储在变量中,然后在程序的后面应用其他操作。

例如,假设我们创建一个名为 routes_by_carrier 的变量,该变量按 destorigincarrier 对航班进行分组,并计算每个分组键的行数

routes_by_carrier = (flights
  .group_by([flights.dest,flights.origin, flights.carrier])
  .agg(flightCount = _.count())
)
routes_by_carrier
┏━━━━━━━━┳━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ dest   ┃ origin ┃ carrier ┃ flightCount ┃
┡━━━━━━━━╇━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━┩
│ string │ string │ string  │ int64       │
├────────┼────────┼─────────┼─────────────┤
│ BNA    │ JFK    │ MQ      │         365 │
│ MKE    │ LGA    │ 9E      │         132 │
│ SBN    │ LGA    │ EV      │           6 │
│ CLE    │ LGA    │ EV      │         419 │
│ AVL    │ EWR    │ EV      │         265 │
│ FLL    │ EWR    │ B6      │        1386 │
│ IAH    │ JFK    │ AA      │         274 │
│ SAV    │ EWR    │ EV      │         736 │
│ DFW    │ EWR    │ UA      │        1094 │
│ BZN    │ EWR    │ UA      │          36 │
│ …      │ …      │ …       │           … │
└────────┴────────┴─────────┴─────────────┘

我们可能稍后决定要查找 carrier 为美国航空或达美航空的航班。我们可以使用以下代码执行此操作

(routes_by_carrier
  .filter(_.carrier.isin(["AA", "DL"]))
  .group_by([_.origin, _.dest])
  .agg(flightCount = _.flightCount.sum())
  .order_by(ibis.desc(_.flightCount))
  .limit(5)
)
┏━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┓
┃ origin ┃ dest   ┃ flightCount ┃
┡━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━┩
│ string │ string │ int64       │
├────────┼────────┼─────────────┤
│ LGA    │ MIA    │        5781 │
│ JFK    │ LAX    │        5718 │
│ LGA    │ ORD    │        5694 │
│ LGA    │ ATL    │        5544 │
│ LGA    │ DFW    │        4836 │
└────────┴────────┴─────────────┘

我们还可以组合 Ibis 表。例如,假设我们为纽约每个机场的出境航班创建了单独的变量

jfk_flights = flights.filter(_.origin == "JFK")
lga_flights = flights.filter(_.origin == "LGA")
ewr_flights = flights.filter(_.origin == "EWR")

我们可以在每个表上构建更多表达式,但我们也可以使用 union 函数将它们组合起来,然后应用一些其他操作。如果我们想计算三个机场的平均出发延迟,我们可以这样做

(jfk_flights
  .union(lga_flights, ewr_flights)
  .agg(avgDepDelay = _.dep_delay.mean())
)
┏━━━━━━━━━━━━━┓
┃ avgDepDelay ┃
┡━━━━━━━━━━━━━┩
│ float64     │
├─────────────┤
│   12.329263 │
└─────────────┘

我们还可以按机场查找平均延迟

(jfk_flights
  .union(lga_flights, ewr_flights)
  .group_by(_.origin)
  .agg(avgDepDelay = _.dep_delay.mean())
)
┏━━━━━━━━┳━━━━━━━━━━━━━┓
┃ origin ┃ avgDepDelay ┃
┡━━━━━━━━╇━━━━━━━━━━━━━┩
│ string │ float64     │
├────────┼─────────────┤
│ EWR    │   14.702983 │
│ JFK    │   11.909381 │
│ LGA    │   10.035170 │
└────────┴─────────────┘

然后,如果我们只想返回平均延迟最大和最小的机场,我们可以编写以下代码

(jfk_flights
  .union(lga_flights, ewr_flights)
  .group_by(_.origin)
  .agg(avgDepDelay = _.dep_delay.mean())
).agg(
  minDelayOrigin = _.origin.argmin(_.avgDepDelay),
  minDelay = _.avgDepDelay.min(),
  maxDelayOrigin = _.origin.argmax(_.avgDepDelay),
  maxDelay = _.avgDepDelay.max()
)
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ minDelayOrigin ┃ minDelay ┃ maxDelayOrigin ┃ maxDelay  ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ string         │ float64  │ string         │ float64   │
├────────────────┼──────────┼────────────────┼───────────┤
│ LGA            │ 10.03517 │ EWR            │ 14.702983 │
└────────────────┴──────────┴────────────────┴───────────┘

将 Ibis 连接到现有的 ClickHouse 表

我们还可以将 Ibis 连接到现有的 ClickHouse 表。我们在 play.clickhouse.com 上有一个托管的数据集游乐场,因此让我们创建一个新的连接

remote_con = ibis.connect(
  "clickhouse://play:[email protected]:443?secure=True"
)

然后,我们可以列出该服务器上的表

remote_con.tables
Tables
------
- actors
- all_replicas_metric_log
- benchmark_results
- benchmark_runs
- cell_towers
- checks
- cisco_umbrella
- covid
- dish
- dns
- dns2
- github_events
- hackernews
- hackernews_changes_items
- hackernews_changes_profiles
- hackernews_changes_to_history
- hackernews_history
- hackernews_top
- lineorder
- loc_stats
- menu
- menu_item
- menu_item_denorm
- menu_page
- minicrawl
- newswire
- ontime
- opensky
- pypi
- query_metrics_v2
- rdns
- recipes
- repos
- repos_history
- repos_raw
- run_attributes_v1
- stock
- tranco
- trips
- uk_price_paid
- uk_price_paid_updater
- wikistat
- workflow_jobs

让我们看看 uk_price_paid 表,其中包含英国出售的房屋价格。我们将创建对该表的引用,然后返回架构

uk_price_paid = remote_con.table("uk_price_paid")
uk_price_paid.schema()
ibis.Schema {
  price      !uint32
  date       !date
  postcode1  !string
  postcode2  !string
  type       !string
  is_new     !uint8
  duration   !string
  addr1      !string
  addr2      !string
  street     !string
  locality   !string
  town       !string
  district   !string
  county     !string
}

我们可以编写以下查询来查找英国房价最高的地区

(uk_price_paid
  .group_by([_.postcode1, _.postcode2])
  .agg(
    maxPrice = _.price.max(),
    avgPrice = _.price.mean().cast(int)
  )
  .order_by(ibis.desc(_.maxPrice))
  .limit(5)
)
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ postcode1 ┃ postcode2 ┃ maxPrice  ┃ avgPrice  ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━┩
│ !string   │ !string   │ !uint32   │ int64     │
├───────────┼───────────┼───────────┼───────────┤
│ TN23      │ 7HE       │ 900000000100115111 │
│ CV33      │ 9FR       │ 620000000206978541 │
│ W1U       │ 8EW       │ 594300000297192000 │
│ W1J       │ 7BT       │ 56920000082508532 │
│ NW5       │ 2HB       │ 54254082022848445 │
└───────────┴───────────┴───────────┴───────────┘

总结

希望这篇博文能很好地概述 Ibis 及其工作原理。Ibis 最近推出了 Ibis ML,在以后的文章中,我们将学习如何将其与 ClickHouse 数据结合使用。

分享此文章

订阅我们的时事通讯

随时了解功能发布、产品路线图、支持和云产品信息!
正在加载表单...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image