在前面我们介绍过Flink Datastream api开发中主要的三个基本组成:
source etl sink
这里的etl就是指对数据的处理,我们需要依靠flink api里面的Transformation算子进行转换,本文的话,我们介绍下flink DataStream api中常见的一些算子。
1)map
这里的map和《Flink应用开发系列(十三)数据集转换之MapFunction》是一样的,就是把数据从1个元素转换成对应的另外一个元素,示例代码如下:
DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } });
2)flatmap
这里的flatmap和《Flink应用开发系列(十四)数据集转换之FlatMapFunction》是一样的,就是把一个元素转换成对应的多个元素,示例代码如下:
dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } });
3)Filter
这里的filter和《Flink应用开发系列(十六)数据集转换之FilterFunction》是一样的,就是把符合条件的元素给保留下来,示例代码如下:
dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } });
4)keyBy
把具有相同key的元素分配到同一个分区。示例代码如下:
dataStream.keyBy(value -> value.f0);
5)Reduce
把相同key对应的不同的value集合做一个reduce的操作,输出新值,示例代码如下:
keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } });
6)window
对某个分区上的数据定义时间窗口,在当前时间窗口内的时间段的数据进行分组。示例代码如下:
dataStream .keyBy(value -> value.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5)));
7)windowall
对所有分区的数据定义时间窗口,在当前时间窗口内的时间段的所有流数据进行分组,示例代码如下:
dataStream .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
8)window apply
将一个通用的function函数应用于整个窗口,可以看作是整个source的全局拦截器,示例代码如下:
windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); // 在 non-keyed 窗口流上应用 AllWindowFunction allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } });
9)windowReduce
对窗口应用reduce function函数,然后返回一个reduce后的值,也可以看作是整个source的拦截器,示例代码如下:
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() { public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1); } });
10)Union
将两个火多个数据流合并起来,创建一个新的流。示例代码如下:
dataStream.union(otherStream1, otherStream2, ...);
11)window join
根据指定的key和窗口,对两个数据流进行join,示例代码如下:
dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...});
12)Interval Join
根据key相等并且满足指定的时间范围内的元素进行join,这里的指定时间范围计算公式如下:
e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
示例代码如下:
// this will join the two streams so that // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2 keyedStream.intervalJoin(otherKeyedStream) .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound .upperBoundExclusive(true) // optional .lowerBoundExclusive(true) // optional .process(new IntervalJoinFunction() {...});
13)Window CoGroup
根据指定的key和窗口将两个数据流组合在一起,示例代码如下:
dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new CoGroupFunction () {...});
14)Connect
连接两个数据流并保留各种的类型,这里的connect可以允许两个数据流是不同的数据类型,示例代码如下:
DataStream<Integer> someStream = //... DataStream<String> otherStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
15)CoMap,CoFlatMap
类似于在连接的数据流上进行map和flatmap,示例代码如下:
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override public Boolean map1(Integer value) { return true; } @Override public Boolean map2(String value) { return false; } }); connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override public void flatMap1(Integer value, Collector<String> out) { out.collect(value.toString()); } @Override public void flatMap2(String value, Collector<String> out) { for (String word: value.split(" ")) { out.collect(word); } } });
16)Iterate
将某个算子的输出重定向到某个其他的算子中,相当于创建反馈循环。示例代码如下:
IterativeStream<Long> iteration = initialStream.iterate(); DataStream<Long> iterationBody = iteration.map (/*do something*/); DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Long value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Long value) throws Exception { return value <= 0; } });
17)cache
把算子的结果缓存起来,示例代码如下:
DataStream<Integer> dataStream = //... CachedDataStream<Integer> cachedDataStream = dataStream.cache(); cachedDataStream.print(); // Do anything with the cachedDataStream... env.execute(); // Execute and create cache. cachedDataStream.print(); // Consume cached result. env.execute();
18)自定义分区
这里的话需要自定义一个Partitioner,根据自定义的分区规则进行分区,示例代码如下:
dataStream.partitionCustom(customerpartitioner, ${参数});
19)随机分区
将元素随机的均匀的划分到分区去,示例代码如下:
dataStream.shuffle();
20)广播
把元素广播到每一个分区上去,示例代码如下:
dataStream.broadcast();
以上就是我们在flink datastream 应用开发中常用的transformation算子。
还没有评论,来说两句吧...