在实际的业务场景开发中,我们经常会涉及到这样的一些业务,例如风控系统的需求:
某个用户在5分钟之内登录失败超过3次,那么就要进行预警与账号封禁
这个需求就是只统计以5分钟为单位的一段时间内的数据,所以整个时间是一段一段的,在flink中,我们成为时间窗口。使用window函数。所以这里的话我们也介绍下使用时间窗口函数来进行一下计数,实现统计每个时间段的wordcount。示例代码如下:
package org.example.datastream; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env //使用socket进行演练 .socketTextStream("192.168.31.218", 9999) .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { //把英文语句进行切割,切割为一个个单词 for (String word : s.split(" ")) { collector.collect(new Tuple2<String, Integer>(word, 1)); } } }) //对切割出来的单词进行key聚合,使相同的key分发到相同的分区里面去 .keyBy(value -> value.f0) //设置时间窗口为5秒 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //对时间窗口内的数据进行reduce求和,简写为sum函数 .sum(1); dataStream.print(); env.execute("Window WordCount"); } }
在上诉的代码里面,我们添加了注释,相信大家看的很明白,下面我们执行看看效果,首先启动nc,发送两个语句:
然后我们启动这个job,可以看到打印出来了具体的计数。
然后我们等待5秒后 再发送一条英文语句:
然后查看job的控制台,可以看到输出的都是1,没有对之前的数据进行累加
说明他统计的就是5秒时间内的所有事件数据,以上就是一个简单的时间窗口的函数实现,最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...