本文我们介绍下使用mapreduce进行排序的实战案例。在实际的场景中,排序是一个硬性的需求,所以我们在mapreduce中需要来实现这个排序功能。
在mapreduce中,排序会发生在map阶段,因此这里的话,我们需要在map阶段进行排序,同时mapreduce是根据输出的key进行排序的,如果输出的key是intwritable的话,则会按照数字进行排序,如果输出的key是其他类型,则会按照ascii码进行排序。下面我们来演示下,这里还是使用之前的wordcount进行排序。下面直接开始
1)编写map进行words的切分
这里我们首先肯定是需要对单词进行切分,切分后的结果是:
this 1 is 1 a 1 test 1
类似上面的示例数据,详细的代码如下:
package com.mr.demo.map;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* 从hdfs上读取文本内容的时候,这里使用mapper需要如下的几个输入值:
*<输入键类型,输入值类型,输出键类型,输出值类型> 这种进行类型的对标,对于<输入键类型,输入值类型>我们可以统一使用<Object, Text>即可
*
* 备注:对于上诉的<Object, Text>,仅限于text文本类型,不适合其他类型
*
*
* @author Administrator
*
*/
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountMapper.class);
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void cleanup(Mapper<Object, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
LOGGER.info("当前解析到的行内容是:{}",value.toString());
//这里的value就是我们接受到的每一行的内容,我们主要针对于每一行的内容进行解析
StringTokenizer words = new StringTokenizer(value.toString());
while(words.hasMoreTokens()) {
word.set(words.nextToken());
context.write(word, one);
}
//上诉我们的输出内容是: this 1,is 1,a 1,word 1 这种一个单词,数量为1
}
@Override
public void run(Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
super.run(context);
}
@Override
protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
}
}2)准备reduce进行统计
在前面我们通过map把单词切分了,接下来我们就来统计下这里的单词,最后形成的数据样例如下:
this 5 is 2 a 1 test 10
这种,前面是单词,后面是单词的数量,详细的示例代码如下:
package com.mr.demo.reduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountReduce.class);
private IntWritable _sum = new IntWritable();
@Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
@Override
protected void reduce(Text key, Iterable<IntWritable> value,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 到这里经过之前的map解析之后,然后经过shuffle传递过来的值样例是:this:[1,2,3,4,1],is:[1,2,2,1,2]
// 所以我们只需要把后面数组里面的值相加即可。
int sum = 0;
for (IntWritable val : value) {
sum += val.get();
}
_sum.set(sum);
LOGGER.info("对应的key:{} 总数量是:{}",key,sum);
context.write(key, _sum);
}
@Override
public void run(Reducer<Text, IntWritable, Text, IntWritable>.Context arg0)
throws IOException, InterruptedException {
super.run(arg0);
}
@Override
protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
}
}3)转换下单词与总数的个数
这里我们再使用一个map,转换下这个单词和数量的个数,原来的示例如下:
this 5
转换后的结果是:
5 this
这样子我们后续就可以根据key进行排序了,示例代码的map如下:
package com.mr.demo.map;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SortMap extends Mapper<Object, Text, IntWritable, Text> {
private IntWritable _key = new IntWritable();
private Text _value = new Text();
@Override
protected void cleanup(Mapper<Object, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
@Override
protected void map(Object key, Text value, Mapper<Object, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString().trim();
String[] tokens = line.split("\t");
_key.set(Integer.valueOf(tokens[1]));
_value.set(tokens[0]);
context.write(_key, _value);
}
@Override
public void run(Mapper<Object, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {
super.run(context);
}
@Override
protected void setup(Mapper<Object, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
super.setup(context);
}
}4)定义排序类型
这里的话我们可以根据key进行升序排序,也可以根据key进行倒序排序,因此这里我们需要定义一个排序类,示例代码如下:
package com.mr.demo.sort;
import org.apache.hadoop.io.IntWritable;
public class WordCountSort extends IntWritable.Comparator{
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
//这里直接使用默认方法即可,如果是正数,代表升序,如果是负数,代表降序
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}5)编排job
这里比较特殊,我们可以看到有2个map,1个reduce,然后整个是需要流程进行的,因此这里主类的话我们需要创建两个job,然后挨个执行,示例代码如下:
package com.mr.demo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.mr.demo.map.SortMap;
import com.mr.demo.map.WordCountMapper;
import com.mr.demo.reduce.WordCountReduce;
import com.mr.demo.sort.WordCountSort;
/**
* 这里我们首先编写一个计数的job
*
* @author Administrator
*
*/
public class WordCountJobs {
public static void main(String[] args) throws Exception {
// 在运行mapreduce的时候,我们一般都是直接执行hadoop
// -jar的时候会把这个动态的参数传递进来,因此我们还是尽量保持一致,但是这里的话,我们主要是在本地运行测试,因此这里做下判断,在本地运行的时候稍微简化一点。
if (null == args || args.length == 0) {
args = new String[] {
// 这里的输入文件可以是一个文件夹,也可以是一个具体的文件路径
"/wordcount",
//临时文件夹
"/wordcounttmp",
// 这里的输出需要指定一个文件夹
"/wordcounttarget" };
}
// 首先都要创建下这里的configuration,这里我们由于是演示的项目,因此没有特别需要配置configuration的地方,如果没有单独配置,那么所有的配置就直接拿取整个hadoop集群的配置。
Configuration configuration = new Configuration();
// 首先我们看看target目录是否存在,存在的话把他给手动删除一下
deleteOutPut(args[1], configuration);
deleteOutPut(args[2], configuration);
// 然后我们在这里编写mapreduce,来计数
Job job = Job.getInstance(configuration, "word count");
//设置运行的类,一般都是本类
job.setJarByClass(WordCountJobs.class);
//设置mapper类
job.setMapperClass(WordCountMapper.class);
//设置reduce类
job.setReducerClass(WordCountReduce.class);
//指定输出的内容key
job.setOutputKeyClass(Text.class);
//指定输出的内容value
job.setOutputValueClass(IntWritable.class);
//添加需要读取的数据源目录
FileInputFormat.addInputPath(job, new Path(args[0]));
//添加需要输出的数据目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));
int code = job.waitForCompletion(true) ? 0 : 1;
if (code == 0) {
Job jobs = Job.getInstance(configuration, "word count sort");
jobs.setJarByClass(WordCountSort.class);
jobs.setMapperClass(SortMap.class);
jobs.setOutputFormatClass(TextOutputFormat.class);
jobs.setOutputKeyClass(IntWritable.class);
jobs.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(jobs, new Path(args[1]));
FileOutputFormat.setOutputPath(jobs, new Path(args[2]));
jobs.setSortComparatorClass(WordCountSort.class);
code = jobs.waitForCompletion(true) ? 0 : 1;
}
System.exit(code);
}
/**
* 去服务器上删除掉输出目录,避免输出目录已经存在而报错
*
* @param args
* @throws IOException
*/
private static void deleteOutPut(String path, Configuration configuration) throws IOException {
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(new Path(path))) {
fileSystem.delete(new Path(path), true);
}
}
}可以看到这里由2个job,每个job都有自己的输出和输出,然后第一个job执行完毕之后再会执行第二个job。
6)把项目打包上传服务器运行下:
hadoop jar MRDemo-1.0-jar-with-dependencies.jar com.mr.demo.WordCountJobs
我们可以看到在yarn上执行了两个任务:
和
最后我们看下结果输出:
可以看到实现了倒序的排序。最后按照惯例,附上本案例的源码,登录后即可下载。












还没有评论,来说两句吧...