为 Python 安装 chDB
需求
macOS 和 Linux (x86_64 和 ARM64) 上的 Python 3.8+
安装
pip install chdb
用法
CLI 示例
python3 -m chdb [SQL] [OutputFormat]
python3 -m chdb "SELECT 1, 'abc'" Pretty
Python 文件示例
import chdb
res = chdb.query("SELECT 1, 'abc'", "CSV")
print(res, end="")
查询可以使用任何 支持的格式 以及 Dataframe
和 Debug
返回数据。
GitHub 仓库
您可以在 chdb-io/chdb 找到该项目的 GitHub 仓库。
数据输入
以下方法可用于访问磁盘和内存中的数据格式
查询文件 (Parquet、CSV、JSON、Arrow、ORC 和 60+)
您可以执行 SQL 并返回所需格式的数据。
import chdb
res = chdb.query('select version()', 'Pretty'); print(res)
使用 Parquet 或 CSV
# See more data type format in tests/format_output.py
res = chdb.query('select * from file("data.parquet", Parquet)', 'JSON'); print(res)
res = chdb.query('select * from file("data.csv", CSV)', 'CSV'); print(res)
print(f"SQL read {res.rows_read()} rows, {res.bytes_read()} bytes, elapsed {res.elapsed()} seconds")
Pandas 数据帧输出
# See more in https://clickhouse.ac.cn/docs/en/interfaces/formats
chdb.query('select * from file("data.parquet", Parquet)', 'Dataframe')
查询表 (Pandas 数据帧、Parquet 文件/字节、Arrow 字节)
查询 Pandas 数据帧
import chdb.dataframe as cdf
import pandas as pd
# Join 2 DataFrames
df1 = pd.DataFrame({'a': [1, 2, 3], 'b': ["one", "two", "three"]})
df2 = pd.DataFrame({'c': [1, 2, 3], 'd': ["①", "②", "③"]})
ret_tbl = cdf.query(sql="select * from __tbl1__ t1 join __tbl2__ t2 on t1.a = t2.c",
tbl1=df1, tbl2=df2)
print(ret_tbl)
# Query on the DataFrame Table
print(ret_tbl.query('select b, sum(a) from __table__ group by b'))
使用有状态会话查询
会话将保留查询的状态。所有 DDL 和 DML 状态都将保存在目录中。目录路径可以作为参数传入。如果未传入,将创建一个临时目录。
如果没有指定路径,临时目录将在删除 Session 对象时被删除。否则,路径将被保留。
请注意,默认数据库是 _local
,默认引擎是 Memory
,这意味着所有数据都将存储在内存中。如果要将数据存储在磁盘上,则应创建另一个数据库。
from chdb import session as chs
## Create DB, Table, View in temp session, auto cleanup when session is deleted.
sess = chs.Session()
sess.query("CREATE DATABASE IF NOT EXISTS db_xxx ENGINE = Atomic")
sess.query("CREATE TABLE IF NOT EXISTS db_xxx.log_table_xxx (x String, y Int) ENGINE = Log;")
sess.query("INSERT INTO db_xxx.log_table_xxx VALUES ('a', 1), ('b', 3), ('c', 2), ('d', 5);")
sess.query(
"CREATE VIEW db_xxx.view_xxx AS SELECT * FROM db_xxx.log_table_xxx LIMIT 4;"
)
print("Select from view:\n")
print(sess.query("SELECT * FROM db_xxx.view_xxx", "Pretty"))
另请参阅:test_stateful.py。
使用 Python DB-API 2.0 查询
import chdb.dbapi as dbapi
print("chdb driver version: {0}".format(dbapi.get_client_info()))
conn1 = dbapi.connect()
cur1 = conn1.cursor()
cur1.execute('select version()')
print("description: ", cur1.description)
print("data: ", cur1.fetchone())
cur1.close()
conn1.close()
使用 UDF (用户定义函数) 查询
from chdb.udf import chdb_udf
from chdb import query
@chdb_udf()
def sum_udf(lhs, rhs):
return int(lhs) + int(rhs)
print(query("select sum_udf(12,22)"))
关于 chDB Python UDF (用户定义函数) 装饰器的一些注意事项。
该函数应该是无状态的。仅支持 UDF,不支持 UDAF (用户定义聚合函数)。
默认返回类型为 String。如果您想更改返回类型,可以将返回类型作为参数传入。返回类型应为以下类型之一:以下。
该函数应接受 String 类型的参数。由于输入是 TabSeparated,因此所有参数都是字符串。
该函数将针对每行输入调用。例如
def sum_udf(lhs, rhs):
return int(lhs) + int(rhs)
for line in sys.stdin:
args = line.strip().split('\t')
lhs = args[0]
rhs = args[1]
print(sum_udf(lhs, rhs))
sys.stdout.flush()该函数应该是一个纯 Python 函数。您应该在 函数内部 导入所有使用的 Python 模块。
def func_use_json(arg):
import json
...使用的 Python 解释器与运行脚本的解释器相同。您可以从
sys.executable
中获取它。
另请参阅:test_udf.py。
Python 表引擎
查询 Pandas 数据帧
import chdb
import pandas as pd
df = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6],
"b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
}
)
chdb.query("SELECT b, sum(a) FROM Python(df) GROUP BY b ORDER BY b").show()
查询 Arrow 表
import chdb
import pyarrow as pa
arrow_table = pa.table(
{
"a": [1, 2, 3, 4, 5, 6],
"b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
}
)
chdb.query(
"SELECT b, sum(a) FROM Python(arrow_table) GROUP BY b ORDER BY b", "debug"
).show()
查询 chdb.PyReader 类实例
- 您必须从 chdb.PyReader 类继承并实现
read
方法。 read
方法应- 返回一个列表列表,第一个维度是列,第二个维度是行,列顺序应与
read
的第一个参数col_names
相同。 - 当没有更多数据可读时,返回一个空列表。
- 应该是状态化的,游标应该在
read
方法中更新。
- 返回一个列表列表,第一个维度是列,第二个维度是行,列顺序应与
- 可以实现一个可选的
get_schema
方法来返回表的模式。原型是def get_schema(self) -> List[Tuple[str, str]]:
,返回值是一个元组列表,每个元组包含列名和列类型。列类型应为以下类型之一:以下。
import chdb
class myReader(chdb.PyReader):
def __init__(self, data):
self.data = data
self.cursor = 0
super().__init__(data)
def read(self, col_names, count):
print("Python func read", col_names, count, self.cursor)
if self.cursor >= len(self.data["a"]):
return []
block = [self.data[col] for col in col_names]
self.cursor += len(block[0])
return block
reader = myReader(
{
"a": [1, 2, 3, 4, 5, 6],
"b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
}
)
chdb.query(
"SELECT b, sum(a) FROM Python(reader) GROUP BY b ORDER BY b"
).show()
另请参阅:test_query_py.py。
限制
- 支持的列类型:pandas.Series、pyarrow.array、chdb.PyReader
- 支持的数据类型:Int、UInt、Float、String、Date、DateTime、Decimal
- Python 对象类型将被转换为 String
- Pandas 数据帧的性能最好,Arrow 表比 PyReader 更好