上一篇我们介绍了flink的滚动计数窗口函数,这篇文章我们介绍下Flink的滑动计数窗口函数案例。
滑动技术窗口主要其实就是在滚动技术窗口的countWindow里面添加一个滑动大小的值,其他的都是一模一样的。我们直接上主类
package com.test.demo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.flink.demo.function.Splitter;
import com.flink.demo.sources.MySources;
public class CountWindowJob {
public static void main(String[] args) throws Exception {
// 初始化一个DataStream API得到执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 由于是需要使用到时间窗口,因此这里我们需要设置下时间语义,如果没有设置这个时间语义,就会报错
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 添加一个数据源
DataStreamSource<String> source = environment.addSource(new MySources());
source.flatMap(new Splitter())
// 根据第一个数据进行聚合
.keyBy(0)
// 滑动窗口的数量是5,滑动大小是1
.countWindow(5,1)
// 根据key进行聚合求和
.sum(1)
// 打印输出出来
.print("CountWindowJob");
environment.execute();
}
}然后我们运行一下看下结果
最后我们看到滑动计数窗口比滚动技术窗口有更直观的表现,也就是接收到了几个数据,就会统计几个,不同等到设置的阈值再进行计算逻辑。










还没有评论,来说两句吧...