今天这篇文章给大家演示下flink的sql功能,之前我们介绍过,在flink中使用sql一共两种方式:
第一种方式是:通过mysql这样的connector去读取数据库的数据,然后在程序里面使用transform进行数据操作。
第二种方式是:读取外部的数据,然后把数据整合成dataframe的数据结构,使用sql在内存中操作dataframe的数据。
以上两种方式里面,关于第一个mysql的connector,我们在之前的文章里面已经见识过了,详见《Flink学习系列(四)flink的datasource之自定义mysql的connector》。
今天这篇文章我们介绍下第二种方式,就是读取外部的数据,然后整合成dataframe的数据结果,使用sql进行操作。这里我们不搞太复杂,直接从使用文件读取,并且写入文件。不多说直接实战。
一、创建项目,引入maven依赖
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> <flink.version>1.9.0</flink.version> <scala.binary.version>2.11</scala.binary.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> </properties> <dependencies> <!--flink core --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!--flink-table --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <!--kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.44</version> </dependency> </dependencies>
二、准备两个txt文件
一个是:students.txt,一个是:scores.txt
students.txt文件的内容是:
1 张三 1班 2 李四 1班 3 王五 2班 4 赵六 2班 5 郭大宝 2班
scores.txt文件的内容是:
1 100 90 80 2 97 87 74 3 70 50 43 4 100 99 99 5 80 81 82
三、创建一个SQLBatchJob类
package com.flink.demo; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; @Slf4j public class SQLBatchJob { public static void main(String[] args) throws Exception { // 初始化执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 初始化一个批量处理的数据环境 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); /** * 这里不演示从消息队列获取数据,直接演示从文件里面获取数据 */ DataSource<String> s_sudents = env.readTextFile("D:\\workspace\\FlinkSQL\\src\\main\\resources\\students.txt"); DataSource<String> s_score = env.readTextFile("D:\\workspace\\FlinkSQL\\src\\main\\resources\\scores.txt"); /** * 读取数据回来之后,我们需要把数据进行下切割,在使用sql的时候,我们之前提过,需要把数据转换成dataframe的类型 */ // student的数据结构是:userid,username,classname DataSet<Tuple3<Integer, String, String>> students = s_sudents.map(new SplitBlank3Function()); // score的数据结构是:scoreid,chinese,math,english DataSet<Tuple4<Integer, Integer, Integer, Integer>> scores = s_score.map(new SplitBlank4Function()); // 上面我们已经把数据做成了dataset的数据模型,但是我们在真实使用的时候,需要把这两个数据join起来。这时候,我们需要做关联join DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> data = students.join(scores) // 第0个数据进行聚合,第0个位置大家都是id,这里我们在join的时候一定要记得把对应的数据放在相同的位置上,这个可以根据切割把数据从新排序下,这里仅适用于在文件里面进行读取操作,如果是其他字段的话,会带有字段名,就不需要指定字段名称 .where(0) // 在join里面我们知道有一个 字段 on 字段,这里我们直接指定id相等 .equalTo(0) // student 有第0,1,2的位置的字段在join后保留 .projectFirst(0, 1, 2) // score 有第1,2,3的字段在join后保留 .projectSecond(1, 2, 3); data.print(); // 以上我们已经把两个文件的数据进行了整合,这就是一个完整的dataframe数据结构,这里我们数据注册成sql的表模式 tEnv.registerDataSet("student_score", data, "id,username,classname,chinese,math,english"); // 整合成了dataframe结构之后,又给每个字段做了字段名,这时候我们是不是就可以直接使用sql进行操作了呢?试一试 // 定义sql String sql = "select * from student_score where id = 1"; Table sqlQuery = tEnv.sqlQuery(sql); // 执行这个sql DataSet<Student_ScorePoJo> dataSet = tEnv.toDataSet(sqlQuery, Student_ScorePoJo.class); // 打印执行结果 log.info("查询id为1的数据结果是:{}", JSON.toJSONString(dataSet.collect().get(0))); // 然后我们把数据可以写入到文件中 /** * 这里我们注意下,写入到文件里面的数据是对象的tostring,如果对象里面没有重写tostring方法,则写入到文件里面的就是对象,看不到具体内容 */ dataSet.writeAsText("D:\\workspace\\FlinkSQL\\src\\main\\resources\\student_scores.txt"); tEnv.execute("SQLBatchJob flink sql demo"); } }
在上面的方法里面,我们看到有使用pojo类和function,这里我们补充下
Student_ScorePoJo.java代码
package com.flink.demo; import java.io.Serializable; import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor @EqualsAndHashCode public class Student_ScorePoJo implements Serializable { /** * */ private static final long serialVersionUID = 7549413573936136022L; private Integer id; private String username; private String classname; private Integer chinese; private Integer math; private Integer english; /** * 我们不希望默认的tostring,所以直接使用fastjson改造下 */ @Override public String toString() { return JSON.toJSONString(this); } }
SplitBlank3Function.java内容
package com.flink.demo; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; /** * * @author Administrator * * 注意下这里的类里面带有3,指的是这个类里面我们这里的tuple我们是3个参数 * */ public class SplitBlank3Function implements MapFunction<String, Tuple3<Integer, String, String>>{ /** * */ private static final long serialVersionUID = 684351715380827467L; /** * 这里的方法主要是把每一行的数据安装空格进行切割,然后输出切割后的数据 */ @Override public Tuple3<Integer, String, String> map(String s) throws Exception { String[] line = s.split(" "); return new Tuple3<Integer, String, String>(Integer.valueOf(line[0]), line[1], line[2]); } }
SplitBlank4Function.java内如如下
package com.flink.demo; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple4; /** * * @author Administrator * * 注意下这里的类里面带有4,指的是这个类里面我们这里的tuple我们是4个参数 * */ public class SplitBlank4Function implements MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>{ /** * */ private static final long serialVersionUID = 684351715380827467L; /** * 这里的方法主要是把每一行的数据安装空格进行切割,然后输出切割后的数据 */ @Override public Tuple4<Integer, Integer, Integer, Integer> map(String s) throws Exception { String[] line = s.split(" "); return new Tuple4<Integer, Integer, Integer, Integer>(Integer.valueOf(line[0]), Integer.valueOf(line[1]), Integer.valueOf(line[2]), Integer.valueOf(line[3])); } }
以上就是我们实现整个dataframe操作sql的实战案例,每一个方法几乎都有注释,大家可以很直观的看明白。
备注,注意以下几点:
1、在write对象到文件里面,我们需要重写对象的tostring方法。
2、dataset模型里面的参数我们都可以直接使用tuple,里面有几个参数就使用TupleN
3、这里可以看做dataframe里面的数据结构满足sql92的几乎所有标准。在使用的时候多去尝试,在实际中,一般都是 sum,avg,distinct等等。
还没有评论,来说两句吧...