Earlier we introduced MapReduce as a divide-and-conquer distributed computing framework. Writing a program against it therefore involves two parts: a map part and a reduce part. Below is a simple map job and reduce job that count word occurrences.
Map job
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line into tokens and emit (word, 1) for each one
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
Reduce job
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // All values for the same key arrive together; sum the ones
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
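Before packaging anything for a cluster, the map and reduce logic above can be sanity-checked with plain Java and no Hadoop dependency. The sketch below is only an illustration: it mimics the shuffle phase with a `TreeMap` (an assumption for simplicity, not how Hadoop actually groups and sorts keys).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LocalWordCount {

    // Mimics MyMapper: emit a (token, 1) pair for every token in a line
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            out.add(Map.entry(itr.nextToken(), 1));
        }
        return out;
    }

    // Mimics shuffle + MyReducer: group the pairs by key and sum the ones
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            result.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : new String[]{"hello world", "hello hadoop"}) {
            pairs.addAll(map(line));
        }
        System.out.println(reduce(pairs)); // {hadoop=1, hello=2, world=1}
    }
}
```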
With the two classes in place, a little job configuration is all it takes to run the program:
Configuration conf = new Configuration(true);
Job job = Job.getInstance(conf);

// Set the class containing the main method, i.e. this class
job.setJarByClass(MyWC.class);
// Set the job's display name
job.setJobName("myjob");

// Set the input path the mappers read from
Path infile = new Path("/user/root/sxt02.txt");
FileInputFormat.addInputPath(job, infile);

// Set the output path for the reduce results
Path outfile = new Path("/data/wc/output01");
// The output directory must not already exist; delete it if it does
if (outfile.getFileSystem(conf).exists(outfile)) {
    outfile.getFileSystem(conf).delete(outfile, true);
}
FileOutputFormat.setOutputPath(job, outfile);

// Set the mapper class for this job
job.setMapperClass(MyMapper.class);
// Set the (de)serialization types of the map output sent to the reducers
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// Set the reducer class for this job
job.setReducerClass(MyReducer.class);

job.waitForCompletion(true);
That completes our MapReduce job code; all that remains is to package it into a jar and run it on the server. The command is:
./hadoop jar ${mapreduce program jar} ${main class} ${input file} ${output directory}
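For instance, using the paths configured in the driver above and assuming the packaged jar is named wc.jar (a hypothetical name; use whatever your build produces), the invocation would look like:

```shell
# Hypothetical jar name; the paths match the driver configuration above
./hadoop jar wc.jar MyWC /user/root/sxt02.txt /data/wc/output01
```

Note that since this driver hardcodes its input and output paths, the trailing arguments only take effect if the main method is written to read them.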