在flink中,当我们处理有界数据和无界数据的时候,我们读取了数据源之后就需要进行tranform操作。在transform里面,我们转换过程中的各种操作类型被称为算子。然后不同的转换组合成一个复杂的数据流拓扑。
下面我们介绍下flink中DataStream和DataSet的通用算子有哪些?
一、map算子
map算子根据数据源的1个元素产生一个新的元素,通常用来对数据集中的数据进行清洗和转换。具体代码示例如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18),new Tuple2<String, Integer>("李四", 19),new Tuple2<String, Integer>("王五", 20)); datasources.map(f -> (f.f1+10)).print();
二、FlatMap算子
FlatMap算子根据数据源的一个元素产生0个、1个甚至多个元素。具体代码示例如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18), new Tuple2<String, Integer>("李四", 19),new Tuple2<String, Integer>("王五", 20));
datasources.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private static final long serialVersionUID = -7407625554978856125L;
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
throws Exception {
if (value.f1 > 19) {
out.collect(value);
}
}
}).print();三、Filter算子
Filter算子用于进行数据过滤,筛选出符合条件的数据,也就是我们把对应的值进行一定的规则判断,返回true则保留,返回false则放弃掉。具体代码示例如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18), new Tuple2<String, Integer>("李四", 19),new Tuple2<String, Integer>("王五", 20));
datasources.filter(new RichFilterFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 8317075170456401561L;
@Override
public boolean filter(Tuple2<String, Integer> value) throws Exception {
if (value.f1 > 19)
return true;
else
return false;
}
}).print();四、Project算子
Project算子主要用于对数据集中的数据进行映射转换,他讲删除或者移动元组数据集的元组字段。具体代码示例如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18),new Tuple2<String, Integer>("李四", 19),new Tuple2<String, Integer>("王五", 20));
ProjectOperator<?, Tuple> out1 = datasources.project(1);// 这里相当于只取第1个位置的字段,备注:字段其实位置是0
ProjectOperator<?, Tuple> out2 = datasources.project(1,0);// 这里相当把第一个位置的数据和第0个位置换个位置取,备注:字段其实位置是0
out1.print();
out2.print();
还没有评论,来说两句吧...