在flink开发中,我们经常也会涉及到多数据源,此时我们需要把多数据源的数据进行join操作,合并到一起,所以本文的话,我们介绍下join函数。
join函数需要根据某个key进行join,两边数据源都要有对应的key,同时key对应的values值需要是相等的,下面我们使用代码来演示下join的操作。完整代码示例如下:
package org.example;
import java.util.Arrays;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
public class JoinFunctionJob {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple2<Integer, String>> dataSource1 = env.fromCollection(Arrays.asList(
new Tuple2<Integer, String>(1, "张三"),
new Tuple2<Integer, String>(2, "李四"),
new Tuple2<Integer, String>(3, "王五")
));
DataSource<Tuple3<Integer, String,Integer>> dataSource2 = env.fromCollection(Arrays.asList(
new Tuple3<Integer, String,Integer>(5, "数学",90),
new Tuple3<Integer, String,Integer>(2, "英语",98),
new Tuple3<Integer, String,Integer>(4, "语文",80)
));
dataSource1.join(dataSource2).where(tuple -> tuple.f0).equalTo(tuple -> tuple.f0).print();
}
}这里我们可以看到两个数据源有一个id是相同的,所以我们根据id进行join即可,在join的时候我们使用了第一个数据源的f0位置等于第二个数据源的f0位置,最后运行看看结果:
join这块其实只要熟悉sql,用起来还是比较简单,只是在flink中需要转换成where+equalTo的方式来进行等值匹配。最后附上本案例的源码,登录后即可下载。










还没有评论,来说两句吧...