承接上篇《Flink学习系列(四)flink的datasource之自定义mysql的connector》。我们在实际的生产业务中,有时候我们读取的数据源是没有现成的connector连接器的,那么我们又需要读取到对应的数据源,那这个时候我们怎么办呢?那其实就是自定义的去实现一个connector。
常用的比较简单的实现connector的方式主要是编写一个类,集成自RichSourceFunction 这个类即可。像我们上一篇自定义实现了mysql的connector。
package com.big.data.flink.datastream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import com.big.data.flink.model.UserPoJo;
public class MysqlSource extends RichSourceFunction<UserPoJo> {
/**
*
*/
private static final long serialVersionUID = 1568115294308904896L;
private PreparedStatement ps;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 初始化mysql的连接
Class.forName("com.mysql.jdbc.Driver");
connection = DriverManager.getConnection(
"jdbc:mysql://192.168.31.30:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
// 读取user表作为数据源
String sql = "select * from user;"; // 编写具体逻辑代码
// 使用预编译的sql执行方式
ps = this.connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
if (connection != null)
connection.close();
if (ps != null)
ps.close();
}
@Override
public void run(SourceContext<UserPoJo> ctx) throws Exception {
ResultSet resultSet = ps.executeQuery(); // 执行SQL语句返回结果集
while (resultSet.next()) {
UserPoJo userOrderCount = new UserPoJo(resultSet.getString("username"), resultSet.getInt("userage"));
ctx.collect(userOrderCount);
}
}
@Override
public void cancel() {
}
}这里我们挨个讲解下这里面需要从写实现的方法。
1、继承自RichSourceFunction类后,我们至少需要重写这四个方法(open,run,cancel,close)的实现代码。
2、open方法顾名思义就是刚进入这个类的实现,这里我们一般主要做一些初始化的工作,例如初始化连接信息,初始化集合,队列等信息。
3、run方法就是核心,因为初始化之后,我们就要去拿取数据,像mysql就要去进行查询,像消息队列的话,我们就要开启监听,并且把数据收集起来。
4、cancel方法,即我们取消任务的时候需要做哪些业务逻辑,常见的主要是把哪些还未进行消费处理的数据写入某些存储里面。这样避免数据丢书了。
5、close方法,即关闭整个job的时候要做什么样的业务处理,一般都是关闭连接、清除数据等操作。
结合上面的信息,我们是不是可以很容易的去实现例如:redis的读取,消息队列的读取,nosql的读取等业务操作。









还没有评论,来说两句吧...