跳至主要内容
跳至主要内容

用户自定义函数 (UDF)

ClickHouse Cloud 中的私有预览版
注意

此功能在 ClickHouse Cloud 中处于私有预览阶段。请联系 ClickHouse 支持团队,网址为 https://clickhouse.cloud/support 以获取访问权限。

ClickHouse 可以调用任何外部可执行程序或脚本来处理数据。

可执行用户自定义函数的配置可以位于一个或多个 xml 文件中。配置文件的路径在 user_defined_executable_functions_config 参数中指定。

函数配置包含以下设置

参数描述必需默认值
name函数名称-
command要执行的脚本名称或命令(如果 execute_direct 为 false)。-
argument带有 type 以及可选 name 的参数描述。每个参数都在单独的设置中描述。如果参数名称是用户自定义函数格式(如 NativeJSONEachRow)序列化的组成部分,则指定名称是必要的。c + argument_number
format一个 格式,用于将参数传递给命令。命令的输出也应使用相同的格式。-
return_type返回值的类型-
return_name返回值的名称。如果返回名称是用户自定义函数格式(如 NativeJSONEachRow)序列化的组成部分,则指定返回名称是必要的。可选result
类型可执行类型。如果 type 设置为 executable,则启动单个命令。如果设置为 executable_pool,则创建命令池。-
max_command_execution_time处理数据块的最大执行时间,以秒为单位。此设置仅对 executable_pool 命令有效。可选10
command_termination_timeout命令管道关闭后,命令应完成的时间,以秒为单位。在此之后,将向执行该命令的进程发送 SIGTERM 信号。可选10
command_read_timeout从命令 stdout 读取数据的超时时间,以毫秒为单位。可选10000
command_write_timeout将数据写入命令 stdin 的超时时间,以毫秒为单位。可选10000
pool_size命令池的大小可选16
send_chunk_header控制在将数据块发送到进程之前是否发送行计数。可选false
execute_direct如果 execute_direct = 1,则在由 user_scripts_path 指定的 user_scripts 文件夹中搜索 command。可以使用空格分隔符指定其他脚本参数。例如:script_name arg1 arg2。如果 execute_direct = 0,则将 command 作为参数传递给 bin/sh -c可选1
lifetime函数重新加载的间隔,以秒为单位。如果设置为 0,则不重新加载该函数。可选0
deterministic如果函数是确定性的(对于相同的输入返回相同的结果)。可选false

命令必须从 STDIN 读取参数,并且必须将结果输出到 STDOUT。命令必须迭代地处理参数。也就是说,在处理完一个数据块的参数后,它必须等待下一个数据块。

可执行用户自定义函数

示例

来自内联脚本的 UDF

使用 XML 或 YAML 配置手动创建 test_function_sum,并将 execute_direct 指定为 0

文件 test_function.xml(使用默认路径设置,位于 /etc/clickhouse-server/test_function.xml)。

<functions>
    <function>
        <type>executable</type>
        <name>test_function_sum</name>
        <return_type>UInt64</return_type>
        <argument>
            <type>UInt64</type>
            <name>lhs</name>
        </argument>
        <argument>
            <type>UInt64</type>
            <name>rhs</name>
        </argument>
        <format>TabSeparated</format>
        <command>cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y FROM table"</command>
        <execute_direct>0</execute_direct>
        <deterministic>true</deterministic>
    </function>
</functions>

SELECT test_function_sum(2, 2);
┌─test_function_sum(2, 2)─┐
│                       4 │
└─────────────────────────┘

来自 Python 脚本的 UDF

在此示例中,我们创建一个 UDF,它从 STDIN 读取一个值并将其作为字符串返回。

使用 XML 或 YAML 配置创建 test_function

文件 test_function.xml(使用默认路径设置,位于 /etc/clickhouse-server/test_function.xml)。

<functions>
    <function>
        <type>executable</type>
        <name>test_function_python</name>
        <return_type>String</return_type>
        <argument>
            <type>UInt64</type>
            <name>value</name>
        </argument>
        <format>TabSeparated</format>
        <command>test_function.py</command>
    </function>
</functions>

user_scripts 文件夹(使用默认路径设置,位于 /var/lib/clickhouse/user_scripts/test_function.py)中创建一个脚本文件 test_function.py

#!/usr/bin/python3

import sys

if __name__ == '__main__':
    for line in sys.stdin:
        print("Value " + line, end='')
        sys.stdout.flush()
SELECT test_function_python(toUInt64(2));
┌─test_function_python(2)─┐
│ Value 2                 │
└─────────────────────────┘

STDIN 读取两个值,并将它们的和作为 JSON 对象返回

使用 XML 或 YAML 配置创建 test_function_sum_json,并使用命名参数和 JSONEachRow 格式。

文件 test_function.xml(使用默认路径设置,位于 /etc/clickhouse-server/test_function.xml)。

<functions>
    <function>
        <type>executable</type>
        <name>test_function_sum_json</name>
        <return_type>UInt64</return_type>
        <return_name>result_name</return_name>
        <argument>
            <type>UInt64</type>
            <name>argument_1</name>
        </argument>
        <argument>
            <type>UInt64</type>
            <name>argument_2</name>
        </argument>
        <format>JSONEachRow</format>
        <command>test_function_sum_json.py</command>
    </function>
</functions>

user_scripts 文件夹(使用默认路径设置,位于 /var/lib/clickhouse/user_scripts/test_function_sum_json.py)中创建脚本文件 test_function_sum_json.py

#!/usr/bin/python3

import sys
import json

if __name__ == '__main__':
    for line in sys.stdin:
        value = json.loads(line)
        first_arg = int(value['argument_1'])
        second_arg = int(value['argument_2'])
        result = {'result_name': first_arg + second_arg}
        print(json.dumps(result), end='\n')
        sys.stdout.flush()
SELECT test_function_sum_json(2, 2);
┌─test_function_sum_json(2, 2)─┐
│                            4 │
└──────────────────────────────┘

command 设置中使用参数

可执行用户自定义函数可以在 command 设置中采用常量参数(这仅适用于具有 executable 类型的用户自定义函数)。它还需要 execute_direct 选项,以确保没有 shell 参数扩展漏洞。

文件 test_function_parameter_python.xml(使用默认路径设置,位于 /etc/clickhouse-server/test_function_parameter_python.xml)。

<functions>
    <function>
        <type>executable</type>
        <execute_direct>true</execute_direct>
        <name>test_function_parameter_python</name>
        <return_type>String</return_type>
        <argument>
            <type>UInt64</type>
        </argument>
        <format>TabSeparated</format>
        <command>test_function_parameter_python.py {test_parameter:UInt64}</command>
    </function>
</functions>

user_scripts 文件夹(使用默认路径设置,位于 /var/lib/clickhouse/user_scripts/test_function_parameter_python.py)中创建一个脚本文件 test_function_parameter_python.py

#!/usr/bin/python3

import sys

if __name__ == "__main__":
    for line in sys.stdin:
        print("Parameter " + str(sys.argv[1]) + " value " + str(line), end="")
        sys.stdout.flush()
SELECT test_function_parameter_python(1)(2);
┌─test_function_parameter_python(1)(2)─┐
│ Parameter 1 value 2                  │
└──────────────────────────────────────┘

来自 shell 脚本的 UDF

在此示例中,我们创建一个 shell 脚本,该脚本将每个值乘以 2。

文件 test_function_shell.xml(使用默认路径设置,位于 /etc/clickhouse-server/test_function_shell.xml)。

<functions>
    <function>
        <type>executable</type>
        <name>test_shell</name>
        <return_type>String</return_type>
        <argument>
            <type>UInt8</type>
            <name>value</name>
        </argument>
        <format>TabSeparated</format>
        <command>test_shell.sh</command>
    </function>
</functions>

user_scripts 文件夹(使用默认路径设置,位于 /var/lib/clickhouse/user_scripts/test_shell.sh)中创建一个脚本文件 test_shell.sh

#!/bin/bash

while read read_data;
    do printf "$(expr $read_data \* 2)\n";
done
SELECT test_shell(number) FROM numbers(10);
    ┌─test_shell(number)─┐
 1. │ 0                  │
 2. │ 2                  │
 3. │ 4                  │
 4. │ 6                  │
 5. │ 8                  │
 6. │ 10                 │
 7. │ 12                 │
 8. │ 14                 │
 9. │ 16                 │
10. │ 18                 │
    └────────────────────┘

错误处理

如果数据无效,某些函数可能会引发异常。在这种情况下,查询将被取消,并将错误文本返回给客户端。对于分布式处理,当其中一个服务器发生异常时,其他服务器也会尝试中止查询。

参数表达式的评估

在几乎所有编程语言中,对于某些运算符,可能不会评估其中一个参数。通常是运算符 &&||?:。在 ClickHouse 中,始终评估函数的参数(运算符)。这是因为一次评估整个列的部分,而不是单独计算每一行。

执行分布式查询处理的函数

对于分布式查询处理,尽可能多的查询处理阶段都在远程服务器上执行,其余阶段(合并中间结果以及之后的所有内容)都在请求服务器上执行。

这意味着函数可以在不同的服务器上执行。例如,在查询 SELECT f(sum(g(x))) FROM distributed_table GROUP BY h(y),

  • 如果 distributed_table 至少有两个分片,则函数 'g' 和 'h' 在远程服务器上执行,而函数 'f' 在请求服务器上执行。
  • 如果 distributed_table 只有单个分片,则所有 'f'、'g' 和 'h' 函数都在该分片的服务器上执行。

函数的结果通常不依赖于它在哪个服务器上执行。但是,有时这很重要。例如,使用字典的函数使用它们正在运行的服务器上的字典。另一个示例是 hostName 函数,它返回运行它的服务器的名称,以便在 SELECT 查询中按服务器进行 GROUP BY

如果函数在查询中在请求服务器上执行,但您需要在远程服务器上执行它,则可以将其包装在 'any' 聚合函数中,或将其添加到 GROUP BY 中的键中。

SQL 用户自定义函数

可以使用 CREATE FUNCTION 语句创建来自 lambda 表达式的自定义函数。要删除这些函数,请使用 DROP FUNCTION 语句。

    © . This site is unofficial and not affiliated with ClickHouse, Inc.