上一篇我们介绍了一下Flink的滚动时间窗口函数,这篇问我们介绍下滑动时间窗口函数。其实说到底,滚动时间窗口和滑动时间窗口是一样的,只是多了一个延迟时间。本篇的演示MySouce类和Splitter类是一样的,我们只需要改动窗口函数即可,上主要代码:
package com.test.demo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import com.flink.demo.function.Splitter; import com.flink.demo.sources.MySources; public class FlinkTimeWindow { public static void main(String[] args) throws Exception { //初始化一个DataStream API得到执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //由于是需要使用到时间窗口,因此这里我们需要设置下时间语义,如果没有设置这个时间语义,就会报错 environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //添加一个数据源 DataStreamSource<String> source = environment.addSource(new MySources()); SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source. //接收到的是单个单词,这里需要转换一下,即 word - (word,1) flatMap(new Splitter()) //根据word进行聚合 .keyBy(0) //设置时间窗口为3秒钟,滑动时间为1分钟 .timeWindow(Time.seconds(3),Time.seconds(1)) //在每3秒钟一次的情况下,对word聚合的结果进行求和。 .sum(1); //这里其实是一个真实的业务逻辑处理,我们在这里主要是把时间窗口类的逻辑处理结果打印出来做演示而已。 sum.print(); environment.execute("flinktimewindow"); } }
再执行下看下结果:
备注下:
1、滚动时间窗口和滑动时间窗口只是传递的参数不一致。
2、滑动窗口的结果有重复,也就是当前的结果里面包含了上一次统计的结果。
还没有评论,来说两句吧...