承接上篇《Flink学习系列(六)介绍下flink的sink》。这篇我们自定义的实现一个mysql的sink。这里我们还是使用之前的user表。
1)、创建一个mysqlsink类
package com.big.data.flink.datasource; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import com.big.data.flink.model.UserPoJo; public class MysqlSink extends RichSinkFunction<UserPoJo> { private PreparedStatement ps = null; private Connection connection = null; /** * */ private static final long serialVersionUID = -7849881919677615684L; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 加载mysql驱动 Class.forName("com.mysql.jdbc.Driver"); // 初始化mysql连接 connection = DriverManager.getConnection("jdbc:mysql://192.168.31.30:3306/test?useSSL=false", "root", "123456"); String sql = "insert into user(username,userage) values (?,?);"; ps = connection.prepareStatement(sql); } @Override public void invoke(UserPoJo value, Context context) throws Exception { super.invoke(value, context); ps.setString(1, String.valueOf(value.getUsername())); ps.setString(2, String.valueOf(value.getAge())); ps.executeUpdate(); } @Override public void close() throws Exception { super.close(); if(connection != null) connection.close(); if(ps != null) ps.close(); } }
2)、如何使用呢?
package com.big.data.flink.datastream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.big.data.flink.datasource.MysqlSink; import com.big.data.flink.datasource.MysqlSource; import com.big.data.flink.model.UserPoJo; public class MysqlSinkExample { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<UserPoJo> sourceStream = env.addSource(new MysqlSource()); sourceStream.addSink(new MysqlSink()); env.execute("mysql sink example"); } }
总结:
1、要实现一个自定义的sink,需要集成自RichSinkFunction这个类。
2、同样我们根据RichSinkFunction这个类的生命周期来重写具体的实现方法
3、需要重写的方法有:open,invoke,close这三个方法。
还没有评论,来说两句吧...