在flink开发中,我们经常也会涉及到多数据源,此时我们需要把多数据源的数据进行join操作,合并到一起,所以本文的话,我们介绍下join函数。
join函数需要根据某个key进行join,两边数据源都要有对应的key,同时key对应的values值需要是相等的,下面我们使用代码来演示下join的操作。完整代码示例如下:
package org.example; import java.util.Arrays; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; public class JoinFunctionJob { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<Integer, String>> dataSource1 = env.fromCollection(Arrays.asList( new Tuple2<Integer, String>(1, "张三"), new Tuple2<Integer, String>(2, "李四"), new Tuple2<Integer, String>(3, "王五") )); DataSource<Tuple3<Integer, String,Integer>> dataSource2 = env.fromCollection(Arrays.asList( new Tuple3<Integer, String,Integer>(5, "数学",90), new Tuple3<Integer, String,Integer>(2, "英语",98), new Tuple3<Integer, String,Integer>(4, "语文",80) )); dataSource1.join(dataSource2).where(tuple -> tuple.f0).equalTo(tuple -> tuple.f0).print(); } }
这里我们可以看到两个数据源有一个id是相同的,所以我们根据id进行join即可,在join的时候我们使用了第一个数据源的f0位置等于第二个数据源的f0位置,最后运行看看结果:
join这块其实只要熟悉sql,用起来还是比较简单,只是在flink中需要转换成where+equalTo的方式来进行等值匹配。最后附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...