前一篇文章《Flink应用开发系列(四十)Table API案例》我们介绍了table api相关的案例,这里的话我们使用flink自带的datagen工具来演示下table sql的案例,下面直接上代码:
package org.example.sql; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import java.util.Arrays; import static org.apache.flink.table.api.Expressions.$; public class TableApiHelloword { public static void main(String[] args) throws Exception { //初始化一个stream流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //使用table api的时候,我们习惯性创建一个settins,把一些必要的设置都放进去 EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() .build(); //创建一个table 流执行环境 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); //使用sql的方式创建数据源,这里的数据源需要连接connector,我们这里为了演示,使用的是flink自带的datagen tEnv.executeSql("CREATE TABLE tmp_orders( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3)) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'number-of-rows' = '50' );"); //从表中查询数据出来 tEnv.sqlQuery("SELECT * FROM tmp_orders").execute().print(); env.execute(); } }
从上诉代码中我们可以看到相比table api来说,table sql的话在开发起来更简单,我们只需要执行sql语句即可,而且没有其他的一些冗余代码,看起来非常简洁。下面我们运行起来看看:
可以看到能查询出来很多的数据,这些数据都是datagen工具生成的。
以上极速关于table sql相关的案例介绍。
备注:
1、这里不管是table sql还是table api,都是基于table来操作的,所以创建流环境、创建table执行环境都必不可少。
2、在实际的开发中table api用的会比较少,table sql会用的比较多,毕竟只需要写sql嘛。
最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...