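This post follows on from the previous one, "Flink Learning Series (3): An Introduction to Flink Data Sources", which covered some of Flink's common data sources. For a store like MySQL there is no ready-made source connector we can use here, so we need to implement a custom MySQL read connector ourselves.
1) Add the MySQL dependency to the Maven project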
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.15.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
2) Run the following SQL statements in MySQL. MySQL 5.7 is used here.
/*
 Navicat Premium Data Transfer

 Source Server         : demo database
 Source Server Type    : MySQL
 Source Server Version : 50738
 Source Host           : 192.168.31.30:3306
 Source Schema         : test

 Target Server Type    : MySQL
 Target Server Version : 50738
 File Encoding         : 65001

 Date: 29/06/2022 10:02:50
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `userage` int(3) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (1, '张三', 15);
INSERT INTO `user` VALUES (2, '李四', 12);

SET FOREIGN_KEY_CHECKS = 1;
3) Create a user model in the project
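The model in this step relies on Lombok annotations to generate its constructors, accessors, equals/hashCode and toString, and Lombok is not part of the dependency list in step 1, so it has to be added to the project separately. As a rough picture of what those annotations provide, here is a hand-written sketch. The class name `PlainUserPoJo` is hypothetical, the builder that `@Builder` would generate is left out, and the `toString` layout only imitates Lombok's `ClassName(field=value, ...)` style.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical hand-written counterpart of the Lombok-annotated model.
// Declared without `public` only so the snippet fits in a single file;
// in a real project it would be a public class in its own file.
class PlainUserPoJo implements Serializable {
    private static final long serialVersionUID = 1L;

    private String username;
    private Integer age;

    // Counterpart of @NoArgsConstructor.
    public PlainUserPoJo() {
    }

    // Counterpart of @AllArgsConstructor.
    public PlainUserPoJo(String username, Integer age) {
        this.username = username;
        this.age = age;
    }

    // Getters and setters, which @Data would generate.
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    // Two instances are equal when both fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PlainUserPoJo)) return false;
        PlainUserPoJo other = (PlainUserPoJo) o;
        return Objects.equals(username, other.username) && Objects.equals(age, other.age);
    }

    @Override
    public int hashCode() {
        return Objects.hash(username, age);
    }

    // Imitates the ClassName(field=value, ...) layout.
    @Override
    public String toString() {
        return "PlainUserPoJo(username=" + username + ", age=" + age + ")";
    }

    public static void main(String[] args) {
        System.out.println(new PlainUserPoJo("zhangsan", 15));
    }
}
```

Writing the class out by hand is longer, but it removes the extra build-time dependency; which trade-off is preferable depends on the project.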
package com.big.data.flink.model;

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@EqualsAndHashCode
@Builder
public class UserPoJo implements Serializable {

    private static final long serialVersionUID = 1L;

    private String username;
    private Integer age;
}
4) Write the MySQL source function
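The class in this step extends `RichSourceFunction`, so it goes through a fixed lifecycle: `open()` runs once before any record is produced, `run()` emits the records, `cancel()` asks `run()` to stop, and `close()` releases resources. Flink may call `cancel()` from a different thread than the one executing `run()`, which is why a source that loops usually guards its loop with a `volatile` flag. The sketch below imitates that contract with plain-Java stand-ins; `CountingSource` and `SourceLifecycleDemo` are hypothetical names and no Flink classes are involved.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a source function; it only mimics the lifecycle.
class CountingSource {
    // Written by cancel() on one thread and read by run() on another, hence volatile.
    private volatile boolean running = true;
    // Records the order in which the lifecycle methods were entered.
    final List<String> lifecycle = new CopyOnWriteArrayList<>();
    // Released once run() has produced its first record.
    final CountDownLatch firstRecord = new CountDownLatch(1);
    // Only touched by the thread that executes run().
    private long emitted;

    void open() {
        lifecycle.add("open");
    }

    void run() throws InterruptedException {
        lifecycle.add("run");
        while (running) {
            emitted++;               // stands in for ctx.collect(record)
            firstRecord.countDown(); // has no further effect after the first call
            Thread.sleep(1);
        }
    }

    void cancel() {
        running = false;
    }

    void close() {
        lifecycle.add("close");
    }
}

class SourceLifecycleDemo {
    // Drives the stand-in roughly the way a runtime would: open, run, then always close.
    static List<String> runAndCancel() {
        CountingSource source = new CountingSource();
        Thread worker = new Thread(() -> {
            source.open();
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                source.close();
            }
        });
        worker.start();
        try {
            source.firstRecord.await(); // wait until at least one record was emitted
            source.cancel();            // issued from the calling thread, not the worker
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return source.lifecycle;
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel()); // prints [open, run, close]
    }
}
```

A source that runs one bounded query, as in this step, finishes on its own when the result set is exhausted; the flag matters most for sources that keep polling.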
package com.big.data.flink.datastream;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import com.big.data.flink.model.UserPoJo;

public class MysqlSource extends RichSourceFunction<UserPoJo> {

    private static final long serialVersionUID = 1568115294308904896L;

    private PreparedStatement ps;
    private Connection connection;
    // Set to false by cancel(), which may be called from another thread
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Initialize the MySQL connection
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager.getConnection(
                "jdbc:mysql://192.168.31.30:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
        // Read the user table as the data source; put your own query here
        String sql = "select * from user;";
        // Execute the SQL as a prepared (precompiled) statement
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the statement before the connection it belongs to
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void run(SourceContext<UserPoJo> ctx) throws Exception {
        // Execute the query; try-with-resources closes the result set afterwards
        try (ResultSet resultSet = ps.executeQuery()) {
            while (running && resultSet.next()) {
                UserPoJo user = new UserPoJo(resultSet.getString("username"), resultSet.getInt("userage"));
                ctx.collect(user);
            }
        }
    }

    @Override
    public void cancel() {
        // Ask run() to stop before it reads the next row
        running = false;
    }
}
5) Write an example to test it
package com.big.data.flink.datastream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlReadExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new MysqlSource()).print();
        env.execute("mysql read example");
    }
}
6) Run MysqlReadExample and check the output
As the output shows, the rows of the user table were read from MySQL successfully, which confirms that the custom source works.









