上一篇我们介绍了4个通用性的转换算子,这篇文章我们介绍下flink的DataSet API模型里面,有哪些专用的转换算子。
一、聚合类的转换算子
1.1、Reduce算子
reduce算子主要是将两个元素合并成为一个元素,或者将一组元素组合合并为单个元素或者多个元素。示例代码如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<String, Integer>> datasources = environment.fromElements( new Tuple2<String, Integer>("张三", 18), new Tuple2<String, Integer>( "李四", 19), new Tuple2<String, Integer>("王五", 20), new Tuple2<String, Integer>("王五", 25) ); datasources.groupBy(0).reduce(new RichReduceFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public Tuple2< String, Integer> reduce(Tuple2< String, Integer> value1, Tuple2< String, Integer> value2) throws Exception { return new Tuple2< String, Integer>(value1.f0,value1.f1+value2.f1); } }).print();
1.2、Aggregate算子
Aggregate算子是将一组值聚合为单个值,可以看做是mysql中的group by操作。Aggregate算子既可以应用于分组的数据集,也可以应用于完整的数据集。可以将Aggregate算子看做是内置的Reduce函数。还有一点就是Aggregate算子只能应用于元组数据类型。同时Aggregate算子主要的聚合功能有:
Sum Min Max
示例代码如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple3<Integer,String, Integer>> datasources = environment.fromElements(new Tuple3<Integer,String, Integer>(1, "张三", 18), new Tuple3<Integer,String, Integer>(2, "李四", 19),new Tuple3<Integer,String, Integer>(3, "王五", 20),new Tuple3<Integer,String, Integer>(4, "王五", 25)); datasources.groupBy(1).aggregate(Aggregations.SUM, 2).print()
1.3、Distinct算子
Distinct算子返回数据集中的不同元素,并且从输入数据集中删除掉重复的数据。示例代码如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<String, Integer>> datasources = environment.fromElements( new Tuple2<String, Integer>("张三", 18), new Tuple2<String, Integer>( "李四", 19), new Tuple2<String, Integer>("王五", 20), new Tuple2<String, Integer>("王五", 25) ); datasources.distinct(0).print();
二、分区转换算子
2.1、Hash-Partition算子
Hash-Partition算子根据给定的键对数据集进行哈希分区,可以将键指定为位置键,表达式键和键选择器。
2.2、Range-Partition算子
Range-Partition算子根据给定键对数据集进行分区,可以将键指定为位置键,表达式键和键选择器。
2.3、Sort-Partition算子
Sort Partition算子在指定字段上以指定顺序对数据集的所有分区进行本地排序,可以将键指定为位置键和表达式键。并且可以通过SortPartition()方法在对各字段上对分区进行排序
2.4、MapPartition算子
MapPartition算子在单个函数调用中转换并行分区,将分区获取为Iterable,并且可以产生任意数量的结果值。每个分区中元素的数量取决于并行度和先前的算子。示例代码如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> datasources = environment.fromElements("this","is","a","test"); datasources.mapPartition(new MapPartitionFunction<String, Integer>() { private static final long serialVersionUID = 1L; @Override public void mapPartition(Iterable<String> values, Collector<Integer> out) throws Exception { int count = 0; for(String value : values) { count++; } out.collect(count); } }).print();
三、排序转换算子
3.1、MinByMaxBy算子
MinByMaxBy算子是从数据集中返回指定字段(或组合)中的最小记录和最大记录。选定的元组可以是一个或多个指定字段的值最小(最大)的元组。在比较字段的时候,必须有效地比较关键字段。如果多个元组具有最小(最大)字段值,则返回这些元组中的任意元组。示例代码如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple3<Integer,String, Integer>> datasources = environment.fromElements( new Tuple3<Integer,String, Integer>(1, "张三", 18), new Tuple3<Integer,String, Integer>(2, "李四", 19), new Tuple3<Integer,String, Integer>(3, "王五", 20), new Tuple3<Integer,String, Integer>(4, "王五", 25) ); datasources.groupBy(1).minBy(0,2).print();
3.2、First-n算子
First-n算子用于返回数据集的前N个任意元素。该算子可以应用于常规数据集、分组数据集和分组排序数据集。可以将分组键指定为键选择器或者字段的位置键。示例代码如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<String, Integer>> datasources = environment.fromElements( new Tuple2<String, Integer>("张三", 18), new Tuple2<String, Integer>( "李四", 19), new Tuple2<String, Integer>("王五", 20), new Tuple2<String, Integer>("王五", 25) ); datasources.first(2).print();
四、关联转换算子
4.1、join算子
join算子先根据指定的条件关联两个数据集,然后根据所选字段形成一个新的数据集。该算子可以将两个数据集合并为一个数据集。如果两个数据集的元素都连接到一个或多个键,则关联的键可以使用以下方式来指定:键表达式、键选择器函数、一个或多个字段的位置键。
4.2、Outer Join算子
Outer Join算子对两个数据集执行左、右、或者完全外连接。外部连接与内部连接很类似。如果在另一侧找不到匹配的键,则保留外侧的所有记录。
4.3、Cross算子
Corss算子可以将两个数据集组合为一个数据集。它构建两个输入数据集的元素的所有成对组合,即构建笛卡尔积。该算子要么在每对元素上调用用户定义的交叉函数,要么输出元组。
4.4、CoGroup算子
CoGroup算子处理两个数据集的组。该算子把两个数据集都分组在一个定义的键上,把两个共享相同键的数据集组合在一起,交给用户定义的函数。对于一个特定的键,通常使用该组和一个空组调用共同的分组函数。该算子可以分别迭代两个组的元素,并返回任意数量的结果元素。
与Reduce算子、GroupReduce算子和Join算子相似。CoGroup算子可以通过使用不同的键选择函数来定义键。
还没有评论,来说两句吧...