在常见的数据分析中,我们经常会使用到join相关的数据关联性操作,所以对于传统型的mysql来说,我们只需要编写相关的join的sql语句即可,例如下面我们列举一个学校和学生的数据关系的场景:
学校的数据如下:
{"school_id":1,"school_name":"北京一中","school_address":"北京一中地址"} {"school_id":2,"school_name":"北京二中","school_address":"北京二中地址"} {"school_id":3,"school_name":"北京三中","school_address":"北京三中地址"}
学生的数据如下:
{"student_id":1,"student_name":"张三","school_id":"1"} {"student_id":2,"student_name":"李四","school_id":"2"} {"student_id":3,"student_name":"王五","school_id":"3"} {"student_id":4,"student_name":"赵六","school_id":"4"} {"student_id":5,"student_name":"田七","school_id":"2"} {"student_id":6,"student_name":"王八","school_id":"1"}
像上面的数据,如果是使用mysql的话,我们只需要执行下面的sql语句就可以了:
select * from students_info join school_info on students_info.school_id = school_info.school_id
这样子我们就可以得到想要的结果了。但是在hadoop里面,我们使用mapreduce怎么来实现呢?其实在mapreduce中,我们也可以实现这里的mapreduce的效果,但是实现的时候有两种方式,第一种方式是在map端进行join,第二种方式是在reduce端进行join。这篇文字我们首先演示下再map端实现join。下面直接开始演示:
一、添加maven依赖
这里的话,我们还是在这个MRDemo这个项目里面进行编写代码,但是由于这里的输入文件是json的格式,因此我们需要引入外部依赖,最终我们的pom依赖文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.mr.demo</groupId> <artifactId>MRDemo</artifactId> <version>1.0</version> <packaging>jar</packaging> <name>MRDemo</name> <url>http://example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.5</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>3.3.5</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <!-- https://mvnrepository.com/artifact/commons-io/commons-io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.12.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
这里的话,由于在hadoop环境里面没有fastjson这个框架,因此我们在程序打包的时候,需要把这里的fastjson给打入到依赖包里面,因此我们修改了这里的plugin,使用了assembly命令。同时我们不需要打包hadoop相关的依赖,因此添加了provider描述。
二、创建两个数据源文件
这里的话我们把school_info和students_info做成两个文本文件,然后上传到hdfs上,如下图:
三、我们创建一个MapJoinMapper实现map端join
这里的话我们还是一样的,既然是mapreduce,那么我们首先创建一个map,所以这里我们创建的类是:MapJoinMapper,这个类的主要作用有:
1、在初始化阶段读取小表(也就是读取school_info表) 2、在map阶段读取student表,然后使用if-else进行判断join
这里我们的MapJoinMapper代码示例如下:
package com.mr.demo.map; import java.io.IOException; import java.nio.charset.Charset; import java.util.HashMap; import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import com.alibaba.fastjson.JSON; import com.mr.demo.model.SchoolPoJo; import com.mr.demo.model.StudentPoJo; /** * 这里的话,相当于我们需要在map端进行join,这种情况比较适合一些关联表中有小表的情况 * map任务会在本地对自己读取到的达标数据进行join并输出最终的结果,可以很大程度的提交join的并发度,加快处理速度 * * @author Administrator * */ public class MapJoinMapper extends Mapper<Object, Text, Text, NullWritable> { private HashMap<Integer, SchoolPoJo> tmpMap = new HashMap<Integer, SchoolPoJo>(); @Override protected void cleanup(Mapper<Object, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { super.cleanup(context); } @Override protected void map(Object key, Text value, Mapper<Object, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { String stu = value.toString(); StudentPoJo student = JSON.parseObject(stu,StudentPoJo.class); SchoolPoJo poJo = tmpMap.get(student.getSchool_id()); if(null != poJo) { student.setSchool_name(poJo.getSchool_name()); student.setSchool_address(poJo.getSchool_address()); } context.write(new Text(JSON.toJSONString(student)), NullWritable.get()); } @Override public void run(Mapper<Object, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { super.run(context); } @Override protected void setup(Mapper<Object, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 首先我们获取job的configuration Configuration configuration = context.getConfiguration(); // 然后我们从configuration里面获取设置的tempfile String tempFile = configuration.get("tmpfile"); // 然后我们读取tempfile FileSystem fileSystem = FileSystem.get(configuration); FSDataInputStream fileInputStream = fileSystem.open(new Path(tempFile)); List<String> lines = IOUtils.readLines(fileInputStream, Charset.forName("UTF-8")); //然后我们使用fastjson来转移获取到的json字符串 for (String line : lines) { SchoolPoJo poJo = JSON.parseObject(line, SchoolPoJo.class); tmpMap.put(poJo.getSchool_id(), poJo); } fileInputStream.close(); } }
备注:
1、这里我们使用map端join,不做其他的任务,因此我们不需要再编写reduce了,直接使用map进行输入,然后map进行输出。
四、编写job
还是一样的,我们需要编写一个job来编排整个mapreduce任务,完整的代码示例如下:
package com.mr.demo; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.mr.demo.map.MapJoinMapper; /** * * @author Administrator * */ public class MapJoinJobs { public static void main(String[] args) throws Exception { // 在运行mapreduce的时候,我们一般都是直接执行hadoop // -jar的时候会把这个动态的参数传递进来,因此我们还是尽量保持一致,但是这里的话,我们主要是在本地运行测试,因此这里做下判断,在本地运行的时候稍微简化一点。 if (null == args || args.length == 0) { args = new String[] { // 这里的输入文件可以是一个文件夹,也可以是一个具体的文件路径 "/school/students_info", // 这里的输出需要指定一个文件夹 "/schooltarget", //这里放置临时文件 "/school/school_info" }; } // 首先都要创建下这里的configuration,这里我们由于是演示的项目,因此没有特别需要配置configuration的地方,如果没有单独配置,那么所有的配置就直接拿取整个hadoop集群的配置。 Configuration configuration = new Configuration(); //这里设置临时的文件,让他分发到各个节点去 configuration.set("tmpfile", args[2]); // 首先我们看看target目录是否存在,存在的话把他给手动删除一下 deleteOutPut(args[1], configuration); // 然后我们在这里编写mapreduce,来计数 Job job = Job.getInstance(configuration, "MapJoin task"); //设置运行的类,一般都是本类 job.setJarByClass(MapJoinJobs.class); //设置mapper类 job.setMapperClass(MapJoinMapper.class); //这里我们不需要主要演示的事在map端进行join,在reduce中没有其他的操作,因此这里我们不需要reduce //指定输出的内容key job.setOutputKeyClass(Text.class); //指定输出的内容value job.setOutputValueClass(NullWritable.class); //添加需要读取的数据源目录 FileInputFormat.addInputPath(job, new Path(args[0])); //添加需要输出的数据目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } /** * 去服务器上删除掉输出目录,避免输出目录已经存在而报错 * * @param args * @throws IOException */ private static void deleteOutPut(String path, Configuration configuration) throws IOException { FileSystem fileSystem = FileSystem.get(configuration); if (fileSystem.exists(new Path(path))) { fileSystem.delete(new Path(path), true); } } }
五、打包
这里我们把整个maven项目进行打包,使用的命令如下:
mvn assembly:assembly
打完包之后,这里就会多出一个带有-with-dependencies后缀的jar包,这里我们可以看到他的大小比较大了,因为包含了fastjson的包,如下图:
然后我们把这个带有dependences后缀的包上传到服务器上。
六、执行测试
当我们把jar包上传到服务器之后,就可以执行如下的命令:
hadoop jar MRDemo-1.0-jar-with-dependencies.jar com.mr.demo.MapJoinJobs
然后等待他执行完毕
然后我们去hdfs上查找对应的结果:
可以看到执行完全没有问题,同时得到了想要的join结果。然后我们再看看yarn的结果:
最后按照惯例,附上本案例的源码,登录后即可下载
还没有评论,来说两句吧...