在实际的业务场景开发中,我们经常会涉及到这样的一些业务,例如风控系统的需求:
某个用户在5分钟之内登录失败超过3次,那么就要进行预警与账号封禁
这个需求就是只统计以5分钟为单位的一段时间内的数据,所以整个时间是一段一段的,在flink中,我们成为时间窗口。使用window函数。所以这里的话我们也介绍下使用时间窗口函数来进行一下计数,实现统计每个时间段的wordcount。示例代码如下:
package org.example.datastream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
//使用socket进行演练
.socketTextStream("192.168.31.218", 9999)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
//把英文语句进行切割,切割为一个个单词
for (String word : s.split(" ")) {
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
})
//对切割出来的单词进行key聚合,使相同的key分发到相同的分区里面去
.keyBy(value -> value.f0)
//设置时间窗口为5秒
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//对时间窗口内的数据进行reduce求和,简写为sum函数
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
}在上诉的代码里面,我们添加了注释,相信大家看的很明白,下面我们执行看看效果,首先启动nc,发送两个语句:
然后我们启动这个job,可以看到打印出来了具体的计数。
然后我们等待5秒后 再发送一条英文语句:
然后查看job的控制台,可以看到输出的都是1,没有对之前的数据进行累加
说明他统计的就是5秒时间内的所有事件数据,以上就是一个简单的时间窗口的函数实现,最后按照惯例,附上本案例的源码,登录后即可下载。













还没有评论,来说两句吧...