在上一篇文章我们介绍了DataSet API的专用转换算子,这篇文章我们介绍下DataStream API的专用转换算子。
一、多流转换算子
1.1、Union算子
Union算子可以将两个或多个数据流进行合并,从而创建一个包含所有流中元素的新流。具体代码示例如下:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source1 = environment.addSource(new MySources()); DataStreamSource<String> source2 = environment.addSource(new MySources()); DataStream<String> datasource = source1.union(source2);
1.2、Connect算子
Connect算子连接两个数据流,但这两个数据流只是被放在同一个流中,依然保持各自的数据和形式,不会发生任何变化,两个数据流的数据相互独立,具体示例如下:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source1 = environment.addSource(new MySources()); DataStreamSource<String> source2 = environment.addSource(new MySources()); ConnectedStreams<String, String> datasource = source1.connect(source2);
备注上面的connect算子和Union算子的区别主要是:
1、Connect算子可以连接两个不同数据类型的数据流,而Union算子必须要两个数据流的数据类型相同。
2、Union算子支持两个及两个以上的数据流合并,Connect算子只能支持2个数据流。
1.3、CoMap 算子和CoFlatMap算子
CoMap算子和CoFlatMap算子可以将ConnectedStreams流转换成DataStream的流,与连接数据流算子Map和FlatMap相似。
1.4、Split算子
Split算子根据某些特征把一个DataStream流拆分为多个DataStream流。
1.5、Select算子
Select算子从一个SplitStream流中获取一个或多个DataStream流。
二、键控流转换算子
2.1、KeyBy算子
KeyBy算子根据指定的键,将DataStream流转换为KeyedStrean流。使用KeyBy算子必须使用键控状态。KeyBy算子从逻辑上将流划分为不想交的分区。具有相同键的所有记录都被分配给同一个分区。在内部,keyBy()方法是通过hash分区来实现的。代码示例如下:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> datasource = environment.addSource(new MySources()); datasource.keyBy(0);
2.2、Aggregation算子
Aggregation算子可以将KeyedStream流转换为DataStream流。主要的函数有:
sum min max minBy maxBy
2.3、Reduce算子
Reduce算子对键控流进行滚动压缩,将当前元素的值与最后一个元素的Reduce的值合并,产生一个新值。示例代码如下:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<String, Integer>> datasource = environment.fromElements(new Tuple2<String, Integer>("a", 1),new Tuple2<String, Integer>("b", 2)); datasource.keyBy(0).reduce(new RichReduceFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String, Integer>(value1.f0,value1.f1+value2.f1); } }).print();
三、窗口转换算子
3.1、Window算子
Window算子可以在已经分区的KeyedStream流上定义窗口,把KeyedStream流转换为WindowedStream流。Window算子根据某些特征将每个键中的数据分组,例如:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> datasource = environment.addSource(new MySources()); datasource.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(2)));
3.2、WindowAll算子
窗口可以在常规DataStream流上定义。WindowAll算子可以将DataStream流转换为AllWindowedStream流。窗口会根据某些特征(如最近2秒内到达的数据)对所有的流事件进行分组。在许多情况下,这是非并行转换。所有记录将被收集在该算子的一项任务中。例如:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> datasource = environment.addSource(new MySources()); datasource.windowAll(TumblingEventTimeWindows.of(Time.seconds(2)));
3.3、Window Apply算子
Window Apply 算子将一般功能应用于整个窗口,可以将WindowedStream流或者AllWindowedStream流转换为DataStream流。如果使用WindowAll算子进行转换,则需要使用AllWindowFunction函数。
3.4、Window Reduce算子
Window Reduce算子将功能化的Reduce函数应用于窗口,可以将WindowedStream流转换为DataStream流。例如:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> datasource = environment.addSource(new MySources()); AllWindowedStream<String, TimeWindow> windowedStream = datasource.windowAll(TumblingEventTimeWindows.of(Time.seconds(2))); windowedStream.reduce(new RichReduceFunction<String>() { private static final long serialVersionUID = 1L; @Override public String reduce(String value1, String value2) throws Exception { return value1+value2; } });
3.5、Window Fold算子
Window Fold算子将实用的折叠功能应用于窗口,并且返回折叠值。
3.6、Window CoGroup算子
Window CoGroup算子在给定键和一个公共窗口上将两个数据流组合在一起,并且将两个DataStream的流转换为一个DataStream流。
四、物理分区算子
4.1、partitionCustom算子
将一个DataStream流转换为另外一个DataStream流。使用用户自定义的分区程序为每个元素选择目标任务
4.2、shuffle算子
将一个DataStream流转换为另外一个DataStream流。根据均匀分布对元素进行随机划分。
4.3、rebalance算子
将一个DataStream流转换为另外一个DataStream流。每个分区创建相等的负载。在存在数据倾斜时,该算子对性能优化有用。
4.4、rescale算子
将一个DataStream流转换为另外一个DataStream流。相当于低配版的rebalance算子。
4.5、broadcast算子
将一个DataStream流转换为另外一个DataStream流。将元素广播到每一个分区。
五、其他算子
5.1、Fold算子
Fold算子可以将KeyedStream流转换成DataStream流,在带有初始值的键控流上滚动折叠,将当前元素与上一个折叠值组合在一起计算出新值。
5.2、Interval Join算子
Interval Join算子可以将两个KeyedStream流转换成DataStream流。在给定的时间间隔内,用公共键将两个键控流的两个元素连接起来。
5.3、Iterate算子
Iterate算子可以将DataStream流转换成IterativeStream流,然后再转换为DataStream流。通过将一个算子的输出重定向到某个先前的算子,在流中创建反馈循环。这对于定义不断更新的模型算法特别有用。Iterate算子提供了一种流计算中的类似于递归的方法。
还没有评论,来说两句吧...