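In Flink, once we have defined a window assigner, we need to specify the computation to run on each window. That is the job of the window function. Flink offers quite a few ways to process the contents of a window, for example ReduceFunction, AggregateFunction, FoldFunction, and ProcessWindowFunction. Below we go through these four main functions.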
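1. ReduceFunction
This function behaves just like an ordinary reduce operation: it combines two elements of the same type into one, so that many inputs collapse into a single result. You supply the combining logic yourself, much as you would when writing a UDF. A code example follows: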
    package com.flink.window;

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ReduceFunctionDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Tuple2<String, Integer>> source = environment.fromElements(
                    new Tuple2<String, Integer>("grape", 1),
                    new Tuple2<String, Integer>("kiwi", 2),
                    new Tuple2<String, Integer>("raspberry", 3),
                    new Tuple2<String, Integer>("melon", 4),
                    new Tuple2<String, Integer>("melon", 6));

            // Use a ReduceFunction: key by the fruit name (keyBy(0) is deprecated
            // in newer Flink releases), then sum the values of each count window.
            // Only a key that receives two elements fills a window of size 2.
            source.keyBy(value -> value.f0)
                    .countWindow(2)
                    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                Tuple2<String, Integer> value2) throws Exception {
                            // Keep the key and add the two counts together.
                            return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                        }
                    })
                    .print();

            environment.execute();
        }
    }
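To see which windows the example above actually fires, it can help to replay the same logic without a cluster. The following is only a plain-Java simulation of "key by name, count window of size 2, reduce by sum", with the fruit names written in English; the class `CountWindowReduceSketch` and its `run` method are hypothetical names invented for this sketch and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java simulation of keyBy + countWindow(size) + reduce; this is not Flink code.
class CountWindowReduceSketch {

    /** Feeds (key, value) pairs through per-key count windows and returns the emitted results. */
    static List<String> run(String[] keys, int[] values, int windowSize,
                            BinaryOperator<Integer> reducer) {
        Map<String, Integer> partial = new HashMap<>(); // running reduced value per key
        Map<String, Integer> counts = new HashMap<>();  // elements seen in the open window per key
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            String key = keys[i];
            // Incremental step: combine the new element with the running value right away.
            partial.merge(key, values[i], reducer);
            int seen = counts.merge(key, 1, Integer::sum);
            if (seen == windowSize) {
                // The window is full: emit its result and start a fresh window for this key.
                emitted.add("(" + key + "," + partial.remove(key) + ")");
                counts.remove(key);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] keys = {"grape", "kiwi", "raspberry", "melon", "melon"};
        int[] values = {1, 2, 3, 4, 6};
        System.out.println(run(keys, values, 2, Integer::sum)); // prints [(melon,10)]
    }
}
```

Only "melon" appears twice, so it is the only key whose window of size 2 ever fills up; the other keys stay in open windows and produce no result in this simulation.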
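2. AggregateFunction
AggregateFunction is a generalized version of ReduceFunction: it has three type parameters, namely an input type, an accumulator type, and an output type. As with ReduceFunction, Flink aggregates the elements incrementally as they arrive in the window. A code example follows: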
    package com.flink.window;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import com.flink.demo.function.MyAverageAggregate;

    public class WindowFunctionDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Tuple2<String, Integer>> source = environment.fromElements(
                    new Tuple2<String, Integer>("grape", 1),
                    new Tuple2<String, Integer>("kiwi", 2),
                    new Tuple2<String, Integer>("raspberry", 3),
                    new Tuple2<String, Integer>("melon", 4),
                    new Tuple2<String, Integer>("melon", 6));

            // Use an AggregateFunction: average the values of each count window.
            source.keyBy(value -> value.f0)
                    .countWindow(2)
                    .aggregate(new MyAverageAggregate())
                    .print();

            environment.execute();
        }
    }
The MyAverageAggregate class:
    package com.flink.demo.function;

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    /** Averages the Integer field of the input; the accumulator is (sum, count). */
    public class MyAverageAggregate
            implements AggregateFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>, Double> {

        private static final long serialVersionUID = 7121387264369233012L;

        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            // Start with sum = 0 and count = 0.
            return new Tuple2<Integer, Integer>(0, 0);
        }

        @Override
        public Tuple2<Integer, Integer> add(Tuple2<String, Integer> value, Tuple2<Integer, Integer> accumulator) {
            // Add the value to the sum and increase the count by one.
            return new Tuple2<Integer, Integer>(accumulator.f0 + value.f1, accumulator.f1 + 1);
        }

        @Override
        public Double getResult(Tuple2<Integer, Integer> accumulator) {
            // Cast before dividing; otherwise integer division truncates the average.
            return (double) accumulator.f0 / accumulator.f1;
        }

        @Override
        public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
            // Merge two partial accumulators by adding sums and counts.
            return new Tuple2<Integer, Integer>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
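The four methods of an averaging aggregate are easier to follow when called by hand in the order a window would call them. The sketch below is plain Java, not Flink code: `AverageAccumulatorSketch` and its nested `Acc` class are hypothetical stand-ins for the aggregate function and its (sum, count) accumulator, and the call order shown is a simplified assumption about how a single window is evaluated.

```java
// Plain-Java walk-through of the accumulator lifecycle; this is not Flink code.
class AverageAccumulatorSketch {

    /** Immutable accumulator holding the running sum and the element count. */
    static final class Acc {
        final int sum;
        final int count;

        Acc(int sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Called once when a window receives its first element.
    static Acc createAccumulator() {
        return new Acc(0, 0);
    }

    // Called for every arriving element; only the small accumulator is kept.
    static Acc add(int value, Acc acc) {
        return new Acc(acc.sum + value, acc.count + 1);
    }

    // Combines two partial accumulators, for example when windows are merged.
    static Acc merge(Acc a, Acc b) {
        return new Acc(a.sum + b.sum, a.count + b.count);
    }

    // Called when the window fires; the cast avoids truncating integer division.
    static double getResult(Acc acc) {
        return (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        for (int value : new int[] {4, 6}) {
            acc = add(value, acc);
        }
        System.out.println(getResult(acc)); // prints 5.0

        Acc merged = merge(add(1, createAccumulator()), add(2, createAccumulator()));
        System.out.println(getResult(merged)); // prints 1.5
    }
}
```

The second result shows why the cast matters: with plain integer division, 3 / 2 would come out as 1 instead of 1.5.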
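3. FoldFunction
FoldFunction combines every element of the group into a single value: starting from an initial accumulator value, it applies a binary operation (accumulator, element) to each element in turn. This function was deprecated in the Flink 1.x line and has since been removed from the DataStream API (in 1.12, as far as we know), so it can no longer be used in current versions. If you need this behavior, use AggregateFunction, which is the replacement named in the deprecation notice; reduce is also enough when the result has the same type as the input.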
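For readers who have not seen a fold before, the idea can be shown in a few lines of plain Java. This is not Flink code: `FoldSketch` and its `fold` helper are hypothetical names, and the example simply appends Integer elements to an initially empty String.

```java
import java.util.Arrays;
import java.util.function.BiFunction;

// Plain-Java illustration of a fold; this is not Flink code.
class FoldSketch {

    /** Starts from an initial accumulator and applies the binary operation to each element. */
    static <T, ACC> ACC fold(ACC initial, Iterable<T> elements, BiFunction<ACC, T, ACC> folder) {
        ACC acc = initial;
        for (T element : elements) {
            acc = folder.apply(acc, element);
        }
        return acc;
    }

    public static void main(String[] args) {
        // Input elements are Integers, but the folded result is a String.
        String result = fold("", Arrays.asList(1, 2, 3), (acc, value) -> acc + value);
        System.out.println(result); // prints 123
    }
}
```

Because the result type (String) differs from the element type (Integer), a plain reduce cannot express this example, whereas an accumulator-based function with separate input, accumulator, and output types can.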
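4. ProcessWindowFunction
ProcessWindowFunction receives an Iterable containing all the elements of the window, together with a Context object. This flexibility comes at the cost of performance and resource consumption: the elements cannot be aggregated incrementally and must instead be buffered internally until the window is considered ready for processing. Using ProcessWindowFunction for simple aggregations such as count is therefore quite inefficient. Note that ProcessWindowFunction itself is not deprecated; it is the older WindowFunction interface that the Flink documentation lists as legacy. We will not go into more detail here.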
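The difference between incremental and buffered processing can be made concrete with a small plain-Java comparison. This is not Flink code: `BufferedWindowSketch` and both of its methods are hypothetical, and the median is used only as an example of a computation that needs every element of the window at once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java contrast of incremental vs. buffered window evaluation; this is not Flink code.
class BufferedWindowSketch {

    /** Incremental style: the only state is one counter, updated as each element arrives. */
    static long incrementalCount(int[] elements) {
        long count = 0;
        for (int ignored : elements) {
            count++;
        }
        return count;
    }

    /** Buffered style: every element is kept until the window fires, then processed at once. */
    static double bufferedMedian(int[] elements) {
        // Assumes a non-empty window.
        List<Integer> buffer = new ArrayList<>();
        for (int element : elements) {
            buffer.add(element); // state grows with the size of the window
        }
        Collections.sort(buffer);
        int n = buffer.size();
        return n % 2 == 1
                ? buffer.get(n / 2)
                : (buffer.get(n / 2 - 1) + buffer.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        int[] window = {4, 6, 1};
        System.out.println(incrementalCount(window)); // prints 3
        System.out.println(bufferedMedian(window));   // prints 4.0
    }
}
```

A count never needs more than one number of state, which is why an incremental function suits it; a median cannot be computed without seeing all the elements, which is the kind of case where buffering the whole window is justified.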