在flink的应用程序中我们经常在数据分析领域,例如机器学习或者图形分析的时候,我们会涉及到迭代的算法,那什么是迭代呢?这里我们简单的举个案例:
给一组(10000个)随机数,查找最小的那个数。
这个案例就是我们会涉及到所有的随机数,那么在没有排序的情况下,我们就需要涉及到迭代,也就是每次传入2个参数给某个方法,然后获取出来最小的数,然后拿这个数再和下一个数进行比对,上面的案例,我们给了10000个随机数,那么在不排序的情况下,我们的迭代次数就是9999次。
迭代对于大数据情况下,从数据中提取重要的有意义的信息至关重要,在实际的场景中我们几乎都会使用到。
在Flink中,迭代算子主要分为两种,分别是:BulkIteration和DeltaIteration。本文的话,我们主要介绍BulkIteration,BulkIteration也称为批量迭代。
这里的BulkIteration迭代,主要是给定一个数据源,可以使用常规的算子进行转换,并且我们可以设置最大的迭代次数。下面我们直接使用代码来测试下,代码示例如下:
package org.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.IterativeDataSet; public class DataSetBulkIterationJob { public static void main(String[] args) throws Exception{ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // Create initial IterativeDataSet IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000); DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer i) throws Exception { double x = Math.random(); double y = Math.random(); return i + ((x * x + y * y < 1) ? 1 : 0); } }); // Iteratively transform the IterativeDataSet DataSet<Integer> count = initial.closeWith(iteration); count.map(new MapFunction<Integer, Double>() { @Override public Double map(Integer count) throws Exception { return count / (double)10000 * 4; } }).print(); env.execute("DataSetBulkIterationJob cal Pi"); } }
本文的案例是从网上找的示例,主要是体现使用BulkIteration的过程,整个过程如下:
1、给定一个初始的数据集 2、调用调用iterate(int)迭代的DataSet方法,返回一个IterativeDataSet算子,这里的int主要是指定最大迭代次数。 3、构建一个具体的迭代逻辑,即这里的map方法。 4、通过调用closeWith(DataSet)方法,指定将哪个算子转化传递给下一次迭代。 5、最后输出结果。
以上就是使用BulkIteration进行迭代计算的整个过程和案例。最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...