在前面我们演示了一下时间窗口的函数,这篇我们介绍下flink的滚动技术窗口函数案例实战。
在flink中实现滚动计数窗口最主要的函数就是 countWindow()方法。
一、定义模拟一个数据源
package com.flink.demo.sources; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.streaming.api.functions.source.SourceFunction; /** * 由于是demo,所以这里我们自定义一个source,然后自动向source发送数据 * * @author Administrator * */ public class MySources implements SourceFunction<String> { private boolean running = Boolean.TRUE; private static final String[] words = new String[] { "this", "is", "flink", "DataStream", "API", "test" }; private static final Random RANDOM = new Random(); private static final long serialVersionUID = -5258863350761775561L; public MySources() { } @Override public void run(SourceContext<String> ctx) throws Exception { while (running) { this.sendDatas(ctx); TimeUnit.MILLISECONDS.sleep(100); } } /** * 随机发送数据 * @param ctx */ private void sendDatas(SourceContext<String> ctx) { String word = words[RANDOM.nextInt(words.length)]; System.out.println("发送了:"+word); ctx.collect(word); } @Override public void cancel() { running = Boolean.FALSE; } }
二、对数据源进行切割,做word->(word,1)
package com.flink.demo.function; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * * @author Administrator * */ public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>>{ private static final long serialVersionUID = -5874538656857236727L; @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { out.collect(new Tuple2<String, Integer>(value, 1)); } }
三、编写主类,使用滚动计数窗口
package com.test.demo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.flink.demo.function.Splitter; import com.flink.demo.sources.MySources; public class CountWindowJob { public static void main(String[] args) throws Exception { // 初始化一个DataStream API得到执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // 由于是需要使用到时间窗口,因此这里我们需要设置下时间语义,如果没有设置这个时间语义,就会报错 environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 添加一个数据源 DataStreamSource<String> source = environment.addSource(new MySources()); source.flatMap(new Splitter()) // 根据第一个数据进行聚合 .keyBy(0) // 计数窗口的数量是5,时间窗口这里配置的是时间,计数窗口这里配置的是数量 .countWindow(5) // 根据key进行聚合求和 .sum(1) // 打印输出出来 .print("CountWindowJob"); environment.execute(); } }
上面的逻辑都卸载了代码里面。然后我们运行一下看看
从图示中我们看到了运行结果。
备注:
1、上诉结果例子里面有没有发现端倪,也就是当统计的数据数量达到了设定的5才有打印出来。
2、上诉例如他不是一个接收5个数据,然后进行统计求和得出逻辑结果的,他是统计窗口内的数据,当窗口内接收的数据统计达到5之后,才会走后面的逻辑
还没有评论,来说两句吧...