承接上篇《Flink学习系列(四)flink的datasource之自定义mysql的connector》。我们在实际的生产业务中,有时候我们读取的数据源是没有现成的connector连接器的,那么我们又需要读取到对应的数据源,那这个时候我们怎么办呢?那其实就是自定义的去实现一个connector。
常用的比较简单的实现connector的方式主要是编写一个类,集成自RichSourceFunction 这个类即可。像我们上一篇自定义实现了mysql的connector。
package com.big.data.flink.datastream; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import com.big.data.flink.model.UserPoJo; public class MysqlSource extends RichSourceFunction<UserPoJo> { /** * */ private static final long serialVersionUID = 1568115294308904896L; private PreparedStatement ps; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 初始化mysql的连接 Class.forName("com.mysql.jdbc.Driver"); connection = DriverManager.getConnection( "jdbc:mysql://192.168.31.30:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); // 读取user表作为数据源 String sql = "select * from user;"; // 编写具体逻辑代码 // 使用预编译的sql执行方式 ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); if (connection != null) connection.close(); if (ps != null) ps.close(); } @Override public void run(SourceContext<UserPoJo> ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); // 执行SQL语句返回结果集 while (resultSet.next()) { UserPoJo userOrderCount = new UserPoJo(resultSet.getString("username"), resultSet.getInt("userage")); ctx.collect(userOrderCount); } } @Override public void cancel() { } }
这里我们挨个讲解下这里面需要从写实现的方法。
1、继承自RichSourceFunction类后,我们至少需要重写这四个方法(open,run,cancel,close)的实现代码。
2、open方法顾名思义就是刚进入这个类的实现,这里我们一般主要做一些初始化的工作,例如初始化连接信息,初始化集合,队列等信息。
3、run方法就是核心,因为初始化之后,我们就要去拿取数据,像mysql就要去进行查询,像消息队列的话,我们就要开启监听,并且把数据收集起来。
4、cancel方法,即我们取消任务的时候需要做哪些业务逻辑,常见的主要是把哪些还未进行消费处理的数据写入某些存储里面。这样避免数据丢书了。
5、close方法,即关闭整个job的时候要做什么样的业务处理,一般都是关闭连接、清除数据等操作。
结合上面的信息,我们是不是可以很容易的去实现例如:redis的读取,消息队列的读取,nosql的读取等业务操作。
还没有评论,来说两句吧...