在前面的文章我们主要介绍了flink多数据源的时间窗口,然后对数据进行计算,但是试想一下,真实的生产环境中我们经常多数据源都是乱序的,所以此时我们多个数据源拿过来进行join的时候,由于数据处理的延迟,会导致如下情况:
同时有A和B两批数据发进来,时间窗口设置为1分组,按照推算在10:10-10:11分之间A和B分别都会有10条数据进来,但是由于乱序的原因,本来B的提前发了其他数据, 导致本来B应该在10:10分到10:11分发送10条数据的,但是真实的情况确实B在10:10分到10:11分只发了8条数据,剩下两条数据在10:11分到10:12分才发过来。
此时按照常规的窗口计算的话,本来应该处理10条数据,结果只处理了8条数据,剩下两条数据在后面的时间过来的,结果前面窗口的数据已经被销毁了,导致数据就join不起来,最终的结果也就不准确了。所以这时候怎么办呢?这就是这里提到的水位线。
在flink中,水位线的意思是:
在事件时间的语义下,不依赖系统时间,而是基于数据自带的时间戳去定义一个时钟,用来表示当前时间的进展。 在数据流中加入一个时钟标记,记录当前的事件时间,这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了,这种类似于水流中用来做标志的记号,在Flink中被称为水位线
水位线的效果就是:
假设场景,比如说上学校车,校车每天早上九点发车,但有一部分学生可能九点零二才能赶来,于是小明偷偷把司机的时间调后了两秒,这样当时间到了九点(延后两秒的九点)大家都能上车了。
所以按照我的理解就是原本时间窗口是10点10分到10点11分,这边利用水位线的话,就相当于给时间种调慢一点,原本到10点11分进行一次结算,如果设置了水位线为5秒,则真实的情况是在10点11分5秒的时候进行一次结算,这样子就尽量整合完所有的数据。
下面我们还是按照惯例来演示下,还是使用前面的案例
一、定义学生的水位线接收stream
package com.flink.demo.connector;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSONObject;
import com.flink.demo.utils.TransformUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
public class UserDataWithWaterStream {
public static DataStream<JSONObject> getUserDataStream(StreamExecutionEnvironment env) {
// 1.创建Flink-MySQL-CDC的Source
MySqlSource<String> teacherSouce = MySqlSource.<String>builder().hostname("192.168.31.20").port(33307)
.username("root").password("123456").databaseList("users").tableList("users.user")
.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("Asia/Shanghai").build();
// 2.使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDataStreamSource = env.fromSource(teacherSouce,
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(String extractData, long l) {
return JSONObject.parseObject(extractData).getLong("ts_ms");
}
}),
"UserDataStreamNoWatermark Source");
// 3.转换为指定格式
DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> {
return TransformUtil.formatResult(rawData);
});
return teacherDataStream;
}
}二、定义学校的水位线接收stream
package com.flink.demo.connector;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSONObject;
import com.flink.demo.utils.TransformUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
public class SchoolDataWithWaterStream {
public static DataStream<JSONObject> getSchoolDataStream(StreamExecutionEnvironment env) {
// 1.创建Flink-MySQL-CDC的Source
MySqlSource<String> teacherSouce = MySqlSource.<String>builder().hostname("192.168.31.20").port(33307)
.username("root").password("123456").databaseList("users").tableList("users.school")
.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("Asia/Shanghai").build();
// 2.使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDataStreamSource = env.fromSource(teacherSouce,
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(String extractData, long l) {
return JSONObject.parseObject(extractData).getLong("ts_ms");
}
}),
"SchoolDataStreamNoWatermark Source");
// 3.转换为指定格式
DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> {
return TransformUtil.formatResult(rawData);
});
return teacherDataStream;
}
}三、最后编写job
package com.flink.demo.mysql;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import com.alibaba.fastjson.JSONObject;
import com.flink.demo.connector.SchoolDataStream;
import com.flink.demo.connector.SchoolDataWithWaterStream;
import com.flink.demo.connector.UserDataStream;
import com.flink.demo.connector.UserDataWithWaterStream;
public class MySqlCdcReaderWithWater {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2.1获取学生数据流
DataStream<JSONObject> userDataStream = UserDataWithWaterStream.getUserDataStream(env);
// 2.2获取学校数据流
DataStream<JSONObject> schoolDataStream = SchoolDataWithWaterStream.getSchoolDataStream(env);
// 3.窗口联结(学生流和学校流)打印输出
windowInnerJoinAndPrint(userDataStream, schoolDataStream);
// 4.执行任务
env.execute("Time Window Out Join Job");
}
/**
* 只支持 inner join,即窗口内联关联到的才会下发,关联不到的则直接丢掉。
*
* @param userDataStream 用户数据流
* @param schoolDataStream 学校数据流
*/
private static void windowInnerJoinAndPrint(DataStream<JSONObject> userDataStream,
DataStream<JSONObject> schoolDataStream) {
DataStream<JSONObject> studentsDataStream = userDataStream.join(schoolDataStream)
.where(user -> user.getString("school_id")).equalTo(school -> school.getString("id"))
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
.apply(new JoinFunction<JSONObject, JSONObject, JSONObject>() {
/**
*
*/
private static final long serialVersionUID = -8027510260773592640L;
@Override
public JSONObject join(JSONObject jsonObject, JSONObject jsonObject2) {
// 拼接
jsonObject.putAll(jsonObject2);
return jsonObject;
}
});
studentsDataStream.print("Window Out Join user and school");
}
}四、测试
这里我们准备两条sql,掐着时间点执行
INSERT INTO `users`.`user` (`id`, `name`, `age`, `sex`, `school_id`) VALUES (41, '王八', 2, '2', 42); INSERT INTO `users`.`school` (`id`, `school_name`, `school_address`) VALUES (42, '北京十中', '石景山');
然后我们能看到迟到的数据也能被join上
以上就是我们介绍的利用水位线来操作数据流。
备注:
1、使用水位线的话需要自己定义一个时间戳。
2、需要在接收stream的时候,设置水位线的延迟,本案例的核心代码如下:
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(String extractData, long l) {
return JSONObject.parseObject(extractData).getLong("ts_ms");
}
})最后按照惯例附上本案例的源码,登录后即可下载。










还没有评论,来说两句吧...