UDF 用户自定义函数
可执行用户自定义函数
ClickHouse 可以调用任何外部可执行程序或脚本处理数据。
可执行用户自定义函数的配置可以位于一个或多个 xml 文件中。配置路径在 user_defined_executable_functions_config 参数中指定。
函数配置包含以下设置
name
- 函数名称。command
- 如果execute_direct
为 false,则执行脚本名称或命令。argument
- 带有type
的参数描述,以及可选的name
参数。每个参数在单独的设置中描述。如果参数名称是用户定义函数格式(如 Native 或 JSONEachRow)序列化的一部分,则指定名称是必需的。默认参数名称值为c
+ argument_number。format
- 命令传递参数的 格式。return_type
- 返回值的类型。return_name
- 返回值的名称。如果返回值名称是用户定义函数格式(如 Native 或 JSONEachRow)序列化的一部分,则指定返回值名称是必需的。可选。默认值为result
。type
- 可执行类型。如果type
设置为executable
,则启动单个命令。如果设置为executable_pool
,则创建一个命令池。max_command_execution_time
- 处理数据块的最大执行时间(以秒为单位)。此设置仅对executable_pool
命令有效。可选。默认值为10
。command_termination_timeout
- 命令管道关闭后命令应完成的时间(以秒为单位)。在此时间后,将向执行命令的进程发送SIGTERM
。可选。默认值为10
。command_read_timeout
- 从命令 stdout 读取数据的超时时间(以毫秒为单位)。默认值为 10000。可选参数。command_write_timeout
- 将数据写入命令 stdin 的超时时间(以毫秒为单位)。默认值为 10000。可选参数。pool_size
- 命令池的大小。可选。默认值为16
。send_chunk_header
- 控制是否在发送数据块以进行处理之前发送行数。可选。默认值为false
。execute_direct
- 如果execute_direct
=1
,则将在 user_scripts_path 指定的 user_scripts 文件夹内搜索command
。可以使用空格分隔符指定其他脚本参数。示例:script_name arg1 arg2
。如果execute_direct
=0
,则command
将作为bin/sh -c
的参数传递。默认值为1
。可选参数。lifetime
- 函数的重新加载间隔(以秒为单位)。如果设置为0
,则不会重新加载函数。默认值为0
。可选参数。
命令必须从 STDIN
读取参数,并将结果输出到 STDOUT
。命令必须迭代处理参数。也就是说,在处理完一批参数后,它必须等待下一批参数。
示例
使用 XML 配置创建 test_function
。文件 test_function.xml
(使用默认路径设置,为 /etc/clickhouse-server/test_function.xml
)。
<functions>
<function>
<type>executable</type>
<name>test_function_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
<name>value</name>
</argument>
<format>TabSeparated</format>
<command>test_function.py</command>
</function>
</functions>
user_scripts
文件夹内的脚本文件 test_function.py
(使用默认路径设置,为 /var/lib/clickhouse/user_scripts/test_function.py
)。
#!/usr/bin/python3
import sys
if __name__ == '__main__':
for line in sys.stdin:
print("Value " + line, end='')
sys.stdout.flush()
查询
SELECT test_function_python(toUInt64(2));
结果
┌─test_function_python(2)─┐
│ Value 2 │
└─────────────────────────┘
手动创建 test_function_sum
,将 execute_direct
指定为 0
,使用 XML 配置。文件 test_function.xml
(使用默认路径设置,为 /etc/clickhouse-server/test_function.xml
)。
<functions>
<function>
<type>executable</type>
<name>test_function_sum</name>
<return_type>UInt64</return_type>
<argument>
<type>UInt64</type>
<name>lhs</name>
</argument>
<argument>
<type>UInt64</type>
<name>rhs</name>
</argument>
<format>TabSeparated</format>
<command>cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y FROM table"</command>
<execute_direct>0</execute_direct>
</function>
</functions>
查询
SELECT test_function_sum(2, 2);
结果
┌─test_function_sum(2, 2)─┐
│ 4 │
└─────────────────────────┘
使用 XML 配置创建 test_function_sum_json
,使用命名参数和格式 JSONEachRow。文件 test_function.xml
(使用默认路径设置,为 /etc/clickhouse-server/test_function.xml
)。
<functions>
<function>
<type>executable</type>
<name>test_function_sum_json</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
<name>argument_1</name>
</argument>
<argument>
<type>UInt64</type>
<name>argument_2</name>
</argument>
<format>JSONEachRow</format>
<command>test_function_sum_json.py</command>
</function>
</functions>
user_scripts
文件夹内的脚本文件 test_function_sum_json.py
(使用默认路径设置,为 /var/lib/clickhouse/user_scripts/test_function_sum_json.py
)。
#!/usr/bin/python3
import sys
import json
if __name__ == '__main__':
for line in sys.stdin:
value = json.loads(line)
first_arg = int(value['argument_1'])
second_arg = int(value['argument_2'])
result = {'result_name': first_arg + second_arg}
print(json.dumps(result), end='\n')
sys.stdout.flush()
查询
SELECT test_function_sum_json(2, 2);
结果
┌─test_function_sum_json(2, 2)─┐
│ 4 │
└──────────────────────────────┘
可执行用户自定义函数可以接收在 command
设置中配置的常量参数(仅适用于 executable
类型用户定义函数)。它还需要 execute_direct
选项(以确保没有 shell 参数扩展漏洞)。文件 test_function_parameter_python.xml
(使用默认路径设置,为 /etc/clickhouse-server/test_function_parameter_python.xml
)。
<functions>
<function>
<type>executable</type>
<execute_direct>true</execute_direct>
<name>test_function_parameter_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>test_function_parameter_python.py {test_parameter:UInt64}</command>
</function>
</functions>
user_scripts
文件夹内的脚本文件 test_function_parameter_python.py
(使用默认路径设置,为 /var/lib/clickhouse/user_scripts/test_function_parameter_python.py
)。
#!/usr/bin/python3
import sys
if __name__ == "__main__":
for line in sys.stdin:
print("Parameter " + str(sys.argv[1]) + " value " + str(line), end="")
sys.stdout.flush()
查询
SELECT test_function_parameter_python(1)(2);
结果
┌─test_function_parameter_python(1)(2)─┐
│ Parameter 1 value 2 │
└──────────────────────────────────────┘
错误处理
如果数据无效,某些函数可能会抛出异常。在这种情况下,查询将被取消,并将错误文本返回给客户端。对于分布式处理,当某个服务器上出现异常时,其他服务器也会尝试中止查询。
参数表达式的求值
在几乎所有编程语言中,对于某些运算符,可能不会对其中一个参数进行求值。这通常是 &&
、||
和 ?:
运算符。但在 ClickHouse 中,函数(运算符)的参数始终会被求值。这是因为整个列部分一次性求值,而不是单独计算每一行。
执行分布式查询处理的函数
对于分布式查询处理,尽可能多的查询处理阶段在远程服务器上执行,其余阶段(合并中间结果以及所有后续阶段)在请求服务器上执行。
这意味着函数可以在不同的服务器上执行。例如,在查询 SELECT f(sum(g(x))) FROM distributed_table GROUP BY h(y),
中,
- 如果
distributed_table
至少有两个分片,则函数 ‘g’ 和 ‘h’ 在远程服务器上执行,函数 ‘f’ 在请求服务器上执行。 - 如果
distributed_table
只有一个分片,则所有 ‘f’、‘g’ 和 ‘h’ 函数都在此分片所在的服务器上执行。
函数的结果通常不取决于它在哪个服务器上执行。但是,有时这很重要。例如,使用字典的函数使用它们运行所在服务器上的字典。另一个例子是 hostName
函数,它返回它运行所在服务器的名称,以便在 SELECT
查询中按服务器进行 GROUP BY
。
如果查询中的函数在请求服务器上执行,但你需要在远程服务器上执行它,则可以将其包装在一个 ‘any’ 聚合函数中,或将其添加到 GROUP BY
的键中。
SQL 用户自定义函数
可以使用 CREATE FUNCTION 语句创建来自 lambda 表达式的自定义函数。要删除这些函数,请使用 DROP FUNCTION 语句。