简介
借助 ClickHouse,用户可以直接在其 SQL 工作负载中使用 AI 模型。这可以采用在数据插入时丰富数据的形式,或者在查询时补充特定结果。虽然许多用户对训练自己的特定领域模型感到满意,但这对于较小的团队或用例而言通常是不切实际的。在这些情况下,预构建的“即插即用”模型或服务通常就足够了,并且可以用最少的努力提供良好的结果。
在这篇文章中,我们演示了
- 如何使用 ClickHouse 用户定义函数 (UDF) 轻松地将 ClickHouse 与提供“AI 即服务”的第三方 API 集成
- 这些“即插即用”模型如何在 ClickHouse 中直接用于情感分析等任务,并根据这些结果进行聚合,以计算给定主题的正面和负面帖子数量等指标
鉴于 OpenAI 最近的普及和备受瞩目的 ChatGPT 产品,我们以 OpenAI 为例。然而,这种方法的简单性意味着它可以轻松地适应竞争服务。
用户定义函数 (UDF)
ClickHouse 中的 UDF(用户定义函数)有几种形式。在最近的一篇文章中,我们分享了如何使用 ClickHouse SQL 定义的函数来查询 Hugging Face 中外部托管的数据集。虽然像这样的 SQL 定义函数对于概括常见的 SQL 任务非常有用,但有时用户需要他们熟悉的编程语言的全部功能。为此,ClickHouse 支持可执行用户定义函数。这些函数使开发人员可以灵活地调用任何外部可执行程序或脚本来处理数据。在我们下面的简单示例中,我们将使用此功能来调用简单的 Bash 和 Python 脚本,这些脚本将查询 OpenAI API。我们将展示 API 响应如何自动丰富正在插入或被 ClickHouse 查询的数据。
使用 OpenAI
大多数用户通过其流行的 ChatGPT 服务熟悉 OpenAI,该服务已经彻底改变了工作行为和日常任务。OpenAI 为企业提供 REST API,以访问 ChatGPT 模型,用于现有服务和自动化流程。这些服务提供从聊天完成和 嵌入生成 到图像生成和语音转文本的所有功能。在我们的示例中,我们专注于聊天完成,它可以重新用于更通用的任务,例如实体提取和情感标记。
注意:对 OpenAI 服务的所有请求都需要令牌 - 在下面的示例中以环境变量 OPENAI_API_KEY 传递。用户可以注册试用版并获得足够的免费积分用于此处的示例。
除了可以充当聊天机器人外,OpenAI 的完成服务还支持情感分析和结构提取等任务。对于这些任务,开发人员必须通过系统角色向 OpenAI 服务提供相关说明,以描述预期行为。对某些文本执行情感分析的 REST API 请求示例如下。在这里,我们要求服务对论坛帖子进行分类。请注意,我们需要提供明确的说明,仅返回指定情感的单个令牌
curl https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "system",
"content": "You are an AI language model trained to analyze and detect the sentiment of forum comments."
},
{
"role": "user",
"content": "Analyze the following hackernews comment and determine if the sentiment is: positive, negative or neutral. Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: I can say for BigQuery and Databricks from personal experience.<p>BigQuery is much slower and is much more expensive for both storage and query.<p>Databricks (Spark) is even slower than that (both io and compute), although you can write custom code/use libs.<p>You seem to underestimate how heavily ClickHouse is optimized (e.g. compressed storage)."
}
],
"temperature": 0,
"max_tokens": 256
}'
{
"id": "chatcmpl-7vOWWkKWGN7McODMXJzQB6zzDcx0r",
"object": "chat.completion",
"created": 1693913320,
"model": "gpt-3.5-turbo-0613",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "NEGATIVE"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 147,
"completion_tokens": 2,
"total_tokens": 149
}
}
请注意,我们在此处使用了更具成本效益的 gpt-3.5-turbo
模型,而不是最新的 gpt-4
模型。前者足以用于示例目的。我们将其性能水平留给读者自行确定。
同一服务也可用于提取结构。假设我们希望从上面的文本中提取提及的技术,作为字符串值列表。我们需要稍微修改一下说明
curl https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "system",
"content": "You are an AI language model trained to extract entities from forum comments"
},
{
"role": "user",
"content": "From the following text extract the many technologies mentioned as a comma seperated list: I can say for BigQuery and Databricks from personal experience.<p>BigQuery is much slower and is much more expensive for both storage and query.<p>Databricks (Spark) is even slower than that (both io and compute), although you can write custom code/use libs.<p>You seem to underestimate how heavily ClickHouse is optimized (e.g. compressed storage)."
}
],
"temperature": 0,
"max_tokens": 20
}'
{
"id": "chatcmpl-7vOdLnrZWeax3RxjeUNelCTdGvr8q",
"object": "chat.completion",
"created": 1693913743,
"model": "gpt-3.5-turbo-0613",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "BigQuery, Databricks, Spark, ClickHouse"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 122,
"completion_tokens": 11,
"total_tokens": 133
}
}
关于上述请求参数的一些注意事项
- 我们将
temperature
设置为 0,以消除响应中的任何随机性。对于这些用例,我们不需要创造性的文本 - 只需要确定性的文本分析。 - 在这两种情况下,我们都设置了
max_tokens
来确定响应的长度。一个令牌大约是 ¾ 个单词。因此,我们调整了我们的请求。
数据集
对于我们的示例数据集,我们使用 Hacker News 帖子。此数据集在我们的公共演示环境中可用,包含 2006 年至 2023 年 8 月期间流行的 Hacker News 论坛上的所有帖子和评论:大约 3700 万行。下表显示了表架构。
出于我们的目的,我们对 title
和 text
列感兴趣。我们将探索此数据集作为读者的练习,他们可以按照此处的说明,如果他们希望将此数据集的最新版本加载到自己的 ClickHouse 实例中。或者,我们提供了 S3 上的 Parquet 文件,可以使用 s3 函数加载,如下所示
CREATE TABLE hackernews
(
`id` UInt32,
`deleted` UInt8,
`type` Enum8('story' = 1, 'comment' = 2, 'poll' = 3, 'pollopt' = 4, 'job' = 5),
`by` LowCardinality(String),
`time` DateTime,
`text` String,
`dead` UInt8,
`parent` UInt32,
`poll` UInt32,
`kids` Array(UInt32),
`url` String,
`score` Int32,
`title` String,
`parts` Array(UInt32),
`descendants` Int32
)
ENGINE = MergeTree
ORDER BY id
INSERT INTO hackernews SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/hackernews/2023-08-18.parquet')
添加情感
对于我们的示例,假设我们想将情感分析添加到存储在 ClickHouse 中的 Hacker News 数据中。为此,我们需要通过 ClickHouse UDF 调用早期的 OpenAI REST API。此请求的简单性意味着即使是简单的 bash 脚本也可能足够,如下所示(以下需要 jq)。稍后,我们将演示如何在 Python 中直接执行此操作。
#!/bin/bash
while read read_data; do
sentiment=$(curl -s https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer <insert>" \
-d "{
\"model\": \"gpt-3.5-turbo\",
\"messages\": [
{
\"role\": \"system\",
\"content\": \"You are an AI language model trained to analyze and detect the sentiment of forum comments.\"
},
{
\"role\": \"user\",
\"content\": \"Analyze the following Hacker News comment and determine if the sentiment is: positive, negative or neutral. Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: ${read_data}\"
}
],
\"temperature\": 0,
\"max_tokens\": 2,
\"temperature\": 0
}" | jq -r '.choices[0].message.content')
printf "$sentiment";
done
此脚本应保存在 ClickHouse 的 user_scripts
目录中,命名为 sentiment.sh
并使其可执行。还应将以下条目添加到文件 openai_functions.xml
并保存到 ClickHouse 配置目录(通常为 /etc/clickhouse-server/
)。
<functions>
<function>
<name>sentiment</name>
<type>executable</type>
<format>TabSeparated</format>
<return_type>String</return_type>
<argument>
<type>String</type>
</argument>
<command>sentiment.sh</command>
<command_read_timeout>10000</command_read_timeout>
<command_write_timeout>10000</command_write_timeout>
<max_command_execution_time>10000</max_command_execution_time>
</function>
</functions>
此配置使 UDF 可用于 ClickHouse。除了修改此处的超时以允许 OpenAI 请求的延迟外,我们还提供了一个函数名称 sentiment
,并指定了输入和返回类型。
通过以上配置,用户可以通过简单的函数调用来请求文本片段的情感,例如
SELECT sentiment('Learn about the key differences between ClickHouse Cloud and Snowflake and how ClickHouse Cloud outperforms Snowflake across the critical dimensions for real-time analytics: query latency and and cost.') AS sentiment
┌─sentiment─┐
│ POSITIVE │
└───────────┘
1 row in set. Elapsed: 0.433 sec.
虽然以上内容可以帮助我们入门,但可能需要更强大的解决方案和错误处理。为此,我们可能希望将以上内容转换为 Python。下面的 Python 脚本添加了基本的错误处理和重试以及退避。此处的后者专门用于解决 OpenAI 速率限制的挑战 - 有关更多详细信息,请参阅处理延迟和速率限制。
请注意,需要 openai 和 tenacity 库来处理 API 请求和速率限制。
#!/usr/bin/python3
import sys
import openai
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
openai.api_key = "<INSERT>"
request_timeout = 3
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20))
def completion_with_backoff(**kwargs):
return openai.ChatCompletion.create(**kwargs)
def extract_sentiment(text):
if text == "":
return "NEUTRAL"
messages = [{"role": "system",
"content": "You are an AI language model trained to analyze and detect the sentiment of hackernews forum comments."},
{
"role": "user",
"content": f"Analyze the following hackernews comment and determine if the sentiment is: positive, negative or neutral. "
f"Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: {text}"
}]
try:
response = completion_with_backoff(model="gpt-3.5-turbo", messages=messages, max_tokens=30, temperature=0, request_timeout=request_timeout)
return response.choices[0].message.content
except:
return "ERROR"
for size in sys.stdin:
# collect a batch for performance
for row in range(0, int(size)):
print(extract_sentiment(sys.stdin.readline().strip()))
sys.stdout.flush()
该服务的基于聊天的性质使得在单个请求中评估多个文本片段的情感具有挑战性。为了使这些示例保持简单,我们为每行发出一个请求。更优化的解决方案可能会批量请求并要求端点评估一组文本。
以上假设从 ClickHouse 传递的任何输入都包含行数的 prefix。这用于确定后续输入的迭代次数。这可以允许在 Python 脚本中批量操作以获得更高的性能。
我们上面函数的配置除了定义唯一的名称 setiment_p
之外,还有一些其他设置。我们将 type
设置为可执行池以提高吞吐量性能。这将启动命令 N 次(下面为 10 次),允许多个同时调用。send_chunk_header
设置确保数字标题指示要处理的行数在任何输入之前出现。我们增加了超时设置,以防传递大量行块。
<functions>
<function>
<name>sentiment_p</name>
<type>executable_pool</type>
<pool_size>10</pool_size>
<send_chunk_header>true</send_chunk_header>
<format>TabSeparated</format>
<return_type>String</return_type>
<argument>
<type>String</type>
</argument>
<command>sentiment.py</command>
<command_read_timeout>10000000</command_read_timeout>
<command_write_timeout>10000000</command_write_timeout>
<max_command_execution_time>1000000</max_command_execution_time>
</function>
</functions>
我们可以将上述任一函数应用于列的一组行。在下面的示例中,我们请求包含单词 ClickHouse 的 10 行标题和文本的情感。
SELECT text, sentiment_p(text) AS sentiment
FROM hackernews WHERE text LIKE '%ClickHouse%' OR title LIKE '%ClickHouse%'
ORDER BY time DESC
LIMIT 2
FORMAT Vertical
Row 1:
──────
text: Yeah ClickHouse is definitely the way to go here. Its ability to serve queries with low latency and high concurrency is in an entirely different league from Snowflake, Redshift, BigQuery, etc.
sentiment: POSITIVE
Row 2:
──────
text: There are other databases today that do real time analytics (ClickHouse, Apache Druid, StarRocks along with Apache Pinot). I'd look at the ClickHouse Benchmark to see who are the competitors in that space and their relative performance.
sentiment: POSITIVE
2 rows in set. Elapsed: 2.763 sec. Processed 37.17 million rows, 13.30 GB (13.46 million rows/s., 4.82 GB/s.)
UDF 仅在最终结果整理完毕后执行 - 这意味着只需要两个请求。这种方法是理想的,因为对 OpenAI 的请求延迟通常远高于 ClickHouse 评估查询的时间。
更进一步,我们可以通过简单的聚合来计算 ClickHouse 的正面和负面帖子数量。这会产生更多开销,因为我们需要进行 1600 多次 OpenAI API 调用。这反映在最终计时中。
SELECT
count(),
sentiment
FROM hackernews
WHERE (text LIKE '%ClickHouse%') OR (title LIKE '%ClickHouse%')
GROUP BY sentiment_p(text) AS sentiment
FORMAT PrettyCompactMonoBlock
┌─count()─┬─sentiment─┐
│ 192 │ NEGATIVE │
│ 628 │ NEUTRAL │
│ 857 │ POSITIVE │
└─────────┴───────────┘
3 rows in set. Elapsed: 203.695 sec. Processed 37.17 million rows, 13.28 GB (182.48 thousand rows/s., 65.21 MB/s.)
处理延迟和速率限制
OpenAI API 的有用性受到两个因素的限制:其延迟和施加的速率限制。请注意,这些变量会因选择的“即插即用”模型而异。在我们的示例中,我们使用 OpenAI。还有许多其他模型可供选择,每种模型都有其自身的权衡。
延迟会影响查询的最小响应时间。虽然 OpenAI 允许多个并发查询以确保这不会影响吞吐量,但速率限制将更具限制性。因此,我们建议用户将这些 API 用于临时分析(其中该函数用于一小部分结果,例如我们之前的 2 行示例)或用于在插入时丰富数据。在展示后者的示例之前,让我们探讨延迟和速率限制的局限性。
我们可以通过修改我们的情感 curl 请求以使用简单格式文件来评估响应的延迟
curl -w "@curl-format.txt" -o /dev/null -s https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "system",
"content": "You are an AI language model trained to analyze and detect the sentiment of forum comments."
},
{
"role": "user",
"content": "Analyze the following hackernews comment and determine if the sentiment is: positive, negative or neutral. Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: I can say for BigQuery and Databricks from personal experience.<p>BigQuery is much slower and is much more expensive for both storage and query.<p>Databricks (Spark) is even slower than that (both io and compute), although you can write custom code/use libs.<p>You seem to underestimate how heavily ClickHouse is optimized (e.g. compressed storage)."
}
],
"temperature": 0,
"max_tokens": 256,
"temperature": 0
}'
time_namelookup: 0.081196s
time_connect: 0.084907s
time_appconnect: 0.095853s
time_pretransfer: 0.095937s
time_redirect: 0.000000s
time_starttransfer: 0.095942s
----------
time_total: 0.650401s
此处的总延迟 0.65 秒限制了我们的查询响应时间,但由于我们有一个命令池(上面为 10 个),ClickHouse 的并行执行管道可以利用该命令池,因此这种情况得到了缓解。但是,这种并行化反过来又受到 OpenAPI 速率限制的限制。
OpenAI 受每分钟请求数和每分钟令牌数限制。对于我们的 gpt-3.5-turbo
模型,这是每分钟 9 万个令牌 (TPM) 和每分钟 3500 个请求 (RPM)。速率限制因模型和帐户类型而异 - 更多详细信息请参见此处。
为了解决这个问题,我们为我们的 UDF 添加了基本的速率限制。API 在标头中返回速率限制信息(即,下一分钟剩余多少令牌和请求)。虽然我们可以开发一个速率限制函数来使用此信息,但 OpenAI 建议使用一些旨在通过指数退避来解决此问题的库。这具有我们无需跨多个线程跟踪请求和令牌使用情况的优势。
我们聚合查询的上述计时 (203.695 秒) 表明,要么我们没有充分利用我们的 10 个 UDF 命令池,要么受到速率限制。假设平均延迟为 0.65*,完全并行化,我们预计我们的总执行时间将接近 100 秒(1600/10 * 0.65 = 104 秒)。
*我们假设 Open AI API 可以保持此延迟,而与诸如可变内容长度(某些评论将比其他评论更长)之类的因素无关。
未能实现 100 秒的性能是因为查询受到 OpenAI API 的速率限制 - 特别是令牌限制。每个 Hacker News 评论平均约为 330 个单词,如下所示,或大约 80 个令牌(每个令牌约 4 个字符)。但是,这不包括我们的提示和系统文本,这增加了额外的 60 个令牌。我们与 ClickHouse 相关的子集的平均令牌长度也更高,为 136。
SELECT
round(avg(length(text))) AS num_chars,
round(num_chars * 0.25) AS num_tokens
FROM hackernews
┌─num_chars─┬─num_tokens─┐
│ 333 │ 83 │
└───────────┴────────────┘
1 row in set. Elapsed: 1.656 sec. Processed 37.17 million rows, 12.72 GB (22.44 million rows/s., 7.68 GB/s.)
SELECT
round(avg(length(text))) AS num_chars,
round(num_chars * 0.25) AS num_tokens
FROM hackernews
WHERE (title LIKE '%ClickHouse%') OR (text LIKE '%ClickHouse%')
┌─num_chars─┬─num_tokens─┐
│ 546 │ 136 │
└───────────┴────────────┘
1 row in set. Elapsed: 1.933 sec. Processed 37.17 million rows, 13.28 GB (19.23 million rows/s., 6.87 GB/s.)
Peak memory usage: 73.49 MiB.
虽然每个评论都需要一个请求,从而导致总共 1600 个请求(低于每分钟 3500 个限制),但我们总共有90 万个字符或 22.9 万个令牌。当考虑我们的提示文本时,这增加到32.9 万个令牌(每个请求额外 60 个)。这远高于每分钟 9 万个的限制。尽管如此,如果这项工作安排得非常完美,我们预计此请求将在我们体验到的 200 秒(329/90 ~ 3.65 分钟 ~ 200 秒)内完成。
虽然更好的速率限制实现(例如,基于通用信元速率算法)可能会更优化地利用 Open AI API 资源,但请求延迟最终将受到我们的令牌限制的限制。我们只能使用前 N 个令牌,其中 N 是根据限制选择的,该限制将确保可以利用每分钟 3500 个的完整请求限制,即 90000/3500 ~25 个令牌。然而,这不太可能足以在我们示例中建立提及的技术或情感。
插入时提取
考虑到速率限制和延迟,使用 API 进行查询的更可取方法是在数据插入时分配情感列。凭借其命令池,Python 函数更适合这种类型的批处理。下面,我们在通过 INSERT INTO
加载行时提取情感。在此示例中,我们将所有与 ClickHouse 相关的行插入到专用表中,为每行计算情感列。这种类型的处理是理想的,因为在插入新行时,Hacker News 数据集每分钟接收大约 8-10 个新行。一旦分配了列,我们就可以在情感列上享受 ClickHouse 查询速度,而无需发出 API 请求。
INSERT INTO hackernews_v2 SELECT
*,
sentiment_p(text) AS sentiment
FROM hackernews
WHERE (text LIKE '%ClickHouse%') OR (title LIKE '%ClickHouse%')
0 rows in set. Elapsed: 185.452 sec. Processed 37.17 million rows, 13.54 GB (200.44 thousand rows/s., 73.00 MB/s.)
SELECT count(), sentiment
FROM hackernews_v2
GROUP BY sentiment
┌─count()─┬─sentiment─┐
│ 193 │ NEGATIVE │
│ 850 │ POSITIVE │
│ 634 │ NEUTRAL │
└─────────┴───────────┘
3 rows in set. Elapsed: 0.003 sec. Processed 1.68 thousand rows, 1.68 KB (531.10 thousand rows/s., 531.10 KB/s.)
Peak memory usage: 72.90 KiB.
提取结构
为了完整起见,我们还将早期的 OpenAI 请求转换为从我们的帖子中提取技术。bash Python 如下所示。
#!/usr/bin/python3
import sys
import openai
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
openai.api_key = "<INSERT>"
request_timeout = 3
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20))
def completion_with_backoff(**kwargs):
return openai.ChatCompletion.create(**kwargs)
def extract_topics(text):
if text == "":
return ""
messages = [{
"role": "system",
"content": "You are an AI language model trained to extract entities from Hacker News forum comments."},
{
"role": "user",
"content": f"From the following text extract the technologies mentioned as a comma separated list with no spaces. Return an empty string if there are no technologies: {text}"
}]
try:
response = completion_with_backoff(model="gpt-3.5-turbo", messages=messages, max_tokens=30, temperature=0,
request_timeout=request_timeout)
return response.choices[0].message.content.strip()
except Exception as e:
return f"ERROR - {e}"
for size in sys.stdin:
# collect a batch for performance
for row in range(0, int(size)):
print(",".join([tech.strip() for tech in extract_topics(sys.stdin.readline().strip()).split(",")]))
sys.stdout.flush()
在使用与情感 UDF 相同的参数配置此函数后,但名称为 extract_techs
,我们可以识别 Hacker News 上提及的与 ClickHouse 相关的最热门技术。
WITH results AS (
SELECT extract_techs(text) as techs
FROM hackernews
WHERE (text LIKE '%ClickHouse%') OR (title LIKE '%ClickHouse%')
)
SELECT
arrayJoin(splitByChar(',', techs)) AS tech,
count() AS c
FROM results
GROUP BY tech
HAVING tech NOT ILIKE '%ClickHouse%' AND tech != ''
ORDER BY c DESC
LIMIT 5
┌─tech────────┬───c─┐
│ Postgres │ 78 │
│ PostgreSQL │ 65 │
│ SQL │ 63 │
│ TimescaleDB │ 54 │
│ MySQL │ 51 │
└─────────────┴─────┘
5 rows in set. Elapsed: 211.358 sec. Processed 37.17 million rows, 13.28 GB (175.87 thousand rows/s., 62.85 MB/s.)
Peak memory usage: 931.95 MiB.
结论
这篇博文展示了如何使用 UDF 将 ClickHouse 直接与模型提供商集成,以丰富现有数据并添加结构。虽然我们的示例使用了 OpenAI,但类似的“即插即用”模型服务应该同样易于集成。