在前面我们介绍过Flink Datastream api开发中主要的三个基本组成:
source etl sink
这里的etl就是指对数据的处理,我们需要依靠flink api里面的Transformation算子进行转换,本文的话,我们介绍下flink DataStream api中常见的一些算子。
1)map
这里的map和《Flink应用开发系列(十三)数据集转换之MapFunction》是一样的,就是把数据从1个元素转换成对应的另外一个元素,示例代码如下:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});2)flatmap
这里的flatmap和《Flink应用开发系列(十四)数据集转换之FlatMapFunction》是一样的,就是把一个元素转换成对应的多个元素,示例代码如下:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});3)Filter
这里的filter和《Flink应用开发系列(十六)数据集转换之FilterFunction》是一样的,就是把符合条件的元素给保留下来,示例代码如下:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});4)keyBy
把具有相同key的元素分配到同一个分区。示例代码如下:
dataStream.keyBy(value -> value.f0);
5)Reduce
把相同key对应的不同的value集合做一个reduce的操作,输出新值,示例代码如下:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});6)window
对某个分区上的数据定义时间窗口,在当前时间窗口内的时间段的数据进行分组。示例代码如下:
dataStream .keyBy(value -> value.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5)));
7)windowall
对所有分区的数据定义时间窗口,在当前时间窗口内的时间段的所有流数据进行分组,示例代码如下:
dataStream .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
8)window apply
将一个通用的function函数应用于整个窗口,可以看作是整个source的全局拦截器,示例代码如下:
windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public void apply (Tuple tuple,
Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});
// 在 non-keyed 窗口流上应用 AllWindowFunction
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public void apply (Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});9)windowReduce
对窗口应用reduce function函数,然后返回一个reduce后的值,也可以看作是整个source的拦截器,示例代码如下:
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
});10)Union
将两个火多个数据流合并起来,创建一个新的流。示例代码如下:
dataStream.union(otherStream1, otherStream2, ...);
11)window join
根据指定的key和窗口,对两个数据流进行join,示例代码如下:
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});12)Interval Join
根据key相等并且满足指定的时间范围内的元素进行join,这里的指定时间范围计算公式如下:
e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
示例代码如下:
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});13)Window CoGroup
根据指定的key和窗口将两个数据流组合在一起,示例代码如下:
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});14)Connect
连接两个数据流并保留各种的类型,这里的connect可以允许两个数据流是不同的数据类型,示例代码如下:
DataStream<Integer> someStream = //... DataStream<String> otherStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
15)CoMap,CoFlatMap
类似于在连接的数据流上进行map和flatmap,示例代码如下:
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});16)Iterate
将某个算子的输出重定向到某个其他的算子中,相当于创建反馈循环。示例代码如下:
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});17)cache
把算子的结果缓存起来,示例代码如下:
DataStream<Integer> dataStream = //... CachedDataStream<Integer> cachedDataStream = dataStream.cache(); cachedDataStream.print(); // Do anything with the cachedDataStream... env.execute(); // Execute and create cache. cachedDataStream.print(); // Consume cached result. env.execute();
18)自定义分区
这里的话需要自定义一个Partitioner,根据自定义的分区规则进行分区,示例代码如下:
dataStream.partitionCustom(customerpartitioner, ${参数});19)随机分区
将元素随机的均匀的划分到分区去,示例代码如下:
dataStream.shuffle();
20)广播
把元素广播到每一个分区上去,示例代码如下:
dataStream.broadcast();
以上就是我们在flink datastream 应用开发中常用的transformation算子。

还没有评论,来说两句吧...