前面我们介绍了时间窗口和计数窗口,这篇我们介绍下会话窗口。
会话窗口可以简单的理解成,消费方直接对接数据源,如果数据源在超过设定的时间内没有接收到数据,则当前对话被销毁,当前的对话就是一个窗口。当老的会话窗口被销毁后,flink又会创建新的会话窗口,继续周而复始的接收数据。所以会话窗口的特点是:
1、每个会话窗口接收的数据数量是不一致的。 2、每个会话窗口的对接时间是不固定的。
在flink中,我们主要使用的会话窗口函数是:ProcessingTimeSessionWindows.withGap。下面,我们演示下如何使用会话窗口
一、改造下mysource类,改为时间随机发送
package com.flink.demo.sources; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.streaming.api.functions.source.SourceFunction; /** * 由于是demo,所以这里我们自定义一个source,然后自动向source发送数据 * * @author Administrator * */ public class MySources implements SourceFunction<String> { private boolean running = Boolean.TRUE; private static final String[] words = new String[] { "this", "is", "flink", "DataStream", "API", "test" }; private static final Random RANDOM = new Random(); private static final long serialVersionUID = -5258863350761775561L; public MySources() { } @Override public void run(SourceContext<String> ctx) throws Exception { while (running) { this.sendDatas(ctx); TimeUnit.MILLISECONDS.sleep(RANDOM.nextInt(10000)); } } /** * 随机发送数据 * @param ctx */ private void sendDatas(SourceContext<String> ctx) { String word = words[RANDOM.nextInt(words.length)]; System.out.println("发送了:"+word); ctx.collect(word); } @Override public void cancel() { running = Boolean.FALSE; } }
二、splitter类不变
package com.flink.demo.function;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
*
* @author Administrator
*
*/
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>>{
private static final long serialVersionUID = -5874538656857236727L;
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<String, Integer>(value, 1));
}
}三、然后写主方法
package com.test.demo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import com.flink.demo.function.Splitter;
import com.flink.demo.sources.MySources;
public class ConversationWindow {
public static void main(String[] args) throws Exception {
// 初始化执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 添加数据源
DataStreamSource<String> source = environment.addSource(new MySources());
// 对数据进行切割 word -> (word,1)
source.flatMap(new Splitter())
// 根据第一个字段进行聚合
.keyBy(0)
// 设置使用会话窗口,超过2秒会话中断
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
// 最后求和
.sum(1)
// 输出打印出来
.print();
environment.execute();
}
}然后我们运行下,看下执行结果
以上就是会话窗口的代码案例。


还没有评论,来说两句吧...