在前面我们大部分篇幅都介绍的是dataframe,这篇文章我们介绍下dataset。dataset是spark1.6开始提供的函数编程API,在spark2.x之后,spark把dataset和dataframe给融合了,因此在spark2.x的版本之后,我们可以按照dataframe的操作习惯操作数据(使用sql操作的方式),也可以使用dataset的操作习惯操作数据(使用rdd对象操作的方式)。两种形式都可以应用在spark sql中,主要还是看个人习惯。
创建dataset的方式
第一种方式:使用 DataFrame 类的 as(符号)函数将 DataFrame 转换为 Dataset,示例如下:
package org.example import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.functions.lit import java.util.Properties object Demo { case class UserPoJo(name: String, age: Long, sex: String) def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).appName("demo").master("local[*]").getOrCreate(); var user = Seq(("张三", 15, "男"),("李四", 16, "女")) import session.implicits._ val users = user.toDF("name","age","sex").as[UserPoJo] users.printSchema() } }
备注:
1、这种方式主要是直接从现有的dataframe上面把数据转换成dataset
第二种方式:使用 SparkSession.createDataset()函数从本地集合对象中创建 Dataset,示例代码如下:
package org.example import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.functions.lit import java.util.Properties object Demo { case class UserPoJo(name: String, age: Long, sex: String) def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).appName("demo").master("local[*]").getOrCreate(); var user = Seq(UserPoJo("张三", 15, "男"),UserPoJo("李四", 16, "女")) import session.implicits._ val users = session.createDataset(user) users.printSchema() } }
备注:
1、这里需要集合已经指明对应的类和字段信息
第三种方式:使用 toDS 隐式转换程序,示例代码如下:
package org.example import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.functions.lit import java.util.Properties object Demo { case class UserPoJo(name: String, age: Long, sex: String) def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).appName("demo").master("local[*]").getOrCreate(); var user = Seq(UserPoJo("张三", 15, "男"),UserPoJo("李四", 16, "女")) import session.implicits._ val users = user.toDS() users.printSchema() } }
备注:
1、针对dataset的3种转换方式,第一种从dataframe转dataset是比较常用的,因为我们很可能会同时兼顾dataframe和dataset的时候,dataframe可以让我们很方便的进行sql操作,所以使用频率高,因此dataframe转dataset的方式也是最常用的。
2、dataset我们在前面已经介绍过了,这是类似于RDD的操作,因此使用dataset的话,我们需要一个case类,这个是必须要使用到的。
操作Dataset
前面我们已经把dataset创建了,那么接下来就是使用问题,在dataset上我们提过就类似于操作RDD一样,因此这里操作Dataset的transformation和action 函数api有如下:
transformation 函数API:
序号 | 函数 | 说明 |
1 | map | 返回将输入函数应用到每一个元素之后的新DataSet |
2 | filter | 返回一个新的Daaset,他包含输入函数为true的元素 |
3 | groupByKey | 返回一个KeyValueGroupedDataset,其数据按给定的key函数分组 |
action 函数API
序号 | 函数 | 说明 |
1 | show(n) | 显示Dataset中前n行数据 |
2 | take(n) | 以数组形式返回Dataset中前n个对象 |
3 | count | 返回Dataset中行数 |
备注:
1、使用dataset的方式,由于有class的类限制,因此在编程的时候,在编写代码的时候就能发现代码的错误,可以很方便的及时修改,使用dataframe的话,常常在编译后或者运行的时候才能发现里面的错误。
2、所谓萝卜青菜,各有所爱,dataframe和dataset在spark中都可以使用,根据自己的习惯来使用即可。
还没有评论,来说两句吧...