Background

I started out with the following UDF:

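```java
import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.table.functions.ScalarFunction;

public class ArrayContains extends ScalarFunction {
    private static final int EXIST = 1;
    private static final int NOT_EXIST = -1;

    // The first argument is the array to check; the second holds the elements to look for in it.
    public static int eval(List<Integer> array, List<Integer> targetArray) {
        if (CollectionUtils.isEmpty(array) || targetArray == null) {
            return NOT_EXIST;
        }
        for (Integer target : targetArray) {
            if (array.contains(target)) {
                return EXIST;
            }
        }
        return NOT_EXIST;
    }
}
```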

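What it does is simple: it checks whether the first array contains any of the elements of the second one, returning 1 if at least one is present and -1 otherwise.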
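The "contains any" semantics can be tried out in plain Java, without Flink. The sketch below is a generic stand-in, not the UDF itself; the class name `ContainsAnySketch` is hypothetical. Unlike the UDF, it copies the checked collection into a `HashSet` (a design choice of this sketch only) so that each lookup is constant time on average instead of a linear `List.contains` scan.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical plain-Java sketch of the "contains any" semantics, independent of Flink.
final class ContainsAnySketch {
    static final int EXIST = 1;
    static final int NOT_EXIST = -1;

    private ContainsAnySketch() {}

    // Returns EXIST if at least one element of targets occurs in array, otherwise NOT_EXIST.
    static <T> int containsAny(Collection<T> array, Collection<T> targets) {
        if (array == null || array.isEmpty() || targets == null) {
            return NOT_EXIST;
        }
        // Build a hash set once; HashSet.contains uses equals/hashCode, like List.contains uses equals.
        Set<T> lookup = new HashSet<>(array);
        for (T target : targets) {
            if (lookup.contains(target)) {
                return EXIST;
            }
        }
        return NOT_EXIST;
    }
}
```

In plain Java, one generic method like this serves both `Integer` and `String` elements; the trouble in Flink comes from how the UDF's parameter types are turned into SQL types.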

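Until then, both parameters had always been of type List<Integer>.

Then a new requirement came in: the function also had to accept arguments of type List<String>.

We obviously don't want to write a second UDF just for the List<String> case. But if we simply generalize the parameter types like this: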

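```java
import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.table.functions.ScalarFunction;

public class ArrayContains extends ScalarFunction {
    private static final int EXIST = 1;
    private static final int NOT_EXIST = -1;

    // The first argument is the array to check; the second holds the elements to look for in it.
    // Fails Flink's type extraction: no data type can be derived from List<Object>.
    public static int eval(List<Object> array, List<Object> targetArray) {
        if (CollectionUtils.isEmpty(array) || targetArray == null) {
            return NOT_EXIST;
        }
        for (Object target : targetArray) {
            if (array.contains(target)) {
                return EXIST;
            }
        }
        return NOT_EXIST;
    }
}
```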

Flink fails with:

Could not extract a data type from 'java.util.List' in parameter 0 of method 'eval' in class 'dp.udf.ArrayContains'. Please pass the required data type manually or allow RAW types

Cannot extract a data type from a pure 'java.lang.Object' class. Usually, this indicates that class information is missing or got lost. Please specify a more concrete class or treat it as a RAW type.

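The second message points at the root cause: Flink derives SQL data types from the signature of eval, and a bare java.lang.Object carries no information it could map to a data type. This is where Flink's type inference hints come in. Sometimes we want one evaluation method to handle several data types, and sometimes we want to declare a common result type only once for several overloaded evaluation methods. In both cases, the @FunctionHint annotation can supply the mapping from input data types to the result data type.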

(For details, see the Flink documentation page "User-defined Functions | Apache Flink".)
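Generic type arguments survive erasure in a method's signature metadata, so reflection can still tell `List<Integer>` apart from `List<Object>`. The sketch below only shows what plain `java.lang.reflect` reports for the two signatures used above; it is not Flink's extraction code, and the class and method names are hypothetical stand-ins. For `List<Integer>` the element type is `java.lang.Integer`, which the original UDF evidently got away with; for `List<Object>` it is a bare `java.lang.Object`, which is exactly what the second error message complains about.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical helper: inspects what reflection exposes about the List parameters of eval-like methods.
final class EvalSignatureInspector {
    private EvalSignatureInspector() {}

    // Stand-in with the failing signature (List<Object>); the body is irrelevant.
    static int evalWithObjectList(List<Object> array, List<Object> targetArray) {
        return -1;
    }

    // Stand-in with the original, working signature (List<Integer>); the body is irrelevant.
    static int evalWithIntegerList(List<Integer> array, List<Integer> targetArray) {
        return -1;
    }

    // Returns the type argument declared for the first List parameter of the named method.
    static String firstParamElementType(String methodName) {
        try {
            Method method = EvalSignatureInspector.class.getDeclaredMethod(methodName, List.class, List.class);
            ParameterizedType param = (ParameterizedType) method.getGenericParameterTypes()[0];
            Type element = param.getActualTypeArguments()[0];
            return element.getTypeName();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }
}
```

`firstParamElementType("evalWithIntegerList")` yields `java.lang.Integer`, while `firstParamElementType("evalWithObjectList")` yields only `java.lang.Object`.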

Solution

The modified UDF looks like this:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Checks whether an array contains any of the given elements: returns 1 if at least one is present, otherwise -1.
 * Flink 1.13 does not yet have a built-in ARRAY_CONTAINS function.
 */
@FunctionHint(input = {@DataTypeHint("ARRAY<INT>"), @DataTypeHint("ARRAY<INT>")}, output = @DataTypeHint("INT"))
@FunctionHint(input = {@DataTypeHint("ARRAY<STRING>"), @DataTypeHint("ARRAY<STRING>")}, output = @DataTypeHint("INT"))
public class ArrayContains extends ScalarFunction {
    private static final int EXIST = 1;
    private static final int NOT_EXIST = -1;

    // The first argument is the array to check; the second holds the elements to look for in it.
    public static int eval(Object[] array, Object[] targetArray) {
        if (array == null || array.length == 0 || targetArray == null) {
            return NOT_EXIST;
        }
        List<Object> arrayList = Arrays.asList(array);
        for (Object target : targetArray) {
            if (arrayList.contains(target)) {
                return EXIST;
            }
        }
        return NOT_EXIST;
    }
}
```

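Besides adding the @FunctionHint declarations on the class, the parameter types of eval change from List to Object[]: by default Flink passes ARRAY<INT> values as Integer[] and ARRAY<STRING> values as String[], and both can be received through an Object[] parameter.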
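The rewritten eval relies on Java array covariance: an `Integer[]` or a `String[]` is also an `Object[]`, and it keeps its real element type at runtime, so `List.contains` still compares elements with `Integer.equals` or `String.equals`. The sketch below checks this in plain Java; the class and method names are hypothetical, and it assumes Flink hands over `ARRAY<INT>` and `ARRAY<STRING>` as `Integer[]` and `String[]`, their default conversion classes. A primitive `int[]`, by contrast, is not an `Object[]`.

```java
// Hypothetical demo: why one eval(Object[], Object[]) can receive both Integer[] and String[].
final class ObjectArrayParameterDemo {
    private ObjectArrayParameterDemo() {}

    // True if a value of the given array class can be passed where Object[] is expected.
    static boolean fitsObjectArrayParameter(Class<?> arrayClass) {
        return Object[].class.isAssignableFrom(arrayClass);
    }

    // Reports the runtime element type of whatever array arrives through an Object[] parameter.
    static String componentTypeOf(Object[] values) {
        return values.getClass().getComponentType().getSimpleName();
    }
}
```

For example, `componentTypeOf(new Integer[]{1})` returns `"Integer"`, even though the parameter is declared as `Object[]`.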

Verification

I wrote a few test cases, and all of them pass:

```java
import static org.junit.Assert.assertEquals;

import org.junit.Test;

// Placed in the same package as ArrayContains.
public class ArrayContainsTest {
    private static final int EXIST = 1;
    private static final int NOT_EXIST = -1;

    @Test
    public void test() {
        // ARRAY<INT> arguments
        assertEquals(NOT_EXIST, ArrayContains.eval(null, new Integer[]{101, 1}));
        assertEquals(NOT_EXIST, ArrayContains.eval(new Integer[]{}, new Integer[]{101, 1}));
        assertEquals(EXIST, ArrayContains.eval(new Integer[]{1, 2, 101}, new Integer[]{101, 1}));
        assertEquals(EXIST, ArrayContains.eval(new Integer[]{1, 2, 101}, new Integer[]{1, 2}));
        assertEquals(NOT_EXIST, ArrayContains.eval(new Integer[]{1, 2, 101}, new Integer[]{3}));
        assertEquals(EXIST, ArrayContains.eval(new Integer[]{2}, new Integer[]{1, 2, 3, 4, 5, 6, 8, 100, 101}));
        assertEquals(EXIST, ArrayContains.eval(new Integer[]{3, 100, 101}, new Integer[]{1, 2, 3, 4, 5, 6, 8, 100, 101}));
        assertEquals(EXIST, ArrayContains.eval(new Integer[]{3, 100, 101}, new Integer[]{99, 101}));
        assertEquals(NOT_EXIST, ArrayContains.eval(new Integer[]{3, 100, 101}, new Integer[]{99}));

        // ARRAY<STRING> arguments
        assertEquals(NOT_EXIST, ArrayContains.eval(null, new String[]{"1", "101"}));
        assertEquals(NOT_EXIST, ArrayContains.eval(new String[]{}, new String[]{"1", "101"}));
        assertEquals(EXIST, ArrayContains.eval(new String[]{"1", "2", "101"}, new String[]{"1", "101"}));
        assertEquals(EXIST, ArrayContains.eval(new String[]{"1", "2", "101"}, new String[]{"1", "2"}));
        assertEquals(NOT_EXIST, ArrayContains.eval(new String[]{"1", "2", "101"}, new String[]{"3"}));
        assertEquals(EXIST, ArrayContains.eval(new String[]{"2"}, new String[]{"1", "2", "3", "4", "5", "6", "8", "100", "101"}));
        assertEquals(EXIST, ArrayContains.eval(new String[]{"101", "3", "100"}, new String[]{"1", "2", "3", "4", "5", "6", "8", "100", "101"}));
        assertEquals(EXIST, ArrayContains.eval(new String[]{"101", "3", "100"}, new String[]{"101", "99"}));
        assertEquals(NOT_EXIST, ArrayContains.eval(new String[]{"101", "3", "100"}, new String[]{"99"}));
    }
}
```
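As the list of cases grows, a table-driven layout keeps each case on one line and makes new cases cheap to add. The sketch below runs without Flink or JUnit: the class name `ArrayContainsCases` is hypothetical, it reuses a subset of the cases above, and it carries a hand-copied version of the rewritten eval logic (an assumption of this sketch), so that copy would have to be kept in sync with the real UDF.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical table-driven check of a subset of the cases above, runnable without Flink or JUnit.
final class ArrayContainsCases {
    static final int EXIST = 1;
    static final int NOT_EXIST = -1;

    // Hand-copied version of the rewritten eval logic; must be kept in sync with ArrayContains.
    static int eval(Object[] array, Object[] targetArray) {
        if (array == null || array.length == 0 || targetArray == null) {
            return NOT_EXIST;
        }
        List<Object> arrayList = Arrays.asList(array);
        for (Object target : targetArray) {
            if (arrayList.contains(target)) {
                return EXIST;
            }
        }
        return NOT_EXIST;
    }

    // One row per case: the checked array, the elements to look for, and the expected result.
    static final class Case {
        final Object[] array;
        final Object[] targets;
        final int expected;

        Case(Object[] array, Object[] targets, int expected) {
            this.array = array;
            this.targets = targets;
            this.expected = expected;
        }
    }

    static final List<Case> CASES = Arrays.asList(
            new Case(null, new Integer[]{101, 1}, NOT_EXIST),
            new Case(new Integer[]{}, new Integer[]{101, 1}, NOT_EXIST),
            new Case(new Integer[]{1, 2, 101}, new Integer[]{101, 1}, EXIST),
            new Case(new Integer[]{3, 100, 101}, new Integer[]{99}, NOT_EXIST),
            new Case(new String[]{"1", "2", "101"}, new String[]{"1", "101"}, EXIST),
            new Case(new String[]{"1", "2", "101"}, new String[]{"3"}, NOT_EXIST));

    // Returns how many cases produce a result different from the expected one.
    static int countFailures() {
        int failures = 0;
        for (Case c : CASES) {
            if (eval(c.array, c.targets) != c.expected) {
                failures++;
            }
        }
        return failures;
    }
}
```

With JUnit on the classpath, the same table could instead feed a single assertion loop inside one test method.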
Copyright © maxssl.com. All rights reserved. Zhejiang ICP registration No. 2022011180