今天这篇文章给大家演示下flink的sql功能,之前我们介绍过,在flink中使用sql一共两种方式:
第一种方式是:通过mysql这样的connector去读取数据库的数据,然后在程序里面使用transform进行数据操作。
第二种方式是:读取外部的数据,然后把数据整合成dataframe的数据结构,使用sql在内存中操作dataframe的数据。
以上两种方式里面,关于第一个mysql的connector,我们在之前的文章里面已经见识过了,详见《Flink学习系列(四)flink的datasource之自定义mysql的connector》。
今天这篇文章我们介绍下第二种方式,就是读取外部的数据,然后整合成dataframe的数据结果,使用sql进行操作。这里我们不搞太复杂,直接从使用文件读取,并且写入文件。不多说直接实战。
一、创建项目,引入maven依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<flink.version>1.9.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<!--flink core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!--kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
</dependencies>二、准备两个txt文件
一个是:students.txt,一个是:scores.txt
students.txt文件的内容是:
1 张三 1班 2 李四 1班 3 王五 2班 4 赵六 2班 5 郭大宝 2班
scores.txt文件的内容是:
1 100 90 80 2 97 87 74 3 70 50 43 4 100 99 99 5 80 81 82
三、创建一个SQLBatchJob类
package com.flink.demo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SQLBatchJob {
public static void main(String[] args) throws Exception {
// 初始化执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 初始化一个批量处理的数据环境
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
/**
* 这里不演示从消息队列获取数据,直接演示从文件里面获取数据
*/
DataSource<String> s_sudents = env.readTextFile("D:\\workspace\\FlinkSQL\\src\\main\\resources\\students.txt");
DataSource<String> s_score = env.readTextFile("D:\\workspace\\FlinkSQL\\src\\main\\resources\\scores.txt");
/**
* 读取数据回来之后,我们需要把数据进行下切割,在使用sql的时候,我们之前提过,需要把数据转换成dataframe的类型
*/
// student的数据结构是:userid,username,classname
DataSet<Tuple3<Integer, String, String>> students = s_sudents.map(new SplitBlank3Function());
// score的数据结构是:scoreid,chinese,math,english
DataSet<Tuple4<Integer, Integer, Integer, Integer>> scores = s_score.map(new SplitBlank4Function());
// 上面我们已经把数据做成了dataset的数据模型,但是我们在真实使用的时候,需要把这两个数据join起来。这时候,我们需要做关联join
DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> data = students.join(scores)
// 第0个数据进行聚合,第0个位置大家都是id,这里我们在join的时候一定要记得把对应的数据放在相同的位置上,这个可以根据切割把数据从新排序下,这里仅适用于在文件里面进行读取操作,如果是其他字段的话,会带有字段名,就不需要指定字段名称
.where(0)
// 在join里面我们知道有一个 字段 on 字段,这里我们直接指定id相等
.equalTo(0)
// student 有第0,1,2的位置的字段在join后保留
.projectFirst(0, 1, 2)
// score 有第1,2,3的字段在join后保留
.projectSecond(1, 2, 3);
data.print();
// 以上我们已经把两个文件的数据进行了整合,这就是一个完整的dataframe数据结构,这里我们数据注册成sql的表模式
tEnv.registerDataSet("student_score", data, "id,username,classname,chinese,math,english");
// 整合成了dataframe结构之后,又给每个字段做了字段名,这时候我们是不是就可以直接使用sql进行操作了呢?试一试
// 定义sql
String sql = "select * from student_score where id = 1";
Table sqlQuery = tEnv.sqlQuery(sql);
// 执行这个sql
DataSet<Student_ScorePoJo> dataSet = tEnv.toDataSet(sqlQuery, Student_ScorePoJo.class);
// 打印执行结果
log.info("查询id为1的数据结果是:{}", JSON.toJSONString(dataSet.collect().get(0)));
// 然后我们把数据可以写入到文件中
/**
* 这里我们注意下,写入到文件里面的数据是对象的tostring,如果对象里面没有重写tostring方法,则写入到文件里面的就是对象,看不到具体内容
*/
dataSet.writeAsText("D:\\workspace\\FlinkSQL\\src\\main\\resources\\student_scores.txt");
tEnv.execute("SQLBatchJob flink sql demo");
}
}在上面的方法里面,我们看到有使用pojo类和function,这里我们补充下
Student_ScorePoJo.java代码
package com.flink.demo;
import java.io.Serializable;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class Student_ScorePoJo implements Serializable {
/**
*
*/
private static final long serialVersionUID = 7549413573936136022L;
private Integer id;
private String username;
private String classname;
private Integer chinese;
private Integer math;
private Integer english;
/**
* 我们不希望默认的tostring,所以直接使用fastjson改造下
*/
@Override
public String toString() {
return JSON.toJSONString(this);
}
}SplitBlank3Function.java内容
package com.flink.demo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
/**
*
* @author Administrator
*
* 注意下这里的类里面带有3,指的是这个类里面我们这里的tuple我们是3个参数
*
*/
public class SplitBlank3Function implements MapFunction<String, Tuple3<Integer, String, String>>{
/**
*
*/
private static final long serialVersionUID = 684351715380827467L;
/**
* 这里的方法主要是把每一行的数据安装空格进行切割,然后输出切割后的数据
*/
@Override
public Tuple3<Integer, String, String> map(String s) throws Exception {
String[] line = s.split(" ");
return new Tuple3<Integer, String, String>(Integer.valueOf(line[0]), line[1], line[2]);
}
}SplitBlank4Function.java内如如下
package com.flink.demo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
/**
*
* @author Administrator
*
* 注意下这里的类里面带有4,指的是这个类里面我们这里的tuple我们是4个参数
*
*/
public class SplitBlank4Function implements MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>{
/**
*
*/
private static final long serialVersionUID = 684351715380827467L;
/**
* 这里的方法主要是把每一行的数据安装空格进行切割,然后输出切割后的数据
*/
@Override
public Tuple4<Integer, Integer, Integer, Integer> map(String s) throws Exception {
String[] line = s.split(" ");
return new Tuple4<Integer, Integer, Integer, Integer>(Integer.valueOf(line[0]), Integer.valueOf(line[1]),
Integer.valueOf(line[2]), Integer.valueOf(line[3]));
}
}以上就是我们实现整个dataframe操作sql的实战案例,每一个方法几乎都有注释,大家可以很直观的看明白。
备注,注意以下几点:
1、在write对象到文件里面,我们需要重写对象的tostring方法。
2、dataset模型里面的参数我们都可以直接使用tuple,里面有几个参数就使用TupleN
3、这里可以看做dataframe里面的数据结构满足sql92的几乎所有标准。在使用的时候多去尝试,在实际中,一般都是 sum,avg,distinct等等。

还没有评论,来说两句吧...