在实际的spark应用程序开发中,我们对于dataframe进行各种操作之后会得到本业务的最终数据,此时那么我们肯定是需要把数据存储起来,那么DataFrame如何做存储呢?
第一种方式:存储到文件
在前面的案例里面,我们有演示过dataframe数据存储,当时也是存储文件,其实存储到文件时最简单的方式,总体使用存储的公式如下:
df.write.format(...).mode(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save(path)
在dataframe中我们存储文件可以使json格式,csv格式,orc格式和parquet文件格式,默认的是parquet文件格式,在编写代码进行数据存储的时候,我们直接套上面的公式即可。具体代码示例如下:
package org.example import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.functions.lit object StudentCal { def main(args: Array[String]): Unit = { val conf = new SparkConf() //这里的master模式,在提交的时候我们可以使用命令进行修改,但是在自动化部署提交的平台里面我们不能随意去添加各种命令,因此在这里设置即可。 conf.set("spark.master", "local[*]") //这里是配置spark应用程序执行的cpu核数 conf.set("spark.executor.cores", "2") //这里是配置spark应用程序执行的时候堆大小 conf.set("spark.executor.memory", "4g") //这里是配置spark应用程序执行的时候需要多少个Executor进程来执行整个任务 conf.set("spark.executor.instances", "6") //这里是设置spark应用程序数据本地化等待时长,这个意思代表数据本地化等待时长,spark driver对于application分配的task尽量在数据节点上, //这里补充一下,设置这个时间,也就是等待driver把task分配到数据所处的节点上,如果超过这个时长,则数据会被分配到就近的一个节点上。 conf.set("spark.locality.wait", "0") //这里是spark的序列化信息设置 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); val session = SparkSession.builder() //把上面的conf设置进来 .config(conf) .appName("demo") .getOrCreate() val sc = session.sparkContext val users = List(User("张三", 15, "男"), User("李四", 16, "女")) import session.implicits._ //先把数据转换成RDD val usersDF = sc.parallelize(users,1).toDF() usersDF.select().orderBy("age").write.format("json").mode(SaveMode.Overwrite).partitionBy("name") .save("hdfs://master:9000/user") } case class User(name:String,age:Long,sex:String){ } }
第二种方式:存储到数据库
在spark中,还提供一种类型,即直接把数据存储到mysql的数据库即可。这里主要用到的函数是jdbc,具体代码示例如下:
package org.example import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.functions.lit import java.util.Properties object StudentCal { def main(args: Array[String]): Unit = { val conf = new SparkConf() //这里的master模式,在提交的时候我们可以使用命令进行修改,但是在自动化部署提交的平台里面我们不能随意去添加各种命令,因此在这里设置即可。 conf.set("spark.master", "local[*]") //这里是配置spark应用程序执行的cpu核数 conf.set("spark.executor.cores", "2") //这里是配置spark应用程序执行的时候堆大小 conf.set("spark.executor.memory", "4g") //这里是配置spark应用程序执行的时候需要多少个Executor进程来执行整个任务 conf.set("spark.executor.instances", "6") //这里是设置spark应用程序数据本地化等待时长,这个意思代表数据本地化等待时长,spark driver对于application分配的task尽量在数据节点上, //这里补充一下,设置这个时间,也就是等待driver把task分配到数据所处的节点上,如果超过这个时长,则数据会被分配到就近的一个节点上。 conf.set("spark.locality.wait", "0") //这里是spark的序列化信息设置 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); val session = SparkSession.builder() //把上面的conf设置进来 .config(conf) .appName("demo") .getOrCreate() val sc = session.sparkContext val users = List(User("张三", 15, "男"), User("李四", 16, "女")) import session.implicits._ //先把数据转换成RDD val usersDF = sc.parallelize(users,1).toDF() usersDF.select().orderBy("age").show() // 下面创建一个 prop 变量用来保存 JDBC 连接参数 val prop = new Properties() prop.put("user", "test_user") // 表示用户名是 test_user prop.put("password", "123456") // 表示密码是 123456 prop.put("driver","com.mysql.jdbc.Driver") // 表示驱动程序是 //下面就可以连接数据库,采用 append 模式,表示追加记录到数据库 test_user 的 users 表中 val DB_URL= "jdbc:mysql://192.168.31.30:3306/test_user?useSSL=false" // 数据库名为 test_user usersDF.write .mode(SaveMode.Append) // .mode("append") .jdbc(DB_URL, "users", prop) } case class User(name:String,age:Long,sex:String){ } }
备注:
1、这里由于使用到了mysql驱动,因此在这里如果是maven项目的话需要添加如下依赖:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency>
如果是在spark-shell模式下,则需要把相关的jar包放到libs目录下。
备注:
1、以上就是我们经常使用到的dataframe主要的存储方式。
2、上面使用到了存储模式,在spark中使用dataframe数据存储,可以根据我们的需求来使用对应的存储模式即可,所有的存储模式如下:
序号 | 模式 | 说明 |
1 | append | 将DataFrame数据追加到已经存在于指定目标位置下的文件列表 |
2 | overwrite | 使用DataFrame数据完全覆盖已经存在于指定目标位置下的任何数据文件 |
3 | error errorIfExists | 这是默认模式 如果指定的目标位置存在,那么DataFrameWriter将抛出一个错误 |
4 | ignore | 如果指定的目标位置存在,则简单地什么都不做。换句话说,不写出DataFrame中的数据 |
3、在dataframe进行transformation的时候,对应的分区数会变大,默认是200,如果我们希望合并的话,可以提前对分区数进行合并再进行填写,例如:
usersDF.select().orderBy("age").coalesce(1).write.mode(SaveMode.Overwrite).save("hdfs://master:9000/user")
核心是利用coalesce函数提前把分区数变成1,再进行写文件。
还没有评论,来说两句吧...