在flink开发中,我们常常会自定义函数方法或函数类进行数据转换操作。这篇文章就给大家介绍下flink自定义函数的方式有哪些?
一、使用flink提供的接口来实现自定义功能
在flink中有很多接口,我们只需要实现自定义的接口即可,然后在算子转换的时候直接new就可以了。例如:
package com.flink.demo.function; import org.apache.flink.api.common.functions.MapFunction; public class MyMapFunction implements MapFunction<String, Integer>{ /** * */ private static final long serialVersionUID = -1815134697748675421L; @Override public Integer map(String value) throws Exception { return Integer.valueOf(value); } }
这里我们定义的是一个mapfunction,实现mapfunction的接口即可。
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = environment.addSource(new MySources()); source.map(new MyMapFunction()).print();
在使用的时候直接new即可。
二、使用匿名内部类
匿名内部类其实和java原始的匿名内部类是一样的,不需要单独新创建class.java文件,直接在代码里面一气呵成。示例如下:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = environment.addSource(new MySources()); source.map(new MapFunction<String, Integer>() { /** * */ private static final long serialVersionUID = -6123986789414990470L; @Override public Integer map(String value) throws Exception { return Integer.valueOf(value); } }).print();
三、使用java8的lambda表达式
这种方式也是和上面匿名内部类差不多,只是把内部类的代码替换成了lambda表达式
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = environment.addSource(new MySources()); source.map(s -> Integer.valueOf(s)).print();
四、使用flink提供的富函数
这里的富函数就是flink里面哪些带有Rich开头的抽象类,我们在编写代码的时候,只需要集成自这些类即可。这种方式和第一种实现接口差不多,但是富函数更能体现出来对应方法的生命周期,代码简单清晰明了。示例代码如下:
package com.flink.demo.function; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; public class MyRichMapFuction extends RichMapFunction<String, Integer>{ /** * */ private static final long serialVersionUID = -6994425965553553701L; @Override public void setRuntimeContext(RuntimeContext t) { // TODO Auto-generated method stub super.setRuntimeContext(t); } @Override public RuntimeContext getRuntimeContext() { // TODO Auto-generated method stub return super.getRuntimeContext(); } @Override public IterationRuntimeContext getIterationRuntimeContext() { // TODO Auto-generated method stub return super.getIterationRuntimeContext(); } @Override public void open(Configuration parameters) throws Exception { // TODO Auto-generated method stub super.open(parameters); } @Override public void close() throws Exception { // TODO Auto-generated method stub super.close(); } @Override public Integer map(String value) throws Exception { return Integer.valueOf(value); } }
在使用的时候直接new即可。
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = environment.addSource(new MySources()); source.map(new MyRichMapFuction ()).print()
最后个人比较推荐使用java8的lambda表达式,如果比较复杂一点的,推荐使用富函数。
还没有评论,来说两句吧...