在上一篇文章《Flink应用开发系列(五十二)Table SQL开发之create function》我们介绍了创建function,在文章里面我们提过这里的函数主要是UDF,UDAF,UDTF等函数,所以本文的话,我们来演示下自定义UDF函数,并且演示整个实现过程。下面直接开始:
1)创建CommaFunction类
这里的话,我们演示下在查询出来的结果上添加一个111的字符串标识,所以这里创建一个CommaFunction的类,这个类需要继承自ScalarFunction这个类,完整代码示例如下:
package org.example.udf; import org.apache.flink.table.functions.ScalarFunction; public class CommaFunction extends ScalarFunction { public String eval(String s) { return s + "111"; } }
继承自ScalarFunction之后,这里需要覆盖的方法里面其实是没有eval这个方法的,如下图:
所以这里的eval方法是需要我们手写的,里面涉及到的参数也是我们自定义,我们想要多少参数就可以定义多少参数,在使用的时候把对应的参数个数添加进去即可。
2)把项目打包上传服务器
这里udf函数很简单,就是输入一个数,然后输出一个数,这里我们代码写完了,就可以把整个项目打包成jar了,然后上传到服务器上,这里我们打完包的截图如下:
然后把这个jar包上传到服务器上去,路径的话可以随便存放,例如这里我们是上传到/mnt目录下的:
3)进入到sql终端
这里我们直接进入到flink的bin目录下,执行sql-client.sh这个文件,进入到sql终端
4)把jar包添加到sql执行环境中去
这里的话我们把jar包使用add jar的语法添加到执行环境中去,对应的命令如下:
add jar '/mnt/Flink-App-1.0-SNAPSHOT.jar';
执行完毕之后,我们的jar就被成功的添加进了sql执行环境了。
5)准备一个mysql
这里演示的话,我们就不用datagen了,直接使用外部的数据源,所以这里我们找一个mysql,创建一张users的表,然后放几条数据进去,详细的示例图如下:
6)Flink sql中创建表
这里我们继续回到flink-sql的执行环境中,创建一张这个users的外部表,示例sql如下:
CREATE TABLE users( id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.31.218:33306/test', 'username' = 'root', 'password' = '123456', 'table-name' = 'users' );
创建完成后,可以看到执行成功。
7)注册udf function
此时我们准备在users表中查询,那么就需要把写的udf函数的类注册成一个function,示例命令如下:
CREATE FUNCTION conFunc as 'org.example.udf.CommaFunction';
这里的语法很简单,functionname我们可以随便填写,最后这个是我们写的udf的类的全路径。执行成功之后如下图:
8)使用udf函数进行查询
上面的铺垫工作都已经做完了,下面就可以使用对应的sql进行查询了,示例sql如下:
SELECT id,conFunc(name) as name from users;
执行这条sql之后,我们就可以看到名称后面自动添加了111这个字符串,示例图如下:
以上就是我们实现一个自定义的UDF函数的全过程。
最后按照惯例,附上本案例的源码,登录后即可下载。
发表评论