跳至主要内容

UDF 用户自定义函数

可执行用户自定义函数

ClickHouse 可以调用任何外部可执行程序或脚本处理数据。

可执行用户自定义函数的配置可以位于一个或多个 xml 文件中。配置路径在 user_defined_executable_functions_config 参数中指定。

函数配置包含以下设置

  • name - 函数名称。
  • command - 如果 execute_direct 为 false,则执行脚本名称或命令。
  • argument - 带有 type 的参数描述,以及可选的 name 参数。每个参数在单独的设置中描述。如果参数名称是用户定义函数格式(如 NativeJSONEachRow)序列化的一部分,则指定名称是必需的。默认参数名称值为 c + argument_number。
  • format - 命令传递参数的 格式
  • return_type - 返回值的类型。
  • return_name - 返回值的名称。如果返回值名称是用户定义函数格式(如 NativeJSONEachRow)序列化的一部分,则指定返回值名称是必需的。可选。默认值为 result
  • type - 可执行类型。如果 type 设置为 executable,则启动单个命令。如果设置为 executable_pool,则创建一个命令池。
  • max_command_execution_time - 处理数据块的最大执行时间(以秒为单位)。此设置仅对 executable_pool 命令有效。可选。默认值为 10
  • command_termination_timeout - 命令管道关闭后命令应完成的时间(以秒为单位)。在此时间后,将向执行命令的进程发送 SIGTERM。可选。默认值为 10
  • command_read_timeout - 从命令 stdout 读取数据的超时时间(以毫秒为单位)。默认值为 10000。可选参数。
  • command_write_timeout - 将数据写入命令 stdin 的超时时间(以毫秒为单位)。默认值为 10000。可选参数。
  • pool_size - 命令池的大小。可选。默认值为 16
  • send_chunk_header - 控制是否在发送数据块以进行处理之前发送行数。可选。默认值为 false
  • execute_direct - 如果 execute_direct = 1,则将在 user_scripts_path 指定的 user_scripts 文件夹内搜索 command。可以使用空格分隔符指定其他脚本参数。示例:script_name arg1 arg2。如果 execute_direct = 0,则 command 将作为 bin/sh -c 的参数传递。默认值为 1。可选参数。
  • lifetime - 函数的重新加载间隔(以秒为单位)。如果设置为 0,则不会重新加载函数。默认值为 0。可选参数。

命令必须从 STDIN 读取参数,并将结果输出到 STDOUT。命令必须迭代处理参数。也就是说,在处理完一批参数后,它必须等待下一批参数。

示例

使用 XML 配置创建 test_function。文件 test_function.xml(使用默认路径设置,为 /etc/clickhouse-server/test_function.xml)。

<functions>
<function>
<type>executable</type>
<name>test_function_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
<name>value</name>
</argument>
<format>TabSeparated</format>
<command>test_function.py</command>
</function>
</functions>

user_scripts 文件夹内的脚本文件 test_function.py(使用默认路径设置,为 /var/lib/clickhouse/user_scripts/test_function.py)。

#!/usr/bin/python3

import sys

if __name__ == '__main__':
for line in sys.stdin:
print("Value " + line, end='')
sys.stdout.flush()

查询

SELECT test_function_python(toUInt64(2));

结果

┌─test_function_python(2)─┐
│ Value 2 │
└─────────────────────────┘

手动创建 test_function_sum,将 execute_direct 指定为 0,使用 XML 配置。文件 test_function.xml(使用默认路径设置,为 /etc/clickhouse-server/test_function.xml)。

<functions>
<function>
<type>executable</type>
<name>test_function_sum</name>
<return_type>UInt64</return_type>
<argument>
<type>UInt64</type>
<name>lhs</name>
</argument>
<argument>
<type>UInt64</type>
<name>rhs</name>
</argument>
<format>TabSeparated</format>
<command>cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y FROM table"</command>
<execute_direct>0</execute_direct>
</function>
</functions>

查询

SELECT test_function_sum(2, 2);

结果

┌─test_function_sum(2, 2)─┐
│ 4 │
└─────────────────────────┘

使用 XML 配置创建 test_function_sum_json,使用命名参数和格式 JSONEachRow。文件 test_function.xml(使用默认路径设置,为 /etc/clickhouse-server/test_function.xml)。

<functions>
<function>
<type>executable</type>
<name>test_function_sum_json</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
<name>argument_1</name>
</argument>
<argument>
<type>UInt64</type>
<name>argument_2</name>
</argument>
<format>JSONEachRow</format>
<command>test_function_sum_json.py</command>
</function>
</functions>

user_scripts 文件夹内的脚本文件 test_function_sum_json.py(使用默认路径设置,为 /var/lib/clickhouse/user_scripts/test_function_sum_json.py)。

#!/usr/bin/python3

import sys
import json

if __name__ == '__main__':
for line in sys.stdin:
value = json.loads(line)
first_arg = int(value['argument_1'])
second_arg = int(value['argument_2'])
result = {'result_name': first_arg + second_arg}
print(json.dumps(result), end='\n')
sys.stdout.flush()

查询

SELECT test_function_sum_json(2, 2);

结果

┌─test_function_sum_json(2, 2)─┐
│ 4 │
└──────────────────────────────┘

可执行用户自定义函数可以接收在 command 设置中配置的常量参数(仅适用于 executable 类型用户定义函数)。它还需要 execute_direct 选项(以确保没有 shell 参数扩展漏洞)。文件 test_function_parameter_python.xml(使用默认路径设置,为 /etc/clickhouse-server/test_function_parameter_python.xml)。

<functions>
<function>
<type>executable</type>
<execute_direct>true</execute_direct>
<name>test_function_parameter_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>test_function_parameter_python.py {test_parameter:UInt64}</command>
</function>
</functions>

user_scripts 文件夹内的脚本文件 test_function_parameter_python.py(使用默认路径设置,为 /var/lib/clickhouse/user_scripts/test_function_parameter_python.py)。

#!/usr/bin/python3

import sys

if __name__ == "__main__":
for line in sys.stdin:
print("Parameter " + str(sys.argv[1]) + " value " + str(line), end="")
sys.stdout.flush()

查询

SELECT test_function_parameter_python(1)(2);

结果

┌─test_function_parameter_python(1)(2)─┐
│ Parameter 1 value 2 │
└──────────────────────────────────────┘

错误处理

如果数据无效,某些函数可能会抛出异常。在这种情况下,查询将被取消,并将错误文本返回给客户端。对于分布式处理,当某个服务器上出现异常时,其他服务器也会尝试中止查询。

参数表达式的求值

在几乎所有编程语言中,对于某些运算符,可能不会对其中一个参数进行求值。这通常是 &&||?: 运算符。但在 ClickHouse 中,函数(运算符)的参数始终会被求值。这是因为整个列部分一次性求值,而不是单独计算每一行。

执行分布式查询处理的函数

对于分布式查询处理,尽可能多的查询处理阶段在远程服务器上执行,其余阶段(合并中间结果以及所有后续阶段)在请求服务器上执行。

这意味着函数可以在不同的服务器上执行。例如,在查询 SELECT f(sum(g(x))) FROM distributed_table GROUP BY h(y), 中,

  • 如果 distributed_table 至少有两个分片,则函数 ‘g’ 和 ‘h’ 在远程服务器上执行,函数 ‘f’ 在请求服务器上执行。
  • 如果 distributed_table 只有一个分片,则所有 ‘f’、‘g’ 和 ‘h’ 函数都在此分片所在的服务器上执行。

函数的结果通常不取决于它在哪个服务器上执行。但是,有时这很重要。例如,使用字典的函数使用它们运行所在服务器上的字典。另一个例子是 hostName 函数,它返回它运行所在服务器的名称,以便在 SELECT 查询中按服务器进行 GROUP BY

如果查询中的函数在请求服务器上执行,但你需要在远程服务器上执行它,则可以将其包装在一个 ‘any’ 聚合函数中,或将其添加到 GROUP BY 的键中。

SQL 用户自定义函数

可以使用 CREATE FUNCTION 语句创建来自 lambda 表达式的自定义函数。要删除这些函数,请使用 DROP FUNCTION 语句。

ClickHouse Cloud 中的用户自定义函数