RDD(Resilient Distributed Dataset)译作弹性分布式数据集,是Spark中最常用的数据抽象,是一个只可读、可分区、可并行计算的数据集合。RDD允许将工作集缓存在内存中进行复用,大大地提升了查询速度。
RDD简介
- MapReduce 在面对日益复杂的业务逻辑时已经表现出严重的不足:1)维护成本高昂,每一次数据处理都需要编写复杂的Map和Reduce步骤,中间某一步骤出错就要重试以处理异常;2)难以上手,造成处理性能低;
- 因此人们想出用有向无环图(DAG)来抽象表达复杂的数据处理逻辑,各个数据处理步骤表示成图中的节点与边依赖关系,形成数据流的抽象表示,而把复杂的性能优化提交给后台自动处理;
- RDD也即分布式对象集合,是一个只读的分区记录集合,每个RDD可以划分成多个分区,每个分区就是数据集的一部分,同时不同分区可以存储在集群中不同的节点上,从而利用集群节点优势进行并行计算;
- RDD提供了丰富的操作以支持常见的数据处理,即“转换”(Transformation)和“行动”(Action)两种类型操作
- 转换操作指定RDD的依赖关系,通过接受RDD并返回RDD;
- 行动操作执行计算并指定输出的形式,通过接受RDD返回输出值或结果(非RDD);
- 通过Spark的API可以使用不同的语言调用RDD的操作,常见过程流程如下:
- 从各种数据源创建RDD;
- 对RDD指定一系列的转换操作;
- 最后调用行动操作,输出结果或写入外部数据源;
- RDD操作的惰性机制,是指在RDD执行操作时,只有触发行动操作才会做真正的计算,而在行动前的所有转换操作都只是记录下相互的依赖关系,形成数据流的管道化(pipeline),而不会做真正的计算;
RDD的创建
通过数据集合转化为RDD
sc = SparkContext("local", "create_rdd") |
从HDFS数据源或本地文件创建
从一篇CNN新闻报道 Uber and Lyft may look the same, but their visions are not 中抽取新闻主体并放到文件news_sep.txt
中,一段话为一行。
At first blush, it can be hard to tell Uber and ... |
通过指定文件路径读取为RDD:
sc = SparkContext("local", "create_rdd") |
从其他数据库读取数据创建
(待更新)
使用数据流创建
结合流数据处理技术,如Spark Streaming、Kafka以及flume等,通过接收实时的输入数据流创建RDD。
一般RDD的转换操作(Transformation)
官方API文档详细列出转换操作函数,下面简单介绍RDD常用的转换操作:
flatMap()
flatMap(func)
:对于每一个输入元素,通过指定函数映射到0或多个元素,输出新的RDD。# 将上述新闻文本的每一行每一段话根据空格进行分词
rdd = rdd.flatMap(lambda line: line.split(" "))
map()
map(func)
:对于每一个输入元素,通过执行指定函数映射到唯一输出(1v1关系),产生新的RDD# 对上述分词结果的RDD中每一个单词去除标点符号,并转化为小写
rdd = rdd.map(lambda w: w.strip("\"").strip(",").strip(".").lower())
filter()
filter(func)
:是对RDD元素进行过滤,把经过指定函数后返回值为true的元素组成一个新的RDD。# 对上述结果筛选掉一些常见的介词,如
rm_words = ["and","in","at","a","an","is","are","may","that","this","to","as","with","of","can","be"]
rdd = rdd.filter(lambda w: w not in rm_words)
distinct()
distinct([numPartitions])
:对数据进行去重,返回一个新的RDD,numPartitions参数用于设置任务并行数。# 输出新闻中所有提及过的单词
rdd = rdd.distinct(2)
sample()
sample(withReplacement,fraction,seed=None)
:对数据进行采样,withReplacement参数表示是否放回抽样;fraction参数表示抽样比例;seed表示随机种子。# 随机抽取新闻提及部分单词
rdd1 = rdd.sample(False, 0.2, 2019)
union()
union(otherRDD)
:可以与另一个RDD数据集合并,返回一个新的RDD。rdd1 = sc.parallelize(["union","other","rdd"])
rdd = rdd.union(rdd1)
intersection()
intersection(otherRDD)
:可以与另一个RDD数据集进行求交集计算,返回新的RDD。rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([3,4,5,6,7])
rdd3 = rdd1.intersection(rdd2)
# 最后rdd3的结果为 [3,4,5]
subtract()
subtract(otherRDD,[numPartitions])
:是对otherRDD进行减法操作,将原始RDD的元素减去新输入RDD的元素,将差值返回新RDD。rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([2,4])
rdd3 = rdd1.subtract(rdd2)
# 最后rdd3的结果为 [1,3,5]
cartesian()
cartesian(otherRDD)
:可以对两个RDD数据集U,V求笛卡尔积,返回一个新的RDD数据集,其中每个元素为(u,v)。rdd1 = sc.parallelize([1,2])
rdd2 = sc.parallelize([3,4])
rdd3 = rdd1.cartesian(rdd2)
# 最后rdd3的结果为 [(1,3),(1,4),(2,3),(2,4)]
键值对RDD的转换操作(Transformation)
map()
map(func)
:操作可以将一般RDD转换为键值对RDD,元素变成(K,V)。# 对上述新闻分词结果的RDD中每一个单词转化为(w,1)键值对
rdd = rdd.map(lambda w: (w,1))
reduceByKey()
reduceByKey(func,[numPartitions])
:可以对具有相同键的值进行合并,返回一个新的键值对RDD,numPartitions用于设置任务并行数。# 对新闻出现过的单词进行词频统计
rdd = rdd.reduceByKey(lambda w1,w2: w1+w2)
groupByKey()
groupByKey([numPartitions])
:可以对具有相同键的值进行分组,返回一个元素为(K,[Iterable])的键值对RDD,numPartitions用于指定任务并行数,默认为8。# 对新闻出现过的单词进行词频统计
rdd = rdd.groupByKey()
aggregateByKey()
aggregateByKey(zeroValue,seqFunc,combFunc,[numPartitions])
:可以对具有相同键的值进行聚合,把(K,V)键值对RDD转换为新的(K,U)键值对RDD,其中U由给定的combFunc和中立零值zeroValue聚合而成,U可以有与V不一致的形式;
- zeroValue 可以是0如果聚合的目的是求和,可以是List如果目的是对值进行统合,可以是Set如果目的是聚合唯一值;
- seqFunc: (U,V) => U 对分区内的元素进行聚合(操作发生在每个分区内部);
- combFunc: (U,U) => U 对不同分区的聚合结果做进一步的聚合(操作发生在全部分区的聚合结果间);
- numPartitions 用于设置任务并行数;
为什么使用两个函数?见Apache Spark aggregateByKey Example
# 同样对新闻出现过的单词进行词频统计 |
combineByKey()
combineByKey(createCombiner,mergeValue,mergeCombiners,[numPartitions])
:可以对具有相同键的值按照自定义函数的逻辑进行聚合,把(K,V)键值对RDD转换为新的(K,U)键值对RDD,U可以有与V不一致的形式;
- createCombiner: V => C 创建新的聚合器方便后续步骤操作,对原始值进行附加操作并返回,跟flatMap()类似;
- mergeValue: (C,V) => C 对分区内的元素进行聚合(操作发生在每个分区内部);
- mergeCombiners: (C,C) => C 对不同分区的聚合结果做进一步的聚合(操作发生在全部分区的聚合结果间);
- numPartitions 用于设置任务并行数;
groupByKey(), groupByKey(), aggregateByKey() 等都不同程度上依赖于 combineByKey() 操作
# 对词频统计的结果求频数均值,例如 |
sortByKey()
sortByKey(ascending,[numPartitions])
:可以对键值对RDD按照键进行排序操作,其中K需要实现Ordered方法。
- ascending 决定RDD中的元素按升序还是降序排序,默认是True升序;
- numPartitions 用于设置任务并行数;
# 对词频统计结果按降序排序(先把K-V值互换) |
keys()与values()
keys(),values()
:分别把键值对RDD的key和value返回形成一个新的RDD。# 获得上一步排序结果的key或value
rdd = rdd.keys()
rdd = rdd.values()
mapValues()
mapValues(func)
:可以对键值对RDD每个元素的value加载到预定义函数进行操作,而不改变key。# 例如对词频统计结果值都减去1
rdd = rdd.mapValues(lambda v: v - 1)
join()
join(otherRDD,[numPartitions])
:与关系数据库查询一样,表示内连接,给定两个键值对RDD如(K,V1)和(K,V2),对于两个数据集都存在的key才对其输出,得到一个新的RDD,元素为(K,(V1,V2))。除此以外,还包括其他情形:
- fullOuterJoin(otherRDD,[numPartitions]) 全连接
- leftOuterJoin(otherRDD,[numPartitions]) 左外连接
- rightOuterJoin(otherRDD,[numPartitions]) 右外连接
rdd1 = sc.parallelize([('spark',1),('hadoop',1)]) |
RDD的行动操作(Action)
官方API文档详细列出行动操作函数,下面简单介绍常用的行动操作:
count()
count()
:返回RDD数据集中元素的个数。
collect()
collect()
:以数组的形式返回RDD数据集的所有元素。
first()
first()
:返回RDD数据集的第一个元素。
top()
top(num,key=None)
:以数组的形式返回RDD数据集的前num个元素,默认按降序,或者通过key函数指定。rdd = sc.parallelize([5,24,3,12,46])
print(rdd.takeOrdered(3))
# 输出 [46,24,12]
rdd = sc.parallelize([5,24,3,12,46])
print(rdd.takeOrdered(3, key=str))
# 输出 [5,46,3]
take()
take(num)
:以数组的形式返回RDD数据集的前num个元素。
takeOrdered()
takeOrdered(num,key=None)
:以数组的形式返回RDD数据集的前num个元素,默认按升序排序,或者通过key函数指定。rdd = sc.parallelize([5,2,3,1,4])
print(rdd.takeOrdered(3))
# 输出 [1,2,3]
rdd = sc.parallelize([5,2,3,1,4])
print(rdd.takeOrdered(3, key=lambda x: -x))
# 输出 [5,4,3]
takeSample()
takeSample(withReplacement,num,seed=None)
:对RDD数据集进行采样,并以数组的形式返回,withReplacement参数表示是否放回抽样;num参数表示抽样个数;seed表示随机种子。rdd = sc.parallelize([5,2,3,1,4])
print(rdd.takeSample(False,3,seed=2019))
lookup()
lookup(key)
:以数组的形式返回键值对RDD中键为key的所有值,如果RDD数据集经过特定转换操作按照key进行了分区,那么此行动操作效率会很高。
foreach()
foreach(func)
:将RDD数据集中的每个元素加载到指定函数进行操作,无返回值。# 对转换结果逐个输出
rdd.foreach(print)
reduce()
reduce(func)
:通过指定函数(如求和、统计)对RDD数据集元素进行聚合。# 对所有元素进行求和
sum = rdd.reduce(lambda v1,v2: v1 + v2)
aggregate()
aggregate(zeroValue,seqOp,combOp)
:对RDD数据集的元素进行聚合,不要求返回值类型与RDD类型一致;
- zeroValue: U 给定初始值,形式与最终返回值U一致;
- seqOp: (U,V) => U 对分区内的元素进行聚合(操作发生在每个分区内部);
- combOp: (U,U) => U 对不同分区的聚合结果做进一步的聚合(操作发生在全部分区的聚合结果间);
# 求一个数组元素的均值 |
countByKey()
countByKey()
:以字典的形式返回键值对RDD数据集中每个键的元素的统计数,即(K,count)rdd = sc.parallelize([("rdd",1),("rdd",2),("spark",2)])
res = rdd.countByKey()
# 输出结果 {'rdd': 2, 'spark': 1}
countByValue()
countByValue()
:以字典的形式返回RDD数据集中每个元素的统计数,即(V,count)rdd = sc.parallelize([2,2,3,1,1])
res = rdd.countByValue()
# 输出结果 {2: 2, 3: 1, 1: 2}
saveAsTextFile()
saveAsTextFile(path, compressionCodecClass=None)
:把RDD数据集保存为文本文件,并可以指定是否压缩。f = NamedTemporaryFile(delete=True)
f.close()
codec = "org.apache.hadoop.io.compress.GzipCodec"
sc.parallelize(['spark', 'rdd']).saveAsTextFile(f.name, codec)
RDD的持久化
- 由于RDD采用惰性机制,每次遇到行动操作都会根据DAG的依赖关系从头开始执行计算,如果遇到迭代计算,需要重复调用中间数据,会造成极大的计算开销;
- 可以通过持久化操作来解决以上的问题,用
persist()
方法对需要重复使用的RDD标记为持久化,当遇到第一次行动操作后,会把计算结果持久化,保存在计算节点的内存备用;
persist()
persist(storageLevel)
:storageLevel参数表示持久化级别,通过使用不同的级别可以把数据缓存到不同的位置,详见 RDD Persistence;其中使用cache()
函数会调用默认的持久化方法,即persist(MEMORY_ONLY)
将RDD作为反序列化的对象存储在JVM中;而unpersist()
方法则可以把持久化的RDD从缓存中删除。rdd = sc.parallelize(['spark','rdd','hadoop'])
rdd.cache()
print(rdd.take(1))
# 第一次行动操作,触发完整计算,同时把rdd放入缓存
# 输出 ['spark']
print(rdd.collect())
# 第二次行动操作,此时直接重复利用上述的缓存rdd
# 输出 ['spark','rdd','hadoop']
rdd.unpersist()