上文《Flink应用开发系列(三十九)Table API & SQL介绍》我们介绍了一下Flink中Table API和Sql的基本概念,本文的话,我们用代码来演示一下Table API开发的相关案例,直接来上代码:
package org.example.sql;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;
public class TableApiHelloword {
public static void main(String[] args) throws Exception {
//初始化一个stream流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//使用table api的时候,我们习惯性创建一个settins,把一些必要的设置都放进去
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
//创建一个table 流执行环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
//这里暂时不用外部的source,我们自己造一个source
DataStream<Tuple2<Integer, String>> stream1 = env.fromCollection(Arrays.asList(new Tuple2<>(1, "hello1"), new Tuple2<>(2, "hello2")));
//将数据类型映射到表模式
Table table = tEnv.fromDataStream(stream1,
Schema.newBuilder().column("f0", DataTypes.INT())
.column("f1", DataTypes.STRING()).build());
//执行从table中查询f0,f1这两个字段
Table result = table.select($("f0"), $("f1"));
//用流的方式把表执行结果打印出来,这里主要是演示,所以才打印,真实的环境中一般会涉及到做其他的etl或者写入sink中。
tEnv.toAppendStream(result, Row.class).print();
env.execute();
}
}在这个案例里面有几个重要的点:
1、基于流处理,所以需要一个StreamExecutionEnvironment 2、必须要要创建一个table的执行环境。 3、把数据源加载到table的执行环境中。 4、把加载的数据定位为一张表。 5、对定义的表做各种数据库的操作。
然后我们运行看看效果:
当然上面演示的是全查,我们还可以添加各种条件,例如where条件,那么操作table的代码为:
Table result = table.select($("f0"), $("f1")).where($("f0").isEqual(1));然后再看看匹配的result结果:
以上就是关于Table API的开发案例。
备注:
1、这里使用table api操作数据比较抽象成面向对象,几乎所有sql相关的函数,在这个table api里面都有。
最后按照惯例,附上本案例的源码,登录后即可下载。











还没有评论,来说两句吧...