上一篇文章《Flink应用开发系列(五十三)Table SQL开发之自定义UDF函数》我们介绍了UDF函数的开发,本文的话我们再介绍下UDAF函数的开发。
在UDF里面,它是给定一个值,然后会返回一个值。但是在UDAF里面,他其实是给定的一列值,返回一个值,例如我们常用的sum函数,也就是求和,他是把符合条件的某一列的值都相加起来然后返回最后计算好的结果值。所以千万记着,UDAF函数是给定一列值,返回1个值。下面我们使用UDAF函数来开发一个求和的案例。
1)创建一个SumUDAF类
这里的话,我们创建一个SumUDAF类,然后继承自AggregateFunction类,具体示例代码如下:
package org.example.udf; import org.apache.flink.table.functions.AggregateFunction; /** * 在使用udf进行多对一输出的时候,例如这种求和的场景,我们需要借助外部的变量存储数据,这里我们使用的SumStore这个类来记录临时变量 */ public class SumUDAF extends AggregateFunction<Long,SumStore> { @Override public Long getValue(SumStore sum) { return sum.getSum(); } @Override public SumStore createAccumulator() { return new SumStore(); } public void accumulate(SumStore a,Long b){ a.setSum(a.getSum()+b); } }
这里的泛型里面有两个类型,第一个泛型是记录出入类型,第二个泛型是记录输出类型最终根据getValue的值来显示结果。
2)打包jar,上传到服务器上
这里我们还是把整个项目打成jar包,然后上传到服务器上,目录仍然是/mnt,示例图如下:
3)执行下面的sql
同样的这里的数据源我们还是采用上一篇文章建立的users表,所以下面直接进入到flink的sql-client客户端,执行下面的sql即可:
#把jar包添加到sql执行环境中去 add jar '/mnt/Flink-App-1.0-SNAPSHOT.jar'; #创建一张users的表 CREATE TABLE users( id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.31.218:33306/test', 'username' = 'root', 'password' = '123456', 'table-name' = 'users' ); #注册UDAF函数 CREATE FUNCTION sumFunc as 'org.example.udf.SumUDAF'; #使用UDAF函数查询结果 select sumFunc(id) from users;
执行完毕之后,我们可以看到对应的结果:
查询结果是:
以上就是我们演示UDAF函数的案例,千万记得,UDAF是给定一列值(多个),返回一个值。
最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...