在前面我们介绍了,在进行接口编程的时候我们主要使用的就是RDD,因此RDD可以看作是spark接口编程的基石。这篇文章我们介绍下创建RDD的几种方式。
一、从集合中创建RDD
1.1)从list中创建RDD
package org.example import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount { def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate(); val sc = session.sparkContext var list = List(1,2,3,4,5,6,7) //从list创建名为data的rdd val data = sc.parallelize(list) data.collect().foreach(println) } }
1.2)从数组中创建RDD
package org.example import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount { def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate(); val sc = session.sparkContext var list = Array(1,2,3,4,5,6,7) //从list创建名为data的rdd val data = sc.parallelize(list) data.collect().foreach(println) } }
二、从存储系统中创建RDD
这里的存储系统中创建RDD也就是给一个文件的路径,spark会通过这个文件的路径读取文件的内容,然后创建rdd。这个文件路径可以使本地的路径,也可以是hdfs的路径,还可以是Cassandra和Amazon S3等等文件系统的数据,常用的我们主要是hdfs的路径。示例如下:
package org.example import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount { def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate(); val sc = session.sparkContext //从list创建名为data的rdd val data = sc.textFile("hdfs://master:59000/source/users.txt") data.collect().foreach(println) } }
三、从旧的RDD转换成新的RDD
在之前提到过每一个RDD是只读的,不能被修改,如果想要修改的话,则需要通过转换操作重新生成一个新的RDD对象,下面我们就演示一下。
package org.example import com.alibaba.fastjson.JSON import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount { def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate(); val sc = session.sparkContext //从文件中读取每一行json数据 val data = sc.textFile("C:\\Users\\Administrator\\Desktop\\fsdownload\\users.txt") //把文件中的每一行json数据转换成新的rdd对象 val newData = data.map( line => JSON.parseObject(line)) newData.collect().foreach(println) } }
以上就是我们介绍的几种创建RDD的主要方式。
备注:
1、在使用集合进行RDD创建的时候,系统会默认为我们自动设置分区的值,我们也可以自己设置对应的分区值,例如:
var list = List(1,2,3,4,5) val data = sc.parallelize(list,2)
在第二行的第二个参数2代表的就是分两个分区。
还没有评论,来说两句吧...