从spark2.0开始,spark的应用程序入口变成了SparkSession,在Spark2.0之前,spark的应用程序入口是SparkContext。目前spark已经是3.x了,因此我们不再介绍SparkContext了,主要介绍SparkSession。
SparkSession主要是Spark应用程序与RDD、DataFrame和DataSet相互使用的入口点。他的源码类是:
org.apache.spark.sql.SparkSession
这里的SparkSession所处的包是在sparksql包里面,maven的引入如下:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.0</version> <scope>provided</scope> </dependency>
所以我们在使用接口变成的时候,在spark2.x的时候,尽量使用SparkSession,示例如下:
package org.example import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount { def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate(); val sc = session.sparkContext //读取本地的文件 var textFile = sc.textFile("hdfs://43.153.211.182:59000/sources/aaa.txt") var counts = textFile.flatMap(line => line.split(" ")) //这里主要是把英文单词按照空格进行切分,属于一对多的action,即一条英文可以分成多个单词 .map(word => (word, 1)) //这里是把每一个单词都计数为1 .reduceByKey(_ + _) //最后根据单词进行聚合,然后把单词的计数进行相加。 //打印出最后的统计结果。 counts.collect().foreach(println) counts.saveAsTextFile("hdfs://43.153.211.182:59000/sparkwordcount") } }
上面的demo就是改编自之前的wordcount程序。
备注:
1、虽然在spark2.0之后出现了SparkSession,但是我们在2.0之后的版本里面还是可以使用SparkContext,因为SparkContext里面还是有很多特性在实际接口编程中使用到。
2、这里提到的SparkSession取代SparkContext主要是作为入口,相当于以前使用SparkContext里面配置的SparkConfig等信息,现在使用SparkSession作为入口,因此SparkConfig等信息也可以在SparkSession里面使用。
还没有评论,来说两句吧...