在flink的开发中,还有一个比较常用的函数就是reduce函数,这个reduce函数是一个聚合函数,他就是mapreduce的思想,map是拆分转换,reduce就是聚合转换。
在使用reduce的时候,接收到的key是被分组一起传递过来的,因此这里相同的key的数据集被分组到了一起,只是value值不一样,此时在这个分组里面直接进行reduce的操作,下面我们来演示下:
背景:
给一批数据,分别记录下这批数据出现的次数
下面我们使用代码来演示下,完整代码如下:
package org.example; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; public class ReduceFunctionJob { public static void main(String[] args) throws Exception{ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> dataSource = env.fromElements("张三","李四","王五","赵六","李四","王五","赵六", "张三","李四","王五","李四","王五","赵六","李四" ); dataSource.map(new RichMapFunction<String, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { return new Tuple2<String,Integer>(value,1); } }).groupBy(value -> value.f0).reduce(new RichReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0,value1.f1+value2.f1); } }).print(); } }
我们来介绍下上诉的过程:
1、给定一批数据源,这里给的是名字。 2、使用map函数,名字出现一次就记录下1,形成(名字,1)这样的数据 3、根据名字进行分组 4、使用reduce函数,把记录相加
最后我们运行下看看结果:
以上就是reduce函数的用法,最后按照惯例,附上本案例的源码,登录后即可下载:
还没有评论,来说两句吧...