前面我们介绍了时间窗口和计数窗口,这篇我们介绍下会话窗口。
会话窗口可以简单的理解成,消费方直接对接数据源,如果数据源在超过设定的时间内没有接收到数据,则当前对话被销毁,当前的对话就是一个窗口。当老的会话窗口被销毁后,flink又会创建新的会话窗口,继续周而复始的接收数据。所以会话窗口的特点是:
1、每个会话窗口接收的数据数量是不一致的。 2、每个会话窗口的对接时间是不固定的。
在flink中,我们主要使用的会话窗口函数是:ProcessingTimeSessionWindows.withGap。下面,我们演示下如何使用会话窗口
一、改造下mysource类,改为时间随机发送
package com.flink.demo.sources; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.streaming.api.functions.source.SourceFunction; /** * 由于是demo,所以这里我们自定义一个source,然后自动向source发送数据 * * @author Administrator * */ public class MySources implements SourceFunction<String> { private boolean running = Boolean.TRUE; private static final String[] words = new String[] { "this", "is", "flink", "DataStream", "API", "test" }; private static final Random RANDOM = new Random(); private static final long serialVersionUID = -5258863350761775561L; public MySources() { } @Override public void run(SourceContext<String> ctx) throws Exception { while (running) { this.sendDatas(ctx); TimeUnit.MILLISECONDS.sleep(RANDOM.nextInt(10000)); } } /** * 随机发送数据 * @param ctx */ private void sendDatas(SourceContext<String> ctx) { String word = words[RANDOM.nextInt(words.length)]; System.out.println("发送了:"+word); ctx.collect(word); } @Override public void cancel() { running = Boolean.FALSE; } }
二、splitter类不变
package com.flink.demo.function; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * * @author Administrator * */ public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>>{ private static final long serialVersionUID = -5874538656857236727L; @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { out.collect(new Tuple2<String, Integer>(value, 1)); } }
三、然后写主方法
package com.test.demo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import com.flink.demo.function.Splitter; import com.flink.demo.sources.MySources; public class ConversationWindow { public static void main(String[] args) throws Exception { // 初始化执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // 添加数据源 DataStreamSource<String> source = environment.addSource(new MySources()); // 对数据进行切割 word -> (word,1) source.flatMap(new Splitter()) // 根据第一个字段进行聚合 .keyBy(0) // 设置使用会话窗口,超过2秒会话中断 .window(ProcessingTimeSessionWindows.withGap(Time.seconds(2))) // 最后求和 .sum(1) // 输出打印出来 .print(); environment.execute(); } }
然后我们运行下,看下执行结果
以上就是会话窗口的代码案例。
还没有评论,来说两句吧...