前两篇我们介绍了使用DataSet和DataStream api操作flink的数据流,这篇文章我们介绍下使用Table API 操作flink的无界数据流。
这里的Table API操作flink的无界数据流其实方式和DataStream操作无界数据流差不多,都是一个常驻的任务进程,同时也需要源源不断获取数据来源。但是在Table API操作无界数据流的话,我们可以把他看做是把这些数据做成table的方式,然后使用sql进行查询。
备注:
1、现在的大数据技术发展已经越来越成熟,各种成熟的案例层出不穷。 2、现在的大数据技术我们可以发现一个问题,就是操作数据越来越sql化,所以table api这种模式也越来越受欢迎。 3、table api只有flink专属,在其他类似的大数据技术(例如:storm,spark这些都没有)。但是在spark里面有dataframe的概念,和这个table api形式上差不多。
一、创建一个WordCountTableForStream的类
package com.flink.demo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import com.flink.demo.sources.MySources; /** * 使用flink的table api进行流处理 * * @author Administrator * */ public class WordCountTableForStream { public static void main(String[] args) throws Exception { // 初始化一下flink的执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // 由于这里是需要操作table api,同时我们使用的是stream的模式,所以需要配置settings设置为stream的模式 EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); // 初始化streamtable的执行环境 StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, settings); // 在flink的执行环境里面添加数据源 DataStreamSource<String> source = environment.addSource(new MySources()); // streamtable的执行环境里面,从flink的数据源中提取数据,由于我们这里是wordcount,所以每一个元素是每一个单词,我们起名叫word,这个名称随便起 Table table = tableEnvironment.fromDataStream(source, "word"); //然后我们直接执行sql语句,这里是第一种方式,可以直接在table后面对接方法即可,有where,join,union等操作 Table result = table.where("word = 'API'"); //这是第二种方式,我们可以直接写sql操作这个table即可,这样子也比较方便。 //Table result = tableEnvironment.sqlQuery("select word from " + table); //把查询结果转换成指定的DataStream类型,如果不需要转换成对象的话,直接用Row即可, //tableEnvironment.toAppendStream(result, Row.class).print(); //把查询结果转换成指定的DataStream类型,演示转换成RsPoJo对象 tableEnvironment.toAppendStream(result, RsPoJo.class).print(); //所有的stream都需要起名 environment.execute("flink WordCountTableForStream"); } }
这里所有的解释都写的很明白。但是还是备注下:
1、需要定义类型是stream的方式 2、需要从原有的datasource里面获取数据 3、需要进行查询 4、需要把查询的结果进行类型转换后再进行下一步处理。
二、测试验证
没有任何问题。
特别说明:
1、在使用tableapi的时候,我们的maven依赖的包比较多,需要添加如下的包
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>1.15.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.15.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>1.15.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.15.0</version> </dependency>
以上就是使用flink的table api操作无界数据的案例,最后附上源码:点击下载
还没有评论,来说两句吧...