在flink中,当我们处理有界数据和无界数据的时候,我们读取了数据源之后就需要进行tranform操作。在transform里面,我们转换过程中的各种操作类型被称为算子。然后不同的转换组合成一个复杂的数据流拓扑。
下面我们介绍下flink中DataStream和DataSet的通用算子有哪些?
一、map算子
map算子根据数据源的1个元素产生一个新的元素,通常用来对数据集中的数据进行清洗和转换。具体代码示例如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18),new Tuple2<String, Integer>("李四", 19),new Tuple2<String, Integer>("王五", 20)); datasources.map(f -> (f.f1+10)).print();
二、FlatMap算子
FlatMap算子根据数据源的一个元素产生0个、1个甚至多个元素。具体代码示例如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18), new Tuple2<String, Integer>("李四", 19),new Tuple2<String, Integer>("王五", 20)); datasources.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { private static final long serialVersionUID = -7407625554978856125L; @Override public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception { if (value.f1 > 19) { out.collect(value); } } }).print();
三、Filter算子
Filter算子用于进行数据过滤,筛选出符合条件的数据,也就是我们把对应的值进行一定的规则判断,返回true则保留,返回false则放弃掉。具体代码示例如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18), new Tuple2<String, Integer>("李四", 19),new Tuple2<String, Integer>("王五", 20)); datasources.filter(new RichFilterFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 8317075170456401561L; @Override public boolean filter(Tuple2<String, Integer> value) throws Exception { if (value.f1 > 19) return true; else return false; } }).print();
四、Project算子
Project算子主要用于对数据集中的数据进行映射转换,他讲删除或者移动元组数据集的元组字段。具体代码示例如下:
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18),new Tuple2<String, Integer>("李四", 19),new Tuple2<String, Integer>("王五", 20)); ProjectOperator<?, Tuple> out1 = datasources.project(1);// 这里相当于只取第1个位置的字段,备注:字段其实位置是0 ProjectOperator<?, Tuple> out2 = datasources.project(1,0);// 这里相当把第一个位置的数据和第0个位置换个位置取,备注:字段其实位置是0 out1.print(); out2.print();
还没有评论,来说两句吧...