相信大家跟着前面的应用开发系列的文章进行充分的自行编码之后,对于flink应用开发都比较熟悉了。本文我们介绍下数据集转换中的mapfunction。
mapfunction主要应用于map函数,这里我们可以通过mapfunction里面的map方法,把原有的数据进行一对一的转换。
在实际的flink应用开发过程中,使用mapfunction的话,一般我们有两种方式来实现:
第一种方式是使用匿名内部类,也就是类似于:
input.map(new MapFunction<Integer, String>() {
@Override
public String map(Integer value) throws Exception {
return "userId:"+value;
}
}).print();第二种方式是直接从新创建一个雷,继承自RichMapFunction类,例如我们前面的分布式缓存的使用方法:
package org.example.func;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DistributeCacheMapper extends RichMapFunction<Integer, String> {
private static final Map<Integer,String> users = new HashMap<Integer,String>();
@Override
public void open(Configuration parameters) throws Exception {
File userScores = getRuntimeContext().getDistributedCache().getFile("user_scores");
List<String> list = FileUtils.readLines(userScores, Charset.forName("UTF-8"));
for(String v : list){
String[] tokens = v.split(",");
users.put(Integer.valueOf(tokens[0]),tokens[1]);
}
super.open(parameters);
}
@Override
public String map(Integer integer) throws Exception {
String score = users.get(integer);
return integer+":"+score;
}
}两种实现方式都可以,最终其实主要是还是在map方法里面实现具体的转换逻辑。下面我们来一个完整的mapfunc数据集转换的代码,这里我们使用的是匿名内部类的方法:
package org.example;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.example.func.DistributeCacheMapper;
public class DataSetMapFuncJob {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> input = env.fromElements(2, 1);
input.map(new MapFunction<Integer, String>() {
@Override
public String map(Integer value) throws Exception {
return "userId:"+value;
}
}).print();
}
}运行看下效果:
最后按照惯例,附上本案例的源码,登录后即可下载。










还没有评论,来说两句吧...