在spark sql中,我们知道操作的数据都是DataFrame,因此从这篇文章开始,我们介绍几种创建DataFrame的方式,这篇文章介绍从RDD中创建DataFrame。
从RDD中创建dataframe一共会出现两种案例,但是在rdd中创建DataFrame的核心还是toDF函数。下面分别介绍下:
第一种案例
直接把数据先转换成元组,然后从元组中创建dataframe。下面我们演示下:
package org.example import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} object StudentCal { def main(args: Array[String]): Unit = { val conf = new SparkConf() //这里的master模式,在提交的时候我们可以使用命令进行修改,但是在自动化部署提交的平台里面我们不能随意去添加各种命令,因此在这里设置即可。 conf.set("spark.master", "local[*]") //这里是配置spark应用程序执行的cpu核数 conf.set("spark.executor.cores", "2") //这里是配置spark应用程序执行的时候堆大小 conf.set("spark.executor.memory", "4g") //这里是配置spark应用程序执行的时候需要多少个Executor进程来执行整个任务 conf.set("spark.executor.instances", "6") //这里是设置spark应用程序数据本地化等待时长,这个意思代表数据本地化等待时长,spark driver对于application分配的task尽量在数据节点上, //这里补充一下,设置这个时间,也就是等待driver把task分配到数据所处的节点上,如果超过这个时长,则数据会被分配到就近的一个节点上。 conf.set("spark.locality.wait", "0") //这里是spark的序列化信息设置 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); val session = SparkSession.builder() //把上面的conf设置进来 .config(conf) .appName("demo") .getOrCreate() val sc = session.sparkContext val users = Seq(("张三", 15, "男"), ("李四", 16, "女")) //先把数据转换成RDD val usersRDD = sc.parallelize(users,1) //这里需要在编写业务代码的方法里面引入包,在外层引入包会报错,如果不添加这行的话,就没有toDF方法 import session.implicits._ //从RDD中转换创建dataframe val usersDF = usersRDD.toDF("name", "age", "sex") //打印dataframe的schema信息。 usersDF.printSchema() //再打印下userDF的整个dataframe表信息 usersDF.show() } }
第二种案例
在编写spark程序的时候,我们有时候会直接把数据转换成对应的POJO,此时每一个pojo对象都包含有字段和类型。那么这个时候的rdd对象就是pojo数据集。此时我们把这种有明确定义字段属性和值的rdd转换成dataframe。下面来演示一下:
package org.example import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} object StudentCal { def main(args: Array[String]): Unit = { val conf = new SparkConf() //这里的master模式,在提交的时候我们可以使用命令进行修改,但是在自动化部署提交的平台里面我们不能随意去添加各种命令,因此在这里设置即可。 conf.set("spark.master", "local[*]") //这里是配置spark应用程序执行的cpu核数 conf.set("spark.executor.cores", "2") //这里是配置spark应用程序执行的时候堆大小 conf.set("spark.executor.memory", "4g") //这里是配置spark应用程序执行的时候需要多少个Executor进程来执行整个任务 conf.set("spark.executor.instances", "6") //这里是设置spark应用程序数据本地化等待时长,这个意思代表数据本地化等待时长,spark driver对于application分配的task尽量在数据节点上, //这里补充一下,设置这个时间,也就是等待driver把task分配到数据所处的节点上,如果超过这个时长,则数据会被分配到就近的一个节点上。 conf.set("spark.locality.wait", "0") //这里是spark的序列化信息设置 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); val session = SparkSession.builder() //把上面的conf设置进来 .config(conf) .appName("demo") .getOrCreate() val sc = session.sparkContext val users = List(User("张三", 15, "男"), User("李四", 16, "女")) //先把数据转换成RDD val usersRDD = sc.parallelize(users,1) //这里需要在编写业务代码的方法里面引入包,在外层引入包会报错,如果不添加这行的话,就没有toDF方法 import session.implicits._ //从RDD中转换创建dataframe val usersDF = usersRDD.toDF() //打印dataframe的schema信息。 usersDF.printSchema() //再打印下userDF的整个dataframe表信息 usersDF.show() } case class User(name:String,age:Long,sex:String){ } }
备注:
1、相比于第一种方式,我们再这里传递进来的数据是每一个pojo对象,因此需要在里面声明下case class
2、在第二种里面,我们可以看到toDF方法里面不需要显示的指明schema。
还没有评论,来说两句吧...