在flink的开发中,我们经常还会涉及到这样一个函数,就是union函数,他主要是把两个数据源合并到1个新的数据源里面去,如果这两个数据源里面有相同的数据,合并后的数据就会出现两份,下面我们用案例来演示下,完整代码示例如下:
package org.example; import java.util.Arrays; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.UnionOperator; import org.apache.flink.api.java.tuple.Tuple2; public class UnionFuncJob { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<Integer, String>> dataSource1 = env.fromCollection(Arrays.asList(new Tuple2<Integer, String>(1, "张三"), new Tuple2<Integer, String>(2, "李四"), new Tuple2<Integer, String>(3, "王五"))); DataSource<Tuple2<Integer, String>> dataSource2 = env.fromCollection(Arrays.asList(new Tuple2<Integer, String>(2, "李四"), new Tuple2<Integer, String>(3, "王五"), new Tuple2<Integer, String>(4, "赵六"))); // DataSource<Tuple2<String, Integer>> dataSource3 = // env.fromCollection(Arrays.asList(new Tuple2<String, Integer>("张三", 1))); UnionOperator<Tuple2<Integer, String>> newSources = dataSource1.union(dataSource2); //.union(dataSource3); newSources.print(); } }
最后我们运行下看看结果:
备注:
1、union是把多个数据源合并生成一个新的数据源 2、多个数据源的数据元素可以由相同的,合并后不会进行去重。 3、使用union合并数据的话,需要数据源类型保持一致,在源码中注释的部分是为了体现合并不同的数据源,此时在编译的阶段都编译不过去。
最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...