在前面《Flink应用开发系列(十三)数据集转换之MapFunction》我们介绍了mapfunction,本文的话我们再介绍下这个mappartitionFunction。这里的mappartition和map是差不多的,总体的数据转换是1对1,但是我们使用mappartition有什么用呢?这里我们着重说一下:
如果是普通的map,比如一个partition中有1万条数据。ok,那么你的function要执行和计算1万次。使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。
所以在实际的生产环境中,我们使用mappartition的案例会比较多一点,下面我们使用mappartition来演示一下,完整代码如下:
package org.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.util.Collector; public class DataSetMapPartitionFuncJob { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Integer> input = env.fromElements(2, 1); input.mapPartition(new MapPartitionFunction<Integer, String>() { @Override public void mapPartition(Iterable<Integer> values, Collector<String> out) throws Exception { for(Integer value : values) { out.collect("userId:" + value); } } }).print(); } }
然后我们运行下看看结果:
可以看到实现的效果和map是一模一样的。
注意事项:
1、这里的mappartition虽然提高了效率和性能,但是增加了内存的消耗,在生产环节的时候需要注意下这个问题。
最后按照惯例,附上本案例的源码,登录后即可下载:
还没有评论,来说两句吧...