相信大家跟着前面的应用开发系列的文章进行充分的自行编码之后,对于flink应用开发都比较熟悉了。本文我们介绍下数据集转换中的mapfunction。
mapfunction主要应用于map函数,这里我们可以通过mapfunction里面的map方法,把原有的数据进行一对一的转换。
在实际的flink应用开发过程中,使用mapfunction的话,一般我们有两种方式来实现:
第一种方式是使用匿名内部类,也就是类似于:
input.map(new MapFunction<Integer, String>() { @Override public String map(Integer value) throws Exception { return "userId:"+value; } }).print();
第二种方式是直接从新创建一个雷,继承自RichMapFunction类,例如我们前面的分布式缓存的使用方法:
package org.example.func; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import java.io.File; import java.nio.charset.Charset; import java.util.HashMap; import java.util.List; import java.util.Map; public class DistributeCacheMapper extends RichMapFunction<Integer, String> { private static final Map<Integer,String> users = new HashMap<Integer,String>(); @Override public void open(Configuration parameters) throws Exception { File userScores = getRuntimeContext().getDistributedCache().getFile("user_scores"); List<String> list = FileUtils.readLines(userScores, Charset.forName("UTF-8")); for(String v : list){ String[] tokens = v.split(","); users.put(Integer.valueOf(tokens[0]),tokens[1]); } super.open(parameters); } @Override public String map(Integer integer) throws Exception { String score = users.get(integer); return integer+":"+score; } }
两种实现方式都可以,最终其实主要是还是在map方法里面实现具体的转换逻辑。下面我们来一个完整的mapfunc数据集转换的代码,这里我们使用的是匿名内部类的方法:
package org.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.example.func.DistributeCacheMapper; public class DataSetMapFuncJob { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Integer> input = env.fromElements(2, 1); input.map(new MapFunction<Integer, String>() { @Override public String map(Integer value) throws Exception { return "userId:"+value; } }).print(); } }
运行看下效果:
最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...