承接上篇《Flink学习系列(六)介绍下flink的sink》。这篇我们自定义的实现一个mysql的sink。这里我们还是使用之前的user表。
1)、创建一个mysqlsink类
package com.big.data.flink.datasource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.big.data.flink.model.UserPoJo;
public class MysqlSink extends RichSinkFunction<UserPoJo> {
private PreparedStatement ps = null;
private Connection connection = null;
/**
*
*/
private static final long serialVersionUID = -7849881919677615684L;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 加载mysql驱动
Class.forName("com.mysql.jdbc.Driver");
// 初始化mysql连接
connection = DriverManager.getConnection("jdbc:mysql://192.168.31.30:3306/test?useSSL=false", "root", "123456");
String sql = "insert into user(username,userage) values (?,?);";
ps = connection.prepareStatement(sql);
}
@Override
public void invoke(UserPoJo value, Context context) throws Exception {
super.invoke(value, context);
ps.setString(1, String.valueOf(value.getUsername()));
ps.setString(2, String.valueOf(value.getAge()));
ps.executeUpdate();
}
@Override
public void close() throws Exception {
super.close();
if(connection != null)
connection.close();
if(ps != null)
ps.close();
}
}2)、如何使用呢?
package com.big.data.flink.datastream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.big.data.flink.datasource.MysqlSink;
import com.big.data.flink.datasource.MysqlSource;
import com.big.data.flink.model.UserPoJo;
public class MysqlSinkExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserPoJo> sourceStream = env.addSource(new MysqlSource());
sourceStream.addSink(new MysqlSink());
env.execute("mysql sink example");
}
}总结:
1、要实现一个自定义的sink,需要集成自RichSinkFunction这个类。
2、同样我们根据RichSinkFunction这个类的生命周期来重写具体的实现方法
3、需要重写的方法有:open,invoke,close这三个方法。









还没有评论,来说两句吧...