上一篇篇文章我们介绍下使用Table API 操作flink的无界数据流。这篇我们介绍下使用Table API操作flink的有界数据流。
这里的Table API操作flink的有界数据流其实方式和DataSet操作有界数据流差不多,都是读取一批数据后,进行对应的业务逻辑处理,这里也是一样的,把数据做成table的方式,然后通过sql语句进行操作。原理很简单,我们就不多说了。
备注:在这篇文章里面,我们的maven依赖有一些变化,大家可以直接下载源码进行查看。
一、创建TableAPIForBatch类
package com.flink.demo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import com.flink.demo.model.UserPoJo;
public class TableAPIForBatch {
public static void main(String[] args) throws Exception {
// 初始化一个flink的执行环境
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
// 初始化一个table batch的执行环境
BatchTableEnvironment tEnv = BatchTableEnvironment.create(environment);
// 定义flink的数据源
DataSource<UserPoJo> input = environment.fromElements(new UserPoJo("zhangsan", 15), new UserPoJo("lisi", 16),
new UserPoJo("wangwu", 17));
// 把数据源转换成table
Table table = tEnv.fromDataSet(input);
// 执行sql的wehere条件
Table filter = table.where("age = 16");
// 筛选table的结果
DataSet<UserPoJo> result = tEnv.toDataSet(filter, UserPoJo.class);
// 对结果进行打印
result.print();
}
}这里就是我们之前的流程:
1、读取一批数据进来
2、转换成table的格式
3、使用sql操作数据
4、打印输出结果
二、测试验证
完全没问题,最后附上代码:点击下载










还没有评论,来说两句吧...