承接上篇《Flink学习系列(三)flink的datasource介绍》。我们介绍了一些常见的flink的datasource,由于像mysql这种没有线程的connector使用,所以我们需要自定义去实现一个mysql的read connector。
1)在maven项目中引入mysql的依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.14.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.15.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.15.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>
2)在mysql中执行下面的sql语句。这里我们使用的是mysql5.7
/* Navicat Premium Data Transfer Source Server : 演示数据库 Source Server Type : MySQL Source Server Version : 50738 Source Host : 192.168.31.30:3306 Source Schema : test Target Server Type : MySQL Target Server Version : 50738 File Encoding : 65001 Date: 29/06/2022 10:02:50 */ SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for user -- ---------------------------- DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `userage` int(3) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of user -- ---------------------------- INSERT INTO `user` VALUES (1, '张三', 15); INSERT INTO `user` VALUES (2, '李四', 12); SET FOREIGN_KEY_CHECKS = 1;
3)在项目中创建一个user的model
package com.big.data.flink.model; import java.io.Serializable; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import lombok.ToString; @Data @NoArgsConstructor @AllArgsConstructor @ToString @EqualsAndHashCode @Builder public class UserPoJo implements Serializable { /** * */ private static final long serialVersionUID = 1L; private String username; private Integer age; }
4)编写mysqlsourcefunction
package com.big.data.flink.datastream; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import com.big.data.flink.model.UserPoJo; public class MysqlSource extends RichSourceFunction<UserPoJo> { /** * */ private static final long serialVersionUID = 1568115294308904896L; private PreparedStatement ps; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 初始化mysql的连接 Class.forName("com.mysql.jdbc.Driver"); connection = DriverManager.getConnection( "jdbc:mysql://192.168.31.30:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); // 读取user表作为数据源 String sql = "select * from user;"; // 编写具体逻辑代码 // 使用预编译的sql执行方式 ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); if (connection != null) connection.close(); if (ps != null) ps.close(); } @Override public void run(SourceContext<UserPoJo> ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); // 执行SQL语句返回结果集 while (resultSet.next()) { UserPoJo userOrderCount = new UserPoJo(resultSet.getString("username"), resultSet.getInt("userage")); ctx.collect(userOrderCount); } } @Override public void cancel() { } }
5)编写案例进行测试
package com.big.data.flink.datastream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MysqlReadExample { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new MysqlSource()).print(); env.execute("mysql read example"); } }
6)执行MysqlReadExample,查看执行结果
以上可以看到顺利的从mysql中读取到了user表的数据。验证成功。
还没有评论,来说两句吧...