跳至主要内容

为 Python 安装 chDB

需求

macOS 和 Linux (x86_64 和 ARM64) 上的 Python 3.8+

安装

pip install chdb

用法

CLI 示例

python3 -m chdb [SQL] [OutputFormat]
python3 -m chdb "SELECT 1, 'abc'" Pretty

Python 文件示例

import chdb

res = chdb.query("SELECT 1, 'abc'", "CSV")
print(res, end="")

查询可以使用任何 支持的格式 以及 DataframeDebug 返回数据。

GitHub 仓库

您可以在 chdb-io/chdb 找到该项目的 GitHub 仓库。

数据输入

以下方法可用于访问磁盘和内存中的数据格式

查询文件 (Parquet、CSV、JSON、Arrow、ORC 和 60+)

您可以执行 SQL 并返回所需格式的数据。

import chdb
res = chdb.query('select version()', 'Pretty'); print(res)

使用 Parquet 或 CSV

# See more data type format in tests/format_output.py
res = chdb.query('select * from file("data.parquet", Parquet)', 'JSON'); print(res)
res = chdb.query('select * from file("data.csv", CSV)', 'CSV'); print(res)
print(f"SQL read {res.rows_read()} rows, {res.bytes_read()} bytes, elapsed {res.elapsed()} seconds")

Pandas 数据帧输出

# See more in https://clickhouse.ac.cn/docs/en/interfaces/formats
chdb.query('select * from file("data.parquet", Parquet)', 'Dataframe')

查询表 (Pandas 数据帧、Parquet 文件/字节、Arrow 字节)

查询 Pandas 数据帧

import chdb.dataframe as cdf
import pandas as pd
# Join 2 DataFrames
df1 = pd.DataFrame({'a': [1, 2, 3], 'b': ["one", "two", "three"]})
df2 = pd.DataFrame({'c': [1, 2, 3], 'd': ["①", "②", "③"]})
ret_tbl = cdf.query(sql="select * from __tbl1__ t1 join __tbl2__ t2 on t1.a = t2.c",
tbl1=df1, tbl2=df2)
print(ret_tbl)
# Query on the DataFrame Table
print(ret_tbl.query('select b, sum(a) from __table__ group by b'))

使用有状态会话查询

会话将保留查询的状态。所有 DDL 和 DML 状态都将保存在目录中。目录路径可以作为参数传入。如果未传入,将创建一个临时目录。

如果没有指定路径,临时目录将在删除 Session 对象时被删除。否则,路径将被保留。

请注意,默认数据库是 _local,默认引擎是 Memory,这意味着所有数据都将存储在内存中。如果要将数据存储在磁盘上,则应创建另一个数据库。

from chdb import session as chs

## Create DB, Table, View in temp session, auto cleanup when session is deleted.
sess = chs.Session()
sess.query("CREATE DATABASE IF NOT EXISTS db_xxx ENGINE = Atomic")
sess.query("CREATE TABLE IF NOT EXISTS db_xxx.log_table_xxx (x String, y Int) ENGINE = Log;")
sess.query("INSERT INTO db_xxx.log_table_xxx VALUES ('a', 1), ('b', 3), ('c', 2), ('d', 5);")
sess.query(
"CREATE VIEW db_xxx.view_xxx AS SELECT * FROM db_xxx.log_table_xxx LIMIT 4;"
)
print("Select from view:\n")
print(sess.query("SELECT * FROM db_xxx.view_xxx", "Pretty"))

另请参阅:test_stateful.py

使用 Python DB-API 2.0 查询

import chdb.dbapi as dbapi
print("chdb driver version: {0}".format(dbapi.get_client_info()))

conn1 = dbapi.connect()
cur1 = conn1.cursor()
cur1.execute('select version()')
print("description: ", cur1.description)
print("data: ", cur1.fetchone())
cur1.close()
conn1.close()

使用 UDF (用户定义函数) 查询

from chdb.udf import chdb_udf
from chdb import query

@chdb_udf()
def sum_udf(lhs, rhs):
return int(lhs) + int(rhs)

print(query("select sum_udf(12,22)"))

关于 chDB Python UDF (用户定义函数) 装饰器的一些注意事项。

  1. 该函数应该是无状态的。仅支持 UDF,不支持 UDAF (用户定义聚合函数)。

  2. 默认返回类型为 String。如果您想更改返回类型,可以将返回类型作为参数传入。返回类型应为以下类型之一:以下

  3. 该函数应接受 String 类型的参数。由于输入是 TabSeparated,因此所有参数都是字符串。

  4. 该函数将针对每行输入调用。例如

    def sum_udf(lhs, rhs):
    return int(lhs) + int(rhs)

    for line in sys.stdin:
    args = line.strip().split('\t')
    lhs = args[0]
    rhs = args[1]
    print(sum_udf(lhs, rhs))
    sys.stdout.flush()
  5. 该函数应该是一个纯 Python 函数。您应该在 函数内部 导入所有使用的 Python 模块。

    def func_use_json(arg):
    import json
    ...
  6. 使用的 Python 解释器与运行脚本的解释器相同。您可以从 sys.executable 中获取它。

另请参阅:test_udf.py

Python 表引擎

查询 Pandas 数据帧

import chdb
import pandas as pd
df = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6],
"b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
}
)

chdb.query("SELECT b, sum(a) FROM Python(df) GROUP BY b ORDER BY b").show()

查询 Arrow 表

import chdb
import pyarrow as pa
arrow_table = pa.table(
{
"a": [1, 2, 3, 4, 5, 6],
"b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
}
)

chdb.query(
"SELECT b, sum(a) FROM Python(arrow_table) GROUP BY b ORDER BY b", "debug"
).show()

查询 chdb.PyReader 类实例

  1. 您必须从 chdb.PyReader 类继承并实现 read 方法。
  2. read 方法应
    1. 返回一个列表列表,第一个维度是列,第二个维度是行,列顺序应与 read 的第一个参数 col_names 相同。
    2. 当没有更多数据可读时,返回一个空列表。
    3. 应该是状态化的,游标应该在 read 方法中更新。
  3. 可以实现一个可选的 get_schema 方法来返回表的模式。原型是 def get_schema(self) -> List[Tuple[str, str]]:,返回值是一个元组列表,每个元组包含列名和列类型。列类型应为以下类型之一:以下

import chdb

class myReader(chdb.PyReader):
def __init__(self, data):
self.data = data
self.cursor = 0
super().__init__(data)

def read(self, col_names, count):
print("Python func read", col_names, count, self.cursor)
if self.cursor >= len(self.data["a"]):
return []
block = [self.data[col] for col in col_names]
self.cursor += len(block[0])
return block

reader = myReader(
{
"a": [1, 2, 3, 4, 5, 6],
"b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
}
)

chdb.query(
"SELECT b, sum(a) FROM Python(reader) GROUP BY b ORDER BY b"
).show()

另请参阅:test_query_py.py

限制

  1. 支持的列类型:pandas.Series、pyarrow.array、chdb.PyReader
  2. 支持的数据类型:Int、UInt、Float、String、Date、DateTime、Decimal
  3. Python 对象类型将被转换为 String
  4. Pandas 数据帧的性能最好,Arrow 表比 PyReader 更好

有关更多示例,请参见 examplestests