在前面的文章我们主要介绍了flink多数据源的时间窗口,然后对数据进行计算,但是试想一下,真实的生产环境中我们经常多数据源都是乱序的,所以此时我们多个数据源拿过来进行join的时候,由于数据处理的延迟,会导致如下情况:
同时有A和B两批数据发进来,时间窗口设置为1分组,按照推算在10:10-10:11分之间A和B分别都会有10条数据进来,但是由于乱序的原因,本来B的提前发了其他数据, 导致本来B应该在10:10分到10:11分发送10条数据的,但是真实的情况确实B在10:10分到10:11分只发了8条数据,剩下两条数据在10:11分到10:12分才发过来。
此时按照常规的窗口计算的话,本来应该处理10条数据,结果只处理了8条数据,剩下两条数据在后面的时间过来的,结果前面窗口的数据已经被销毁了,导致数据就join不起来,最终的结果也就不准确了。所以这时候怎么办呢?这就是这里提到的水位线。
在flink中,水位线的意思是:
在事件时间的语义下,不依赖系统时间,而是基于数据自带的时间戳去定义一个时钟,用来表示当前时间的进展。 在数据流中加入一个时钟标记,记录当前的事件时间,这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了,这种类似于水流中用来做标志的记号,在Flink中被称为水位线
水位线的效果就是:
假设场景,比如说上学校车,校车每天早上九点发车,但有一部分学生可能九点零二才能赶来,于是小明偷偷把司机的时间调后了两秒,这样当时间到了九点(延后两秒的九点)大家都能上车了。
所以按照我的理解就是原本时间窗口是10点10分到10点11分,这边利用水位线的话,就相当于给时间种调慢一点,原本到10点11分进行一次结算,如果设置了水位线为5秒,则真实的情况是在10点11分5秒的时候进行一次结算,这样子就尽量整合完所有的数据。
下面我们还是按照惯例来演示下,还是使用前面的案例
一、定义学生的水位线接收stream
package com.flink.demo.connector; import java.time.Duration; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.alibaba.fastjson.JSONObject; import com.flink.demo.utils.TransformUtil; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; public class UserDataWithWaterStream { public static DataStream<JSONObject> getUserDataStream(StreamExecutionEnvironment env) { // 1.创建Flink-MySQL-CDC的Source MySqlSource<String> teacherSouce = MySqlSource.<String>builder().hostname("192.168.31.20").port(33307) .username("root").password("123456").databaseList("users").tableList("users.user") .startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()) .serverTimeZone("Asia/Shanghai").build(); // 2.使用CDC Source从MySQL读取数据 DataStreamSource<String> mysqlDataStreamSource = env.fromSource(teacherSouce, WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)) .withTimestampAssigner(new SerializableTimestampAssigner<String>() { /** * */ private static final long serialVersionUID = 1L; @Override public long extractTimestamp(String extractData, long l) { return JSONObject.parseObject(extractData).getLong("ts_ms"); } }), "UserDataStreamNoWatermark Source"); // 3.转换为指定格式 DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> { return TransformUtil.formatResult(rawData); }); return teacherDataStream; } }
二、定义学校的水位线接收stream
package com.flink.demo.connector; import java.time.Duration; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.alibaba.fastjson.JSONObject; import com.flink.demo.utils.TransformUtil; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; public class SchoolDataWithWaterStream { public static DataStream<JSONObject> getSchoolDataStream(StreamExecutionEnvironment env) { // 1.创建Flink-MySQL-CDC的Source MySqlSource<String> teacherSouce = MySqlSource.<String>builder().hostname("192.168.31.20").port(33307) .username("root").password("123456").databaseList("users").tableList("users.school") .startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()) .serverTimeZone("Asia/Shanghai").build(); // 2.使用CDC Source从MySQL读取数据 DataStreamSource<String> mysqlDataStreamSource = env.fromSource(teacherSouce, WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)) .withTimestampAssigner(new SerializableTimestampAssigner<String>() { /** * */ private static final long serialVersionUID = 1L; @Override public long extractTimestamp(String extractData, long l) { return JSONObject.parseObject(extractData).getLong("ts_ms"); } }), "SchoolDataStreamNoWatermark Source"); // 3.转换为指定格式 DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> { return TransformUtil.formatResult(rawData); }); return teacherDataStream; } }
三、最后编写job
package com.flink.demo.mysql; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; import com.alibaba.fastjson.JSONObject; import com.flink.demo.connector.SchoolDataStream; import com.flink.demo.connector.SchoolDataWithWaterStream; import com.flink.demo.connector.UserDataStream; import com.flink.demo.connector.UserDataWithWaterStream; public class MySqlCdcReaderWithWater { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.1获取学生数据流 DataStream<JSONObject> userDataStream = UserDataWithWaterStream.getUserDataStream(env); // 2.2获取学校数据流 DataStream<JSONObject> schoolDataStream = SchoolDataWithWaterStream.getSchoolDataStream(env); // 3.窗口联结(学生流和学校流)打印输出 windowInnerJoinAndPrint(userDataStream, schoolDataStream); // 4.执行任务 env.execute("Time Window Out Join Job"); } /** * 只支持 inner join,即窗口内联关联到的才会下发,关联不到的则直接丢掉。 * * @param userDataStream 用户数据流 * @param schoolDataStream 学校数据流 */ private static void windowInnerJoinAndPrint(DataStream<JSONObject> userDataStream, DataStream<JSONObject> schoolDataStream) { DataStream<JSONObject> studentsDataStream = userDataStream.join(schoolDataStream) .where(user -> user.getString("school_id")).equalTo(school -> school.getString("id")) .window(TumblingEventTimeWindows.of(Time.seconds(10L))) .apply(new JoinFunction<JSONObject, JSONObject, JSONObject>() { /** * */ private static final long serialVersionUID = -8027510260773592640L; @Override public JSONObject join(JSONObject jsonObject, JSONObject jsonObject2) { // 拼接 jsonObject.putAll(jsonObject2); return jsonObject; } }); studentsDataStream.print("Window Out Join user and school"); } }
四、测试
这里我们准备两条sql,掐着时间点执行
INSERT INTO `users`.`user` (`id`, `name`, `age`, `sex`, `school_id`) VALUES (41, '王八', 2, '2', 42); INSERT INTO `users`.`school` (`id`, `school_name`, `school_address`) VALUES (42, '北京十中', '石景山');
然后我们能看到迟到的数据也能被join上
以上就是我们介绍的利用水位线来操作数据流。
备注:
1、使用水位线的话需要自己定义一个时间戳。
2、需要在接收stream的时候,设置水位线的延迟,本案例的核心代码如下:
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)) .withTimestampAssigner(new SerializableTimestampAssigner<String>() { /** * */ private static final long serialVersionUID = 1L; @Override public long extractTimestamp(String extractData, long l) { return JSONObject.parseObject(extractData).getLong("ts_ms"); } })
最后按照惯例附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...