本文介绍如何使用 Java 语言编写用户定义函数(User Defined Function,UDF)。
自 2.2.0 版本起,StarRocks 支持使用 Java 语言编写用户定义函数 UDF。您可以根据业务场景开发自定义函数,扩展 StarRocks 的函数能力。本文介绍 StarRocks 支持的 UDF 类型,开发流程和使用方式。
目前 StarRocks 支持的 UDF 包括用户自定义标量函数(Scalar UDF)、用户自定义聚合函数(User Defined Aggregation Function,UDAF)、用户自定义窗口函数(User Defined Window Function,UDWF)、用户自定义表格函数(User Defined Table Function,UDTF)。
如需使用 StarRocks 的 Java UDF 功能,您需要安装 Apache Maven 以创建并编写相关 Java 项目。
使用 StarRocks 的 Java UDF 功能前,您需要在 FE 配置文件 fe/conf/fe.conf 中设置配置项 enable_udf
为 true
以开启 UDF 功能,并重启 FE 节点使配置项生效。
详细操作以及配置项列表参考 配置参数。
您需要创建 Maven 项目并使用 Java 语言编写相应功能。
-
创建 Maven 项目,项目的基本目录结构如下:
project |--pom.xml |--src | |--main | | |--java | | |--resources | |--test |--target
在 pom.xml 中添加如下依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
您需要使用 Java 语言开发相应 UDF。
Scalar UDF,即用户自定义标量函数,可以对单行数据进行操作,输出单行结果。当您在查询时使用 Scalar UDF,每行数据最终都会按行出现在结果集中。典型的标量函数包括 UPPER
、LOWER
、ROUND
、ABS
。
以下示例以提取 JSON 数据功能为例进行说明。例如,业务场景中,JSON 数据中某个字段的值可能是 JSON 字符串而不是 JSON 对象,因此在提取 JSON 字符串时,SQL 语句需要嵌套调用 GET_JSON_STRING
,即 GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")
。
为简化 SQL 语句,您可以开发一个 UDF,直接提取 JSON 字符串,例如:MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")
。
package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;
public class UDFJsonGet {
public final String evaluate(String obj, String key) {
if (obj == null || key == null) return null;
try {
// JSONPath 库可以全部展开,即使某个字段的值是 JSON 格式的字符串
return JSONPath.read(obj, key).toString();
} catch (Exception e) {
return null;
}
}
}
用户自定义类必须实现如下方法:
说明 方法中请求参数和返回参数的数据类型,需要和步骤六中的
CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
方法 | 含义 |
---|---|
TYPE1 evaluate(TYPE2, ...) | evaluate 方法为 UDF 调用入口,必须是 public 成员方法。 |
UDAF,即用户自定义的聚合函数,对多行数据进行操作,输出单行结果。典型的聚合函数包括 SUM
、COUNT
、MAX
、MIN
,这些函数对于每个 GROUP BY 分组中多行数据进行聚合后,只输出一行结果。
以下示例以 MY_SUM_INT
函数为例进行说明。与内置函数 SUM
(返回值为 BIGINT 类型)区别在于,MY_SUM_INT
函数支持传入参数和返回参数的类型为 INT。
package com.starrocks.udf.sample;
public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
}
用户自定义类必须实现如下方法:
说明 方法中传入参数和返回参数的数据类型,需要和步骤六中的
CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。 | 方法 | 含义 | | --------------------------------- | ------------------------------------------------------------ | | State create() | 创建 State。 | | void destroy(State) | 销毁 State。 | | void update(State, ...) | 更新 State 。其中第一个参数是 State,其余的参数是函数声明的输入参数,可以为 1 个或多个。 | | void serialize(State, ByteBuffer) | 序列化 State。 | | void merge(State, ByteBuffer) | 合并 State 和反序列化 State。 | | TYPE finalize(State) | 通过 State 获取函数的最终结果。 |
并且,开发 UDAF 函数时,您需要使用缓冲区类 java.nio.ByteBuffer
和局部变量 serializeLength
,用于保存和表示中间结果,指定中间结果的序列化长度。
类和局部变量 | 说明 |
---|---|
java.nio.ByteBuffer() | 缓冲区类,用于保存中间结果。 并且,由于中间结果在不同执行节点间传输时,会进行序列化和反序列化,因此还需要使用 serializeLength 指定中间结果序列化后的长度。 |
serializeLength() | 中间结果序列化后的长度,单位为 Byte。 serializeLength 的数据类型固定为 INT。 例如,示例中 State { int counter = 0; public int serializeLength() { return 4; }} 包含对中间结果序列化后的说明,即,中间结果的数据类型为 INT,序列化长度为 4 Byte。您也可以按照业务需求进行调整,例如中间结果序列化后的数据类型 LONG,序列化长度为 8 Byte,则需要传入 State { long counter = 0; public int serializeLength() { return 8; }} 。 |
注意
java.nio.ByteBuffer
序列化相关事项:
- 不支持依赖 ByteBuffer 的
remaining
方法来反序列化 State。- 不支持对 ByteBuffer 调用
clear
方法。serializeLength
需要与实际写入数据的长度保持一致,否则序列化和反序列化过程中会造成结果错误。
UDWF,即用户自定义窗口函数。跟普通聚合函数不同的是,窗口函数针对一组行(一个窗口)计算值,并为每行返回一个结果。一般情况下,窗口函数包含 OVER
子句,将数据行拆分成多个分组,窗口函数基于每一行数据所在的组(一个窗口)进行计算,并为每行返回一个结果。
以下示例以 MY_WINDOW_SUM_INT
函数为例进行说明。与内置函数 SUM
(返回类型为 BIGINT)区别在于,MY_WINDOW_SUM_INT
函数支持传入参数和返回参数的类型为 INT。
package com.starrocks.udf.sample;
public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
public void reset(State state) {
state.counter = 0;
}
public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}
用户自定义类必须实现 UDAF 所需要的方法(窗口函数是特殊聚合函数),以及方法 windowUpdate()
。
说明 方法中请求参数和返回参数的数据类型,需要和步骤六中的
CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
需要额外实现的方法 | 方法的含义 |
void windowUpdate(State state, int, int, int , int, ...) | 更新窗口数据。窗口函数的详细说明,请参见窗口函数。输入每一行数据,都会获取到对应窗口信息来更新中间结果。
|
UDTF,即用户自定义表值函数,读入一行数据,输出多个值可视为一张表。表值函数常用于实现行转列。
说明 目前 UDTF 只支持返回多行单列。
以下示例以 MY_UDF_SPLIT
函数为例进行说明。MY_UDF_SPLIT
函数支持分隔符为空格,传入参数和返回参数的类型为 STRING。
package com.starrocks.udf.sample;
public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}
用户自定义类必须实现如下方法:
说明 方法中请求参数和返回参数的数据类型,需要和步骤六中的
CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
方法 | 含义 |
---|---|
TYPE[] process() | process() 方法为 UDTF 调用入口,需要返回数组。 |
通过以下命令打包 Java 项目。
mvn package
target 目录下会生成两个文件:udf-1.0-SNAPSHOT.jar 和 udf-1.0-SNAPSHOT-jar-with-dependencies.jar。
将文件 udf-1.0-SNAPSHOT-jar-with-dependencies.jar 上传至 FE 和 BE 能访问的 HTTP 服务器,并且 HTTP 服务需要一直开启。
mvn deploy
您可以通过 Python 创建一个简易的 HTTP 服务器,并将文件上传至该服务器中。详细步骤参考 通过 Python 搭建简易 http 服务器。
说明 步骤六中, FE 会对 UDF 所在 Jar 包进行校验并计算校验值,BE 会下载 UDF 所在 Jar 包并执行。
上传完成后,您需要在 StarRocks 中创建相对应的 UDF。
执行如下命令,在 StarRocks 中创建先前示例中的 Scalar UDF。
CREATE FUNCTION MY_UDF_JSON_GET(string, string)
RETURNS string
properties (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
参数 | 描述 |
---|---|
symbol | UDF 所在项目的类名。格式为<package_name>.<class_name> 。 |
type | 用于标记所创建的 UDF 类型。取值为 StarrocksJar ,表示基于 Java 的 UDF。 |
file | UDF 所在 Jar 包的 HTTP 路径。格式为http://<http_server_ip>:<http_server_port>/<jar_package_name> 。 |
执行如下命令,在 StarRocks 中创建先前示例中的 UDAF。
CREATE AGGREGATE FUNCTION MY_SUM_INT(INT)
RETURNS INT
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
参数 | 描述 |
---|---|
symbol | UDF 所在项目的类名。格式为<package_name>.<class_name> 。 |
type | 用于标记所创建的 UDF 类型。取值为 StarrocksJar ,表示基于 Java 的 UDF。 |
file | UDF 所在 Jar 包的 HTTP 路径。格式为http://<http_server_ip>:<http_server_port>/<jar_package_name> 。 |
执行如下命令,在 StarRocks 中创建先前示例中的 UDWF。
CREATE AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
properties
(
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
参数 | 描述 |
---|---|
Analytic | 所创建的函数是否为窗口函数,固定取值为 true 。 |
symbol | UDF 所在项目的类名。格式为 <package_name>.<class_name> 。 |
type | 用于标记所创建的 UDF 类型。取值为 StarrocksJar ,表示基于 Java 的 UDF。 |
file | UDF 所在 Jar 包的 HTTP 路径。格式为 http://<http_server_ip>:<http_server_port>/<jar_package_name> 。 |
执行如下命令,在 StarRocks 中创建先前示例中的 UDTF。
CREATE TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
properties
(
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
参数 | 描述 |
---|---|
symbol | UDF 所在项目的类名。格式为<package_name>.<class_name> 。 |
type | 用于标记所创建的 UDF 类型。取值为 StarrocksJar ,表示基于 Java 的 UDF。 |
file | UDF 所在 Jar 包的 HTTP 路径。格式为http://<http_server_ip>:<http_server_port>/<jar_package_name> 。 |
创建完成后,您可以测试使用您开发的 UDF。
执行如下命令,使用先前示例中的 Scalar UDF 函数。
SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');
执行如下命令,使用先前示例中的 UDAF。
SELECT MY_SUM_INT(col1);
执行如下命令,使用先前示例中的 UDWF。
SELECT MY_WINDOW_SUM_INT(intcol)
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;
执行如下命令,使用先前示例中的 UDTF。
-- 假设存在表 t1,其列 a、b、c1 信息如下。
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."
-- 使用 MY_UDF_SPLIT() 函数。
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."
说明
- 第一个
MY_UDF_SPLIT
为调用MY_UDF_SPLIT
后生成的列别名。- 暂不支持使用
AS t2(f1)
的方式指定表格函数返回表的表别名和列别名。
运行以下命令查看 UDF 信息。
SHOW FUNCTIONS;
更多信息,请参见SHOW FUNCTIONS。
运行以下命令删除指定的 UDF。
DROP FUNCTION <function_name>(arg_type [, ...]);
更多信息,请参见DROP FUNCTION。
SQL TYPE | Java TYPE |
---|---|
BOOLEAN | java.lang.Boolean |
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
STRING/VARCHAR | java.lang.String |
JVM 参数配置:在 be/conf/hadoop_env.sh 中配置如下环境变量,可以控制内存使用。
export LIBHDFS_OPTS="-Xloggc:$STARROCKS_HOME/log/be.gc.log -server"
Q:开发 UDF 时是否可以使用静态变量?不同 UDF 间的静态变量间否会互相影响?
A:支持在开发 UDF 时使用静态变量,且不同 UDF 间(即使类同名),静态变量是互相隔离的,不会互相影响。