上一篇篇文章我们介绍下使用Table API 操作flink的无界数据流。这篇我们介绍下使用Table API操作flink的有界数据流。
这里的Table API操作flink的有界数据流其实方式和DataSet操作有界数据流差不多,都是读取一批数据后,进行对应的业务逻辑处理,这里也是一样的,把数据做成table的方式,然后通过sql语句进行操作。原理很简单,我们就不多说了。
备注:在这篇文章里面,我们的maven依赖有一些变化,大家可以直接下载源码进行查看。
一、创建TableAPIForBatch类
package com.flink.demo; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import com.flink.demo.model.UserPoJo; public class TableAPIForBatch { public static void main(String[] args) throws Exception { // 初始化一个flink的执行环境 ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); // 初始化一个table batch的执行环境 BatchTableEnvironment tEnv = BatchTableEnvironment.create(environment); // 定义flink的数据源 DataSource<UserPoJo> input = environment.fromElements(new UserPoJo("zhangsan", 15), new UserPoJo("lisi", 16), new UserPoJo("wangwu", 17)); // 把数据源转换成table Table table = tEnv.fromDataSet(input); // 执行sql的wehere条件 Table filter = table.where("age = 16"); // 筛选table的结果 DataSet<UserPoJo> result = tEnv.toDataSet(filter, UserPoJo.class); // 对结果进行打印 result.print(); } }
这里就是我们之前的流程:
1、读取一批数据进来
2、转换成table的格式
3、使用sql操作数据
4、打印输出结果
二、测试验证
完全没问题,最后附上代码:点击下载
还没有评论,来说两句吧...