在spark sql中,我们知道操作的数据都是DataFrame,因此从这篇文章开始,我们介绍几种创建DataFrame的方式,这篇是第一篇,介绍从元组中创建DataFrame。下面来演示一下:
package org.example
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object StudentCal {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
//这里的master模式,在提交的时候我们可以使用命令进行修改,但是在自动化部署提交的平台里面我们不能随意去添加各种命令,因此在这里设置即可。
conf.set("spark.master", "local[*]")
//这里是配置spark应用程序执行的cpu核数
conf.set("spark.executor.cores", "2")
//这里是配置spark应用程序执行的时候堆大小
conf.set("spark.executor.memory", "4g")
//这里是配置spark应用程序执行的时候需要多少个Executor进程来执行整个任务
conf.set("spark.executor.instances", "6")
//这里是设置spark应用程序数据本地化等待时长,这个意思代表数据本地化等待时长,spark driver对于application分配的task尽量在数据节点上,
//这里补充一下,设置这个时间,也就是等待driver把task分配到数据所处的节点上,如果超过这个时长,则数据会被分配到就近的一个节点上。
conf.set("spark.locality.wait", "0")
//这里是spark的序列化信息设置
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
val session = SparkSession.builder()
//把上面的conf设置进来
.config(conf)
.appName("demo")
.getOrCreate()
var users = Seq(("张三", 15, "男"),
("李四", 16, "女"))
//这里需要在编写业务代码的方法里面引入包,在外层引入包会报错,如果不添加这行的话,就没有toDF方法
import session.implicits._
//把上面的元组信息对应转换创建dataframe
val usersDF = users.toDF("name", "age", "sex")
//打印dataframe的schema信息。
usersDF.printSchema()
//再打印下userDF的整个dataframe表信息
usersDF.show()
}
}备注:
1、在上面的代码案例里面我们编写有相关的注释,看注释能明白里面的内容。
2、在对元组进行转换成dataframe的时候,需要在代码的方法里面引入sesssion的implictis包下面的所有类信息,这里的引入放在最顶上进行引入的时候会报错。
3、在对元组进行转换成dataframe的时候,核心方法是toDF这个方法。









还没有评论,来说两句吧...