在上一篇文章《Mapreduce实战案例(二)实现map端join》我们实现了在map端进行join,这篇文章的话,我们演示下实现在reduce端做join。
在reduce端进行join的时候,我们主要实现的逻辑如下:
map端逻辑:
1、在map端读取所有的文件,然后把内容转换成实体 2、根据转换成的实体,找到对应的join键。 3、把join键的值做成output的key,把实体转换成join的值发送给reduce
reduce端的逻辑:
1、会接受到相同join键对应的key组成的values集合 2、遍历values集合,获取里面的值 3、在最外面证明一个通用的model 4、把遍历到的值转换成通用的model 5、写出结果(通用的module)
按照上面的逻辑我们来挨个实现一下。这里我们还是使用上一篇文章的示例数据:school_info和students_info,如下图:
首先我们创建一个ReduceJoinMapper的类,用来读取文件转换实体,示例代码如下:
package com.mr.demo.map;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.alibaba.fastjson.JSON;
import com.mr.demo.model.SchoolPoJo;
import com.mr.demo.model.StudentPoJo;
/**
* 这里是主要是对接在reduce端进行join,因此mapper端主要做的内容就是把文本读取出来,然后做成对应的model,然后把他放进去
* @author Administrator
*
*/
public class ReduceJoinMapper extends Mapper<Object, Text, Text, Text>{
@Override
protected void cleanup(Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException {
super.cleanup(context);
}
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//首先我们回去拿到的文件名,根据文件名进行对应的文件实体解析
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
if(fileName.contains("school")) {
//如果文件名包含school字样,则按照school的实体进行解析
String line = value.toString();
SchoolPoJo poJo = JSON.parseObject(line, SchoolPoJo.class);
//这里我们要根据school_id进行join,因此这里我们把school_id做成键发出去
context.write(new Text(poJo.getSchool_id()+""),new Text( JSON.toJSONString(poJo)));
}else {
//则按照学生的实体进行解析
String line = value.toString();
StudentPoJo poJo = JSON.parseObject(line, StudentPoJo.class);
//这里我们要根据school_id进行join,因此这里我们把school_id做成键发出去
context.write(new Text(poJo.getSchool_id()+""),new Text( JSON.toJSONString(poJo)));
}
}
@Override
public void run(Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException {
super.run(context);
}
@Override
protected void setup(Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException {
super.setup(context);
}
}然后我们创建一个ReduceJoinReducer的类,主要做遍历聚合使用,示例代码如下:
package com.mr.demo.reduce;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.alibaba.fastjson.JSON;
import com.mr.demo.model.StudentPoJo;
public class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//上一个,我们把schoolid做成了key发送过来,因此这里的相同key的value会是一个集合被保存过来,因此这里我们直接遍历values做join操作即可。
StudentPoJo pojo = new StudentPoJo();
for(Text value : values) {
String line = value.toString();
StudentPoJo p = JSON.parseObject(line, StudentPoJo.class);
if(StringUtils.isBlank(p.getSchool_name())) {
//解析到了学生
pojo.setStudent_id(p.getStudent_id());
pojo.setStudent_name(p.getStudent_name());
pojo.setSchool_id(p.getSchool_id());
}else {
//解析到了学校
pojo.setSchool_name(p.getSchool_name());
pojo.setSchool_address(p.getSchool_address());
}
}
//上面我们通过遍历的方式把实现了数据的join,最后我们把结果写入到文件中去。
context.write(new Text(JSON.toJSONString(pojo)), new Text());
}
@Override
public void run(Reducer<Text, Text, Text, Text>.Context arg0) throws IOException, InterruptedException {
super.run(arg0);
}
@Override
protected void setup(Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
super.setup(context);
}
}最后我们整合下job信息,直接设置即可,示例代码如下:
package com.mr.demo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.mr.demo.map.ReduceJoinMapper;
import com.mr.demo.reduce.ReduceJoinReducer;
/**
* 这里我们测试下reduce join
* @author Administrator
*
*/
public class ReduceJoinJobs {
public static void main(String[] args) throws Exception {
// 在运行mapreduce的时候,我们一般都是直接执行hadoop
// -jar的时候会把这个动态的参数传递进来,因此我们还是尽量保持一致,但是这里的话,我们主要是在本地运行测试,因此这里做下判断,在本地运行的时候稍微简化一点。
if (null == args || args.length == 0) {
args = new String[] {
// 这里做join操作,因此我们传入一个文件夹即可,然后在代码里面根据文件名来区别实体解析
"/school",
// 这里的输出需要指定一个文件夹
"/schooltarget"
};
}
// 首先都要创建下这里的configuration,这里我们由于是演示的项目,因此没有特别需要配置configuration的地方,如果没有单独配置,那么所有的配置就直接拿取整个hadoop集群的配置。
Configuration configuration = new Configuration();
// 首先我们看看target目录是否存在,存在的话把他给手动删除一下
deleteOutPut(args[1], configuration);
// 然后我们在这里编写mapreduce,来计数
Job job = Job.getInstance(configuration, "ReduceJoin task");
//设置运行的类,一般都是本类
job.setJarByClass(MapJoinJobs.class);
//设置mapper类
job.setMapperClass(ReduceJoinMapper.class);
job.setReducerClass(ReduceJoinReducer.class);
//指定输出的内容key
job.setOutputKeyClass(Text.class);
//指定输出的内容value
job.setOutputValueClass(Text.class);
//添加需要读取的数据源目录
FileInputFormat.addInputPath(job, new Path(args[0]));
//添加需要输出的数据目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
* 去服务器上删除掉输出目录,避免输出目录已经存在而报错
*
* @param args
* @throws IOException
*/
private static void deleteOutPut(String path, Configuration configuration) throws IOException {
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(new Path(path))) {
fileSystem.delete(new Path(path), true);
}
}
}然后我们把项目打包下上传上去,运行下demo
可以看到运行成功了,我们看看结果文件:
没有任何问题。以上就是我们在reduce端实现join的详细案例,最后按照惯例,附上本案例的源码,登录后即可下载:












还没有评论,来说两句吧...