上文《Flink应用开发系列(九)DataSet开发之迭代之BulkIteration》我们介绍了BulkIteration迭代,本文的话,我们介绍下另外一个迭代,也就是DeltaIteration迭代,简称:Delta迭代。
这里的主要是利用某些算法在每次迭代中不会更改解决方案的每个数据点的事实。这个迭代在我的日常工作中使用比较少,所以主要是按照文档给大家介绍,不多说,还是直接上代码:
// read the initial data sets DataSet<Tuple2<Long, Double>> initialSolutionSet = // [...] DataSet<Tuple2<Long, Double>> initialDeltaSet = // [...] int maxIterations = 100; int keyPosition = 0; DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialSolutionSet .iterateDelta(initialDeltaSet, maxIterations, keyPosition); DataSet<Tuple2<Long, Double>> candidateUpdates = iteration.getWorkset() .groupBy(1) .reduceGroup(new ComputeCandidateChanges()); DataSet<Tuple2<Long, Double>> deltas = candidateUpdates .join(iteration.getSolutionSet()) .where(0) .equalTo(0) .with(new CompareChangesToCurrent()); DataSet<Tuple2<Long, Double>> nextWorkset = deltas .filter(new FilterByThreshold()); iteration.closeWith(deltas, nextWorkset) .writeAsCsv(outputPath);
整个Delta迭代的过程如下:
1、给定两个基本的数据集,一般我们称为:工作集合解决方案集。 2、设置对打的跌打次数和关键位置。 3、调用iterateDelta(DataSet, int, int)方法,这是迭代的初始数据集,最大迭代次数和关键位置。构建迭代器。 4、使用迭代器的方法构建访问工作集的DeltaIteration迭代对象。 5、使用工作集结果,通过iteration.getSolutionSet()方法构建解决方案集。 6、最后使用iteration.closeWith输出结果。
以上就是关于Flink中的DeltaIteration迭代。
还没有评论,来说两句吧...