现如今接触到数据库的同学大家都应该听说过分区的概念,包括mysql也有分区的概念。所以在Hudi这个数据湖里面他也有分区的概念,我们在前面的文章《数据湖系列(九)使用spark程序向Apache Hudi插入数据》已经演示过插入数据,在当时是没有指定分区的,因此Hudi会默认创建一个分区。本文的话我们来介绍下单分区的案例。
1)首先制作两个users,内容分别是:
{"id":1,"loc":"beijing","sex":1,"name":"张三"} {"id":3,"loc":"shanghai","sex":1,"name":"王五"}
2)定义分区
这里我们定义分区的话需要指定一个字段,由于是单分区,所以我们这里指定loc字段。
3)编写单分区插入数据示例代码
接着我们编写一个对应的单分区插入数据的示例,示例代码如下:
//创建spark json,使用本地模式进行存储 val session: SparkSession = SparkSession.builder() //本地模式运行 .master("local") //job名称 .appName("InsertHudi") //设置spark的序列化 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .getOrCreate() //读取json文件,创建DataFrame,这里的users.txt文件存放在src/main/resources里面的 val insertDF1 = session.read.json("file:///D:\\aaaa\\users1.txt") val insertDF2 = session.read.json("file:///D:\\aaaa\\users2.txt") //将结果保存到hudi中 insertDF1.write.format("org.apache.hudi") //或者直接写hudi //设置主键列名称 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //当数据主键相同时,对比的字段,保存该字段大的数据 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "id") //指定分区列 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc") //设置写并行度设置,默认1500 .option("hoodie.insert.shuffle.parallelism", "1") //设置update并行度设置,默认1500 .option("hoodie.upsert.shuffle.parallelism", "1") //设置表名 .option(HoodieWriteConfig.TABLE_NAME, "users") //重写覆盖 .mode(SaveMode.Overwrite) //存储到hdfs上的路径,这里需要把hdfs的core和hdfs-site配置文件放在src/main/resources目录下。 .save("/hudi_data/users") //将结果保存到hudi中 insertDF2.write.format("org.apache.hudi") //或者直接写hudi //设置主键列名称 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //当数据主键相同时,对比的字段,保存该字段大的数据 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "id") //指定分区列 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc") //设置写并行度设置,默认1500 .option("hoodie.insert.shuffle.parallelism", "1") //设置update并行度设置,默认1500 .option("hoodie.upsert.shuffle.parallelism", "1") //设置表名 .option(HoodieWriteConfig.TABLE_NAME, "users") //重写覆盖 .mode(SaveMode.Append) //存储到hdfs上的路径,这里需要把hdfs的core和hdfs-site配置文件放在src/main/resources目录下。 .save("/hudi_data/users")
这里我们插入了两批数据进入hudi,在代码里面我们使用option的方式指定了分区字段,示例如下:
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
如果我们指定了分区字段,那么Hudi就会按照我们指定的字段进行分区保存数据。
4)运行测试
接着我们运行测试一下,看看插入数据的情况。首先查看执行结果:
可以看到执行成功,然后我们去hdfs上看看,如下图:
可以看到我们已经完成了数据的分区了,北京的数据在一个区,上海的数据在一个区,最后我们查询下结果:
数据能被完整的查询出来。
备注:
1、这里指定分区的话,我们只需要通过option设置对应的分区参数即可。如果是单分区,那么就指定一个字段,最后的结果就是一个指定字段值的分区目录,如果是多分区的话,那么就指定多个字段,使用逗号进行分割,然后会生成一个子目录形式的的分区目录,示例如下:
执行完成之后,我们可以在hdfs看到子目录组成的新的分区目录,示例如下:
以上就是使用分区的形式向Hudi插入数据的案例,分别对应的代码文件是:
SimplePartitionInsertHudi.scala MultiPartitionInsertHudi.scala
在查询的时候能看到对应的分区信息,如下图:
最后按照惯例,附上本案例的源码,登陆后即可下载
还没有评论,来说两句吧...