简介
使用 ClickHouse,用户可以直接在 SQL 工作负载中利用 AI 模型。这可以采取在插入数据时或在查询时丰富数据的方式,以补充特定结果。虽然许多用户习惯于训练自己的特定领域模型,但这对于较小的团队或用例来说通常是不切实际的。在这些情况下,预构建的“即插即用”模型或服务通常就足够了,并且可以以最小的努力提供良好的结果。
在这篇文章中,我们演示了
- 如何使用 ClickHouse 用户定义函数 (UDF) 将 ClickHouse 轻松地与第三方 API 集成,从而提供“AI 作为服务”
- 如何将这些“即插即用”模型直接用于 ClickHouse 中的任务,例如情感分析,以及针对这些结果进行聚合以计算指标,例如给定主题的正面和负面帖子的数量
鉴于 OpenAI 最近的流行和备受瞩目的 ChatGPT 产品,我们以 OpenAI 为例。但是,这种方法的简单性意味着它可以轻松地适应竞争服务。
用户定义函数 (UDF)
ClickHouse 中的 UDF(用户定义函数)有几种形式。在最近的一篇博文中,我们分享了如何使用 ClickHouse SQL 定义的函数来查询 Hugging Face 中外部托管的数据集。虽然像这样的 SQL 定义的函数对于泛化常见的 SQL 任务非常有用,但有时用户需要他们熟悉的编程语言的全部功能。为此,ClickHouse 支持可执行用户定义函数。这些为开发人员提供了调用任何外部可执行程序或脚本以处理数据的灵活性。在下面的简单示例中,我们将使用此功能来调用简单的 Bash 和 Python 脚本,这些脚本将查询 OpenAI API。我们将展示 API 响应如何自动丰富 ClickHouse 插入或查询的数据。
使用 OpenAI
大多数用户都通过其流行的 ChatGPT 服务了解 OpenAI,该服务已经彻底改变了工作行为和日常任务。OpenAI 为企业提供了一个 REST API,以便在现有服务和自动化流程中访问为 ChatGPT 提供支持的模型。这些服务提供了从聊天完成和嵌入生成到图像生成和语音到文本的一切。
注意:所有对 OpenAI 服务的请求都需要一个令牌 - 在下面的示例中作为环境变量 OPENAI_API_KEY 传递。用户可以注册试用版并获得此处示例所需的足够免费积分。
除了能够充当聊天机器人之外,OpenAI 的完成服务还支持情感分析和结构提取等任务。对于这些任务,开发人员必须通过系统角色为 OpenAI 服务提供相关说明以描述预期行为。执行某些文本的情感分析的示例 REST API 请求可能如下所示。在这里,我们要求服务对论坛帖子进行分类。请注意,我们需要提供明确的说明,以便仅返回指定情感的单个标记
curl https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "system",
"content": "You are an AI language model trained to analyze and detect the sentiment of forum comments."
},
{
"role": "user",
"content": "Analyze the following hackernews comment and determine if the sentiment is: positive, negative or neutral. Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: I can say for BigQuery and Databricks from personal experience.<p>BigQuery is much slower and is much more expensive for both storage and query.<p>Databricks (Spark) is even slower than that (both io and compute), although you can write custom code/use libs.<p>You seem to underestimate how heavily ClickHouse is optimized (e.g. compressed storage)."
}
],
"temperature": 0,
"max_tokens": 256
}'
{
"id": "chatcmpl-7vOWWkKWGN7McODMXJzQB6zzDcx0r",
"object": "chat.completion",
"created": 1693913320,
"model": "gpt-3.5-turbo-0613",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "NEGATIVE"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 147,
"completion_tokens": 2,
"total_tokens": 149
}
}
请注意,我们在此处使用更经济高效的gpt-3.5-turbo
模型,而不是最新的gpt-4
模型。前者足以满足示例目的。我们留给读者来确定其性能水平。
同一服务也可用于提取结构。假设我们希望从上述文本中提取提到的技术作为字符串值的列表。我们需要稍微修改说明
curl https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "system",
"content": "You are an AI language model trained to extract entities from forum comments"
},
{
"role": "user",
"content": "From the following text extract the many technologies mentioned as a comma seperated list: I can say for BigQuery and Databricks from personal experience.<p>BigQuery is much slower and is much more expensive for both storage and query.<p>Databricks (Spark) is even slower than that (both io and compute), although you can write custom code/use libs.<p>You seem to underestimate how heavily ClickHouse is optimized (e.g. compressed storage)."
}
],
"temperature": 0,
"max_tokens": 20
}'
{
"id": "chatcmpl-7vOdLnrZWeax3RxjeUNelCTdGvr8q",
"object": "chat.completion",
"created": 1693913743,
"model": "gpt-3.5-turbo-0613",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "BigQuery, Databricks, Spark, ClickHouse"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 122,
"completion_tokens": 11,
"total_tokens": 133
}
}
关于上述请求参数的一些说明
- 我们将
temperature
设置为 0 以消除响应中的任何随机性。对于这些用例,我们不需要创意文本 - 仅需要确定性文本分析。 - 在这两种情况下,我们都设置了
max_tokens
来确定响应的长度。一个标记大约是 ¾ 个单词。因此,我们调整了请求。
数据集
对于我们的示例数据集,我们使用 Hacker News 帖子。此数据集(在我们的公共游乐环境中可用)包含从 2006 年到 2023 年 8 月流行的 Hacker News 论坛上的所有帖子和评论:大约 3700 万行。表架构如下所示。
出于我们的目的,我们对title
和text
列感兴趣。我们把探索此数据集作为读者的练习,如果他们希望将此数据集的最新版本加载到他们自己的 ClickHouse 实例中,可以按照此处的说明进行操作。或者,我们在 S3 上提供了一个Parquet 文件,可以使用s3 函数加载,如下所示
CREATE TABLE hackernews
(
`id` UInt32,
`deleted` UInt8,
`type` Enum8('story' = 1, 'comment' = 2, 'poll' = 3, 'pollopt' = 4, 'job' = 5),
`by` LowCardinality(String),
`time` DateTime,
`text` String,
`dead` UInt8,
`parent` UInt32,
`poll` UInt32,
`kids` Array(UInt32),
`url` String,
`score` Int32,
`title` String,
`parts` Array(UInt32),
`descendants` Int32
)
ENGINE = MergeTree
ORDER BY id
INSERT INTO hackernews SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/hackernews/2023-08-18.parquet')
添加情感
对于我们的示例,假设我们希望将情感分析添加到存储在 ClickHouse 中的 Hacker News 数据中。为此,我们需要通过 ClickHouse UDF 调用早期的 OpenAI REST API。此请求的简单性意味着即使是简单的 bash 脚本也可能足够,如下所示(以下需要jq)。稍后,我们将演示如何直接在 Python 中执行此操作。
#!/bin/bash
while read read_data; do
sentiment=$(curl -s https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer <insert>" \
-d "{
\"model\": \"gpt-3.5-turbo\",
\"messages\": [
{
\"role\": \"system\",
\"content\": \"You are an AI language model trained to analyze and detect the sentiment of forum comments.\"
},
{
\"role\": \"user\",
\"content\": \"Analyze the following Hacker News comment and determine if the sentiment is: positive, negative or neutral. Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: ${read_data}\"
}
],
\"temperature\": 0,
\"max_tokens\": 2,
\"temperature\": 0
}" | jq -r '.choices[0].message.content')
printf "$sentiment";
done
此脚本应保存在 ClickHouse 的user_scripts
目录中,命名为sentiment.sh
,并使其可执行。还应将以下条目添加到文件openai_functions.xml
中,并将其保存到 ClickHouse 配置目录(通常为/etc/clickhouse-server/
)。
<functions>
<function>
<name>sentiment</name>
<type>executable</type>
<format>TabSeparated</format>
<return_type>String</return_type>
<argument>
<type>String</type>
</argument>
<command>sentiment.sh</command>
<command_read_timeout>10000</command_read_timeout>
<command_write_timeout>10000</command_write_timeout>
<max_command_execution_time>10000</max_command_execution_time>
</function>
</functions>
此配置使 UDF 可用于 ClickHouse。除了在此处修改超时以允许 OpenAI 请求的延迟外,我们还提供了一个函数名sentiment
,并指定了输入和返回类型。
使用上述配置,用户可以通过简单的函数调用请求文本片段的情感,例如
SELECT sentiment('Learn about the key differences between ClickHouse Cloud and Snowflake and how ClickHouse Cloud outperforms Snowflake across the critical dimensions for real-time analytics: query latency and and cost.') AS sentiment
┌─sentiment─┐
│ POSITIVE │
└───────────┘
1 row in set. Elapsed: 0.433 sec.
虽然以上内容让我们入门,但可能需要一个更强大的包含错误处理的解决方案。为此,我们可能希望将其转换为 Python。下面的 Python 脚本添加了基本的错误处理和带有回退的重试机制。后者的目的是专门解决 OpenAI 速率限制的挑战 - 更多详细信息请参见 处理延迟和速率限制。
请注意,需要使用 openai 和 tenacity 库来处理 API 请求和速率限制。
#!/usr/bin/python3
import sys
import openai
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
openai.api_key = "<INSERT>"
request_timeout = 3
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20))
def completion_with_backoff(**kwargs):
return openai.ChatCompletion.create(**kwargs)
def extract_sentiment(text):
if text == "":
return "NEUTRAL"
messages = [{"role": "system",
"content": "You are an AI language model trained to analyze and detect the sentiment of hackernews forum comments."},
{
"role": "user",
"content": f"Analyze the following hackernews comment and determine if the sentiment is: positive, negative or neutral. "
f"Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: {text}"
}]
try:
response = completion_with_backoff(model="gpt-3.5-turbo", messages=messages, max_tokens=30, temperature=0, request_timeout=request_timeout)
return response.choices[0].message.content
except:
return "ERROR"
for size in sys.stdin:
# collect a batch for performance
for row in range(0, int(size)):
print(extract_sentiment(sys.stdin.readline().strip()))
sys.stdout.flush()
该服务的基于聊天的特性使得在单个请求中评估多段文本的情感具有挑战性。为了使这些示例保持简单,我们为每一行都发出了一个请求。更优化的解决方案可能会批量请求并要求端点评估一组文本。
以上假设从 ClickHouse 传递的任何输入都包含行数的前缀。这用于确定后续输入迭代的次数。这可以允许 Python 脚本中的操作进行批处理以提高性能。
我们对上述函数的配置除了定义一个唯一的名称 setiment_p
之外,还有一些其他设置。我们将 type
设置为可执行池以提高吞吐量性能。这将启动 N 次命令(下面为 10 次),允许同时调用多次。设置 send_chunk_header
确保在任何输入之前都包含一个指示要处理的行数的数字标题。如果传递了大量行块,我们会增加超时设置。
<functions>
<function>
<name>sentiment_p</name>
<type>executable_pool</type>
<pool_size>10</pool_size>
<send_chunk_header>true</send_chunk_header>
<format>TabSeparated</format>
<return_type>String</return_type>
<argument>
<type>String</type>
</argument>
<command>sentiment.py</command>
<command_read_timeout>10000000</command_read_timeout>
<command_write_timeout>10000000</command_write_timeout>
<max_command_execution_time>1000000</max_command_execution_time>
</function>
</functions>
我们可以将上述任一函数应用于列的一组行。在下面的示例中,我们请求包含单词 ClickHouse 的 10 行标题和文本的情感。
SELECT text, sentiment_p(text) AS sentiment
FROM hackernews WHERE text LIKE '%ClickHouse%' OR title LIKE '%ClickHouse%'
ORDER BY time DESC
LIMIT 2
FORMAT Vertical
Row 1:
──────
text: Yeah ClickHouse is definitely the way to go here. Its ability to serve queries with low latency and high concurrency is in an entirely different league from Snowflake, Redshift, BigQuery, etc.
sentiment: POSITIVE
Row 2:
──────
text: There are other databases today that do real time analytics (ClickHouse, Apache Druid, StarRocks along with Apache Pinot). I'd look at the ClickHouse Benchmark to see who are the competitors in that space and their relative performance.
sentiment: POSITIVE
2 rows in set. Elapsed: 2.763 sec. Processed 37.17 million rows, 13.30 GB (13.46 million rows/s., 4.82 GB/s.)
只有在最终结果被整理后,UDF 才会在此处执行一次 - 这意味着只需要两个请求。这种方法是理想的,因为对 OpenAI 的请求的延迟通常远高于 ClickHouse 评估查询的时间。
更进一步,我们可以通过简单的聚合计算 ClickHouse 的正面和负面帖子的数量。这会产生更多开销,因为我们需要调用 OpenAI API 超过 1600 次。这反映在最终计时中。
SELECT
count(),
sentiment
FROM hackernews
WHERE (text LIKE '%ClickHouse%') OR (title LIKE '%ClickHouse%')
GROUP BY sentiment_p(text) AS sentiment
FORMAT PrettyCompactMonoBlock
┌─count()─┬─sentiment─┐
│ 192 │ NEGATIVE │
│ 628 │ NEUTRAL │
│ 857 │ POSITIVE │
└─────────┴───────────┘
3 rows in set. Elapsed: 203.695 sec. Processed 37.17 million rows, 13.28 GB (182.48 thousand rows/s., 65.21 MB/s.)
处理延迟和速率限制
OpenAI API 的实用性受到两个因素的限制:它的延迟和它施加的速率限制。请注意,这些变量会因所选的“即插即用”模型而异。在我们的示例中,我们使用 OpenAI。有许多其他的选择,每个都有自己的权衡。
延迟会影响查询的最小响应时间。虽然 OpenAI 允许多个并发查询以确保这不会影响吞吐量,但速率限制将更具限制性。因此,我们建议用户将这些 API 用于以下场景:临时分析(其中函数用于结果的小子集,例如我们之前的 2 行示例)或在插入时丰富数据。在展示后者的示例之前,让我们先探索延迟和速率限制的局限性。
我们可以通过修改我们的情感 curl 请求来使用 简单的格式文件 来评估响应的延迟。
curl -w "@curl-format.txt" -o /dev/null -s https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "system",
"content": "You are an AI language model trained to analyze and detect the sentiment of forum comments."
},
{
"role": "user",
"content": "Analyze the following hackernews comment and determine if the sentiment is: positive, negative or neutral. Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: I can say for BigQuery and Databricks from personal experience.<p>BigQuery is much slower and is much more expensive for both storage and query.<p>Databricks (Spark) is even slower than that (both io and compute), although you can write custom code/use libs.<p>You seem to underestimate how heavily ClickHouse is optimized (e.g. compressed storage)."
}
],
"temperature": 0,
"max_tokens": 256,
"temperature": 0
}'
time_namelookup: 0.081196s
time_connect: 0.084907s
time_appconnect: 0.095853s
time_pretransfer: 0.095937s
time_redirect: 0.000000s
time_starttransfer: 0.095942s
----------
time_total: 0.650401s
这里 0.65 秒的总延迟限制了我们的查询响应时间,但由于我们有一个命令池(上面为 10 个),ClickHouse 的并行执行管道可以利用这一点,因此得到了缓解。但是,这种并行化又受到 OpenAPI 速率限制的限制。
OpenAI 同时受到每分钟请求数和每分钟令牌数的限制。对于我们的 gpt-3.5-turbo
模型,这是每分钟 90k 个令牌 (TPM) 和每分钟 3500 个请求 (RPM)。速率限制因模型和帐户类型而异 - 更多详细信息 请点击此处。
为了解决这个问题,我们在 UDF 中添加了基本的速率限制。API 返回速率限制 信息(即下一分钟剩余多少令牌和请求)位于标头中。虽然我们可以开发一个速率限制函数来使用此信息,但 OpenAI 建议使用几个旨在通过 指数退避 解决此问题的库。这具有我们无需跨多个线程跟踪请求和令牌用量的优势。
我们聚合查询的上述计时(203.695 秒)表明,要么我们没有充分利用我们的 10 个 UDF 命令池,要么正在受到速率限制。假设平均延迟为 0.65*,完全并行化,我们预计我们的总执行时间将接近 100 秒(1600/10 * 0.65 = 104 秒)。
*我们假设 Open AI API 可以保持此延迟,而不管内容长度变化等因素(一些评论将比其他评论长)。
无法达到 100 秒的性能,因为查询受到 OpenAI API 上的速率限制 - 特别是令牌限制。如下所示,每个 Hacker News 评论平均约 330 个单词,或约 80 个令牌(约 4 个字符一个令牌)。但这不包括我们的提示和系统文本,它们会额外增加 60 个令牌。我们与 ClickHouse 相关的子集的平均令牌长度也更高,为 136 个。
SELECT
round(avg(length(text))) AS num_chars,
round(num_chars * 0.25) AS num_tokens
FROM hackernews
┌─num_chars─┬─num_tokens─┐
│ 333 │ 83 │
└───────────┴────────────┘
1 row in set. Elapsed: 1.656 sec. Processed 37.17 million rows, 12.72 GB (22.44 million rows/s., 7.68 GB/s.)
SELECT
round(avg(length(text))) AS num_chars,
round(num_chars * 0.25) AS num_tokens
FROM hackernews
WHERE (title LIKE '%ClickHouse%') OR (text LIKE '%ClickHouse%')
┌─num_chars─┬─num_tokens─┐
│ 546 │ 136 │
└───────────┴────────────┘
1 row in set. Elapsed: 1.933 sec. Processed 37.17 million rows, 13.28 GB (19.23 million rows/s., 6.87 GB/s.)
Peak memory usage: 73.49 MiB.
虽然每个评论都需要一个请求,总共 1600 个请求(低于每分钟 3500 个的限制),但我们总共有 900k 个字符或 229k 个令牌。考虑到我们的提示文本,这会增加到 329k 个令牌(每个请求额外 60 个)。这远远超过了每分钟 90k 的限制。尽管如此,如果这项工作安排得完美,我们预计此请求将在我们经历的 200 秒(329/90 ~ 3.65 分钟 ~ 200 秒)内完成。
虽然更好的速率限制实现(例如,基于 通用小区速率算法)可能会更有效地利用 Open AI API 资源,但请求延迟最终将受到我们的令牌限制。我们只能使用前 N 个令牌,其中 N 根据一个限制选择,该限制将确保可以利用 3500/分钟的完整请求限制,即 90000/3500 ~25 个令牌。但是,这对于在我们的示例中建立提到的技术来确定情感来说可能是不够的。
插入时提取
鉴于速率限制和延迟,使用 API 进行查询的更可取的方法是在数据插入时分配一个情感列。凭借其命令池,Python 函数更适合此类批处理。下面,我们在通过 INSERT INTO
加载行时提取情感。在此示例中,我们将所有与 ClickHouse 相关的行插入到一个专用表中,为每个行计算一个情感列。这种类型的处理非常理想,因为新行会不断插入,Hacker News 数据集每分钟接收约 8-10 行新数据。一旦分配了列,我们就可以在情感列上享受 ClickHouse 的查询速度,而无需发出 API 请求。
INSERT INTO hackernews_v2 SELECT
*,
sentiment_p(text) AS sentiment
FROM hackernews
WHERE (text LIKE '%ClickHouse%') OR (title LIKE '%ClickHouse%')
0 rows in set. Elapsed: 185.452 sec. Processed 37.17 million rows, 13.54 GB (200.44 thousand rows/s., 73.00 MB/s.)
SELECT count(), sentiment
FROM hackernews_v2
GROUP BY sentiment
┌─count()─┬─sentiment─┐
│ 193 │ NEGATIVE │
│ 850 │ POSITIVE │
│ 634 │ NEUTRAL │
└─────────┴───────────┘
3 rows in set. Elapsed: 0.003 sec. Processed 1.68 thousand rows, 1.68 KB (531.10 thousand rows/s., 531.10 KB/s.)
Peak memory usage: 72.90 KiB.
提取结构
为了完整起见,让我们还转换我们之前的 OpenAI 请求以从我们的帖子中提取技术。下面的 bash Python 代码显示了这一点。
#!/usr/bin/python3
import sys
import openai
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
openai.api_key = "<INSERT>"
request_timeout = 3
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20))
def completion_with_backoff(**kwargs):
return openai.ChatCompletion.create(**kwargs)
def extract_topics(text):
if text == "":
return ""
messages = [{
"role": "system",
"content": "You are an AI language model trained to extract entities from Hacker News forum comments."},
{
"role": "user",
"content": f"From the following text extract the technologies mentioned as a comma separated list with no spaces. Return an empty string if there are no technologies: {text}"
}]
try:
response = completion_with_backoff(model="gpt-3.5-turbo", messages=messages, max_tokens=30, temperature=0,
request_timeout=request_timeout)
return response.choices[0].message.content.strip()
except Exception as e:
return f"ERROR - {e}"
for size in sys.stdin:
# collect a batch for performance
for row in range(0, int(size)):
print(",".join([tech.strip() for tech in extract_topics(sys.stdin.readline().strip()).split(",")]))
sys.stdout.flush()
在使用与情感 UDF 相同的参数配置此函数后,除了名称为 extract_techs
之外,我们可以识别 Hacker News 上与 ClickHouse 相关的最常提及的技术。
WITH results AS (
SELECT extract_techs(text) as techs
FROM hackernews
WHERE (text LIKE '%ClickHouse%') OR (title LIKE '%ClickHouse%')
)
SELECT
arrayJoin(splitByChar(',', techs)) AS tech,
count() AS c
FROM results
GROUP BY tech
HAVING tech NOT ILIKE '%ClickHouse%' AND tech != ''
ORDER BY c DESC
LIMIT 5
┌─tech────────┬───c─┐
│ Postgres │ 78 │
│ PostgreSQL │ 65 │
│ SQL │ 63 │
│ TimescaleDB │ 54 │
│ MySQL │ 51 │
└─────────────┴─────┘
5 rows in set. Elapsed: 211.358 sec. Processed 37.17 million rows, 13.28 GB (175.87 thousand rows/s., 62.85 MB/s.)
Peak memory usage: 931.95 MiB.
结论
这篇博文展示了如何使用 UDF 将 ClickHouse 直接与模型提供商集成,以丰富和为现有数据添加结构。虽然我们在示例中使用了 OpenAI,但类似的“即插即用”模型服务应该同样易于集成。