在前面我们介绍RDD的时候大多都是一些单数值的RDD,在这里我们再介绍下Key-Value pair类型的RDD,举个例子:
package org.example import com.alibaba.fastjson.JSON import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount { def main(args: Array[String]): Unit = { val session = SparkSession.builder().config(new SparkConf()).master("local[*]").appName("SparkWordCount").getOrCreate(); val sc = session.sparkContext val sourceData = sc.textFile("C:\\Users\\Administrator\\Desktop\\fsdownload\\student.txt") val newDataRdd = sourceData.flatMap(line => line.split(",")).map(word => (word,1) ) } }
在这个示例里面,这里的newDataRdd就是一种key-value pair的RDD。这一类的RDD包含有key/value,非单值的RDD,因此在spark中,基于这种RDD,spark提供了一些特殊的tansformation api。操作这类RDD的tansformation api函数都是以 xxxByKey这种方式存在的,下面详细列举下:
序号 | 函数 | 说明 |
1 | keys | 返回所有的key |
2 | values | 返回所有的value |
3 | mapValues(func) | 将函数应用到pair RDD中的每个元素上,只改变value,不改变key |
4 | flatMapValues(func) | 传入(K,U)对,传出(K,TraversableOnce[U])通过一个flatMap函数传递key-value pair RDD中的每个value,而不改变key值,这也保留了原始的RDD分区 |
5 | sortByKey([ascending],[numPartitions]) | 这是一个transformation操作。按照key进行排序,默认是升序。当对(k,v)对的数据集(其中K实现Ordered)调用时,返回一个(k,v)对的数据集(按键升序或者降序排序)按布尔类型的ascending参数中指定的顺序。 |
6 | groupByKey([numPartitions]) | 这是一个tansformation操作。他将RDD中每个key的值分组成一个序列。当对(K,V)对的数据集调用时,返回(K,Iterable<V>)对的数据集。 |
7 | reduceByKey(func,[numPartitions]) | 这是一个transformation操作,他按照key来合并值(相同key的值进行合并)。当对一个(K,V)对的数据集调用时,返回一个(K,V)对的数据集,其中每个key的值使用戈丁的reduce函数func进行聚合,该reduce函数的类型必须是()V,V)=> V |
8 | foldByKey(zeroValue, [numPartitions])(func),foldByKey(zeroValue, [partitioner])(func) | 这是一个tansformation操作。他使用一个关联函数和初始值来合并每个键的值,这个初始值可以任意次数地添加到结果中,并且不能改变结果 |
9 | aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | 这是一个 transformation 操作。当对一个(K, V)对的数据集调用时,返回一个(K, U)对的数据集,其 中每个 key 的值使用给定的 combine 函数和一个中性的“零”值进行聚合。允许与输入值类型不同的聚 合值类型,同时避免不必要的分配。 |
10 | combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions, mapSideCombine) | 这是一个 transformation 操作。合并相同 key 的值,使用不同的结果类型,参数含义是: 1、createCombiner:在第一次遇到 Key 时创建组合器函数,将 RDD 数据集中的 V 类型 value 值转换C 类型值(V => C) 2、mergeValue:合并值函数,再次遇到相同的 Key 时,将 createCombiner 的 C 类型值与这次传入的 V 类型值合并成一个 C 类型值(C,V)=>C 3、mergeCombiners:合并组合器函数,将 C 类型值两两合并成一个 C 类型值 4、partitioner:使用已有的或自定义的分区函数,默认是 HashPartitioner 5、mapSideCombine:是否在 map 端进行 Combine 操作,默认为 true |
11 | subtractByKey | 这是一个 transformation 操作。它返回这样一个 RDD:其中的 pair 对的键 key 只在当前 RDD 中有 而在 other RDD 中没有。它有三个重载的方法: subtractByKey[W](other) subtractByKey[W](other, numPartitions) subtractByKey[W](other, partitioner) |
12 | sampleByKey(withReplacement, fractions, seed) | 这是一个 transformation 操作。返回按 key 采样的 RDD 的一个子集(通过分层采样)。 |
13 | sampleByKeyExact(withReplacement, fractions, seed) | 这是一个 transformation 操作。返回按 key 采样的 RDD 的一个子集(通过分层采样),对于每一层(具 有相同 key 的一组对),包含精确的 math.ceil(numItems * samplingRate)个元素。 |
14 | 连接操作 | 1、join(otherDataset, [numPartitions]):当对类型(K, V)和(K, W)的数据集调用时,返回(K, (V, W))对的数据集,其中包含每个 key 的所有元素对。通过 leftOuterJoin、rightOuterJoin 和 fullOuterJoin支持外连接。 2、leftOuterJoin:左外连接。 3、rightOuterJoin:右外连接。 4、fullOuterJoin:全外连接。 |
15 | cogroup(otherDataset, [numPartitions]) | 这是一个 transformation 操作。当对类型(K, V)和(K, W)的数据集调用时,返回一个(K,(Iterable<V>, Iterable<W>))元组的数据集。这个操作也称为 groupWith。 |
16 | groupWith[W](other):cogroup 的别名 | 这是一个 transformation 操作。groupWith[W1, W2](other1, other2):cogroup 的别名。当对类型(K, V)、 (K, W1)和(K, W2)的数据集调用时,返回一个(K, (Iterable<V>, Iterable<W1>, Iterable<W2>))元组 的数据集。 groupWith[W1, W2, W3](other1, other2, other3):cogroup 的别名。当对类型(K, V)、(K, W1)、(K, W2) 和(K, W3)的数据集调用时,返回一个(K, (Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>)) 元组的数据集。 |
17 | partitionBy(partitioner) | 这是一个 transformation 操作。返回使用指定分区器分区的 RDD 的一个副本 |
18 | repartitionAndSortWithinPartitions(partitioner) | 根据给定的分区程序对 RDD 进行重新分区,并在每个结果分区中根据键对记录进行排序。这比调 用 repartition 然后在每个分区内排序更有效,因为它可以将排序下推到 shuffle 机制中。 |
当然,有transformation操作,那么就会有对应的action操作,下面列举下action相关的函数:
序号 | 函数 | 说明 |
1 | countByKey() | 这是一个 action 操作。计算每个 key 的元素数量,将结果收集到一个本地 Map 中(Map[K, Long]) |
2 | collectAsMap() | 这是一个 action 操作。将这个 RDD 中的键值对作为 Map 返回给 master。这不会返回一个 multimap(所 以如果一个键有多个值,每个键在返回的 map 中只保留一个值)。这个方法只应该在结果数据很小的情 况下使用,因为所有的数据都被加载到驱动程序的内存中。 |
还没有评论,来说两句吧...