Spark笔记-玩转RDD操作

RDD(Resilient Distributed Dataset)译作弹性分布式数据集,是Spark中最常用的数据抽象,是一个只可读、可分区、可并行计算的数据集合。RDD允许将工作集缓存在内存中进行复用,大大地提升了查询速度。

RDD简介

  • MapReduce 在面对日益复杂的业务逻辑时已经表现出严重的不足:1)维护成本高昂,每一次数据处理都需要编写复杂的Map和Reduce步骤,中间某一步骤出错就要重试以处理异常;2)难以上手,造成处理性能低;
  • 因此人们想出用有向无环图(DAG)来抽象表达复杂的数据处理逻辑,各个数据处理步骤表示成图中的节点与边依赖关系,形成数据流的抽象表示,而把复杂的性能优化提交给后台自动处理;
  • RDD也即分布式对象集合,是一个只读的分区记录集合,每个RDD可以划分成多个分区,每个分区就是数据集的一部分,同时不同分区可以存储在集群中不同的节点上,从而利用集群节点优势进行并行计算;
  • RDD提供了丰富的操作以支持常见的数据处理,即“转换”(Transformation)和“行动”(Action)两种类型操作
    • 转换操作指定RDD的依赖关系,通过接受RDD并返回RDD;
    • 行动操作执行计算并指定输出的形式,通过接受RDD返回输出值或结果(非RDD);
  • 通过Spark的API可以使用不同的语言调用RDD的操作,常见过程流程如下:
    • 从各种数据源创建RDD;
    • 对RDD指定一系列的转换操作;
    • 最后调用行动操作,输出结果或写入外部数据源;
  • RDD操作的惰性机制,是指在RDD执行操作时,只有触发行动操作才会做真正的计算,而在行动前的所有转换操作都只是记录下相互的依赖关系,形成数据流的管道化(pipeline),而不会做真正的计算;

Spark RDD 的执行过程

RDD的创建

通过数据集合转化为RDD

sc = SparkContext("local", "create_rdd")
ints = [1,3,4,5,6,4,3,2]
rdd = sc.parallelize(ints)
strings = ['spark','hadoop','rdd']
rdd = sc.parallelize(strings)

从HDFS数据源或本地文件创建

从一篇CNN新闻报道 Uber and Lyft may look the same, but their visions are not 中抽取新闻主体并放到文件news_sep.txt中,一段话为一行。

At first blush, it can be hard to tell Uber and ...
But look under the hood and there's a clear difference ...
Uber filed paperwork on Thursday for what is expected ...
...

通过指定文件路径读取为RDD:

sc = SparkContext("local", "create_rdd")
# 从本地文件系统地址加载
rdd = sc.textFile("file:///path/to/news_sep.txt")
# 从分布式文件系统HDFS地址加载,下面三种方式等价
rdd = sc.textFile("hdfs://localhost:9000/path/to/news_sep.txt")
rdd = sc.textFile("/path/to/news_sep.txt")
rdd = sc.textFile("news_sep.txt")

从其他数据库读取数据创建

(待更新)

使用数据流创建

结合流数据处理技术,如Spark Streaming、Kafka以及flume等,通过接收实时的输入数据流创建RDD。

一般RDD的转换操作(Transformation)

官方API文档详细列出转换操作函数,下面简单介绍RDD常用的转换操作:

flatMap()

flatMap(func):对于每一个输入元素,通过指定函数映射到0或多个元素,输出新的RDD。

# 将上述新闻文本的每一行每一段话根据空格进行分词
rdd = rdd.flatMap(lambda line: line.split(" "))

map()

map(func):对于每一个输入元素,通过执行指定函数映射到唯一输出(1v1关系),产生新的RDD

# 对上述分词结果的RDD中每一个单词去除标点符号,并转化为小写
rdd = rdd.map(lambda w: w.strip("\"").strip(",").strip(".").lower())

filter()

filter(func):是对RDD元素进行过滤,把经过指定函数后返回值为true的元素组成一个新的RDD。

# 对上述结果筛选掉一些常见的介词,如
rm_words = ["and","in","at","a","an","is","are","may","that","this","to","as","with","of","can","be"]
rdd = rdd.filter(lambda w: w not in rm_words)

distinct()

distinct([numPartitions]):对数据进行去重,返回一个新的RDD,numPartitions参数用于设置任务并行数。

# 输出新闻中所有提及过的单词
rdd = rdd.distinct(2)

sample()

sample(withReplacement,fraction,seed=None):对数据进行采样,withReplacement参数表示是否放回抽样;fraction参数表示抽样比例;seed表示随机种子。

# 随机抽取新闻提及部分单词
rdd1 = rdd.sample(False, 0.2, 2019)

union()

union(otherRDD):可以与另一个RDD数据集合并,返回一个新的RDD。

rdd1 = sc.parallelize(["union","other","rdd"])
rdd = rdd.union(rdd1)

intersection()

intersection(otherRDD):可以与另一个RDD数据集进行求交集计算,返回新的RDD。

rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([3,4,5,6,7])
rdd3 = rdd1.intersection(rdd2)
# 最后rdd3的结果为 [3,4,5]

subtract()

subtract(otherRDD,[numPartitions]):是对otherRDD进行减法操作,将原始RDD的元素减去新输入RDD的元素,将差值返回新RDD。

rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([2,4])
rdd3 = rdd1.subtract(rdd2)
# 最后rdd3的结果为 [1,3,5]

cartesian()

cartesian(otherRDD):可以对两个RDD数据集U,V求笛卡尔积,返回一个新的RDD数据集,其中每个元素为(u,v)。

rdd1 = sc.parallelize([1,2])
rdd2 = sc.parallelize([3,4])
rdd3 = rdd1.cartesian(rdd2)
# 最后rdd3的结果为 [(1,3),(1,4),(2,3),(2,4)]

键值对RDD的转换操作(Transformation)

map()

map(func):操作可以将一般RDD转换为键值对RDD,元素变成(K,V)。

# 对上述新闻分词结果的RDD中每一个单词转化为(w,1)键值对
rdd = rdd.map(lambda w: (w,1))

reduceByKey()

reduceByKey(func,[numPartitions]):可以对具有相同键的值进行合并,返回一个新的键值对RDD,numPartitions用于设置任务并行数。

# 对新闻出现过的单词进行词频统计
rdd = rdd.reduceByKey(lambda w1,w2: w1+w2)

groupByKey()

groupByKey([numPartitions]):可以对具有相同键的值进行分组,返回一个元素为(K,[Iterable])的键值对RDD,numPartitions用于指定任务并行数,默认为8。

# 对新闻出现过的单词进行词频统计
rdd = rdd.groupByKey()

aggregateByKey()

aggregateByKey(zeroValue,seqFunc,combFunc,[numPartitions]):可以对具有相同键的值进行聚合,把(K,V)键值对RDD转换为新的(K,U)键值对RDD,其中U由给定的combFunc和中立零值zeroValue聚合而成,U可以有与V不一致的形式;

  • zeroValue 可以是0如果聚合的目的是求和,可以是List如果目的是对值进行统合,可以是Set如果目的是聚合唯一值;
  • seqFunc: (U,V) => U 对分区内的元素进行聚合(操作发生在每个分区内部);
  • combFunc: (U,U) => U 对不同分区的聚合结果做进一步的聚合(操作发生在全部分区的聚合结果间);
  • numPartitions 用于设置任务并行数;

为什么使用两个函数?见Apache Spark aggregateByKey Example

# 同样对新闻出现过的单词进行词频统计
rdd = rdd.aggregateByKey(0,lambda v1,v2: v1+v2, lambda a1,a2: a1+a2)

combineByKey()

combineByKey(createCombiner,mergeValue,mergeCombiners,[numPartitions]):可以对具有相同键的值按照自定义函数的逻辑进行聚合,把(K,V)键值对RDD转换为新的(K,U)键值对RDD,U可以有与V不一致的形式;

  • createCombiner: V => C 创建新的聚合器方便后续步骤操作,对原始值进行附加操作并返回,跟flatMap()类似;
  • mergeValue: (C,V) => C 对分区内的元素进行聚合(操作发生在每个分区内部);
  • mergeCombiners: (C,C) => C 对不同分区的聚合结果做进一步的聚合(操作发生在全部分区的聚合结果间);
  • numPartitions 用于设置任务并行数;

groupByKey(), groupByKey(), aggregateByKey() 等都不同程度上依赖于 combineByKey() 操作

# 对词频统计的结果求频数均值,例如
strings = ['spark','hadoop','rdd','rdd','spark','rdd']
rdd = sc.parallelize(strings)
rdd = rdd.map(lambda w: (w,1))
rdd = rdd.reduceByKey(lambda v1,v2: v1 + v2)
rdd = rdd.combineByKey(lambda v: (1,v), lambda c, v: (c[0]+1,c[1]+v), lambda c1, c2: (c1[0]+c1[0],c2[1]+c2[1]))
# 最后的结果为 [('spark', (1, 2)), ('hadoop', (1, 1)), ('rdd', (1, 3))]

sortByKey()

sortByKey(ascending,[numPartitions]):可以对键值对RDD按照键进行排序操作,其中K需要实现Ordered方法。

  • ascending 决定RDD中的元素按升序还是降序排序,默认是True升序;
  • numPartitions 用于设置任务并行数;
# 对词频统计结果按降序排序(先把K-V值互换)
rdd = rdd.map(lambda v: (v[1],v[0]))
rdd = rdd.sortByKey(False)
rdd = rdd.map(lambda v: (v[1],v[0]))

keys()与values()

keys(),values():分别把键值对RDD的key和value返回形成一个新的RDD。

# 获得上一步排序结果的key或value
rdd = rdd.keys()
rdd = rdd.values()

mapValues()

mapValues(func):可以对键值对RDD每个元素的value加载到预定义函数进行操作,而不改变key。

# 例如对词频统计结果值都减去1
rdd = rdd.mapValues(lambda v: v - 1)

join()

join(otherRDD,[numPartitions]):与关系数据库查询一样,表示内连接,给定两个键值对RDD如(K,V1)和(K,V2),对于两个数据集都存在的key才对其输出,得到一个新的RDD,元素为(K,(V1,V2))。除此以外,还包括其他情形:

  • fullOuterJoin(otherRDD,[numPartitions]) 全连接
  • leftOuterJoin(otherRDD,[numPartitions]) 左外连接
  • rightOuterJoin(otherRDD,[numPartitions]) 右外连接
rdd1 = sc.parallelize([('spark',1),('hadoop',1)])
rdd2 = sc.parallelize([('spark',1)])
rdd1 = rdd1.join(rdd2)
# 最后结果为 [('spark', (1,1))]

RDD的行动操作(Action)

官方API文档详细列出行动操作函数,下面简单介绍常用的行动操作:

count()

count():返回RDD数据集中元素的个数。

collect()

collect():以数组的形式返回RDD数据集的所有元素。

first()

first():返回RDD数据集的第一个元素。

top()

top(num,key=None):以数组的形式返回RDD数据集的前num个元素,默认按降序,或者通过key函数指定。

rdd = sc.parallelize([5,24,3,12,46])
print(rdd.takeOrdered(3))
# 输出 [46,24,12]
rdd = sc.parallelize([5,24,3,12,46])
print(rdd.takeOrdered(3, key=str))
# 输出 [5,46,3]

take()

take(num):以数组的形式返回RDD数据集的前num个元素。

takeOrdered()

takeOrdered(num,key=None):以数组的形式返回RDD数据集的前num个元素,默认按升序排序,或者通过key函数指定。

rdd = sc.parallelize([5,2,3,1,4])
print(rdd.takeOrdered(3))
# 输出 [1,2,3]
rdd = sc.parallelize([5,2,3,1,4])
print(rdd.takeOrdered(3, key=lambda x: -x))
# 输出 [5,4,3]

takeSample()

takeSample(withReplacement,num,seed=None):对RDD数据集进行采样,并以数组的形式返回,withReplacement参数表示是否放回抽样;num参数表示抽样个数;seed表示随机种子。

rdd = sc.parallelize([5,2,3,1,4])
print(rdd.takeSample(False,3,seed=2019))

lookup()

lookup(key):以数组的形式返回键值对RDD中键为key的所有值,如果RDD数据集经过特定转换操作按照key进行了分区,那么此行动操作效率会很高。

foreach()

foreach(func):将RDD数据集中的每个元素加载到指定函数进行操作,无返回值。

# 对转换结果逐个输出
rdd.foreach(print)

reduce()

reduce(func):通过指定函数(如求和、统计)对RDD数据集元素进行聚合。

# 对所有元素进行求和
sum = rdd.reduce(lambda v1,v2: v1 + v2)

aggregate()

aggregate(zeroValue,seqOp,combOp):对RDD数据集的元素进行聚合,不要求返回值类型与RDD类型一致;

  • zeroValue: U 给定初始值,形式与最终返回值U一致;
  • seqOp: (U,V) => U 对分区内的元素进行聚合(操作发生在每个分区内部);
  • combOp: (U,U) => U 对不同分区的聚合结果做进一步的聚合(操作发生在全部分区的聚合结果间);
# 求一个数组元素的均值
rdd = sc.parallelize([5,2,3,1,4])
res = rdd.aggregate((0,0), lambda u,v: (u[0]+v,u[1]+1), lambda u1,u2: (u1[0]+u2[0],u1[1]+u2[1]))
print(res[0]/res[1])

countByKey()

countByKey():以字典的形式返回键值对RDD数据集中每个键的元素的统计数,即(K,count)

rdd = sc.parallelize([("rdd",1),("rdd",2),("spark",2)])
res = rdd.countByKey()
# 输出结果 {'rdd': 2, 'spark': 1}

countByValue()

countByValue():以字典的形式返回RDD数据集中每个元素的统计数,即(V,count)

rdd = sc.parallelize([2,2,3,1,1])
res = rdd.countByValue()
# 输出结果 {2: 2, 3: 1, 1: 2}

saveAsTextFile()

saveAsTextFile(path, compressionCodecClass=None):把RDD数据集保存为文本文件,并可以指定是否压缩。

f = NamedTemporaryFile(delete=True)
f.close()
codec = "org.apache.hadoop.io.compress.GzipCodec"
sc.parallelize(['spark', 'rdd']).saveAsTextFile(f.name, codec)

RDD的持久化

  • 由于RDD采用惰性机制,每次遇到行动操作都会根据DAG的依赖关系从头开始执行计算,如果遇到迭代计算,需要重复调用中间数据,会造成极大的计算开销;
  • 可以通过持久化操作来解决以上的问题,用persist()方法对需要重复使用的RDD标记为持久化,当遇到第一次行动操作后,会把计算结果持久化,保存在计算节点的内存备用;

persist()

persist(storageLevel):storageLevel参数表示持久化级别,通过使用不同的级别可以把数据缓存到不同的位置,详见 RDD Persistence;其中使用cache()函数会调用默认的持久化方法,即persist(MEMORY_ONLY)将RDD作为反序列化的对象存储在JVM中;而unpersist()方法则可以把持久化的RDD从缓存中删除。

rdd = sc.parallelize(['spark','rdd','hadoop'])
rdd.cache()
print(rdd.take(1))
# 第一次行动操作,触发完整计算,同时把rdd放入缓存
# 输出 ['spark']
print(rdd.collect())
# 第二次行动操作,此时直接重复利用上述的缓存rdd
# 输出 ['spark','rdd','hadoop']
rdd.unpersist()

文章作者: yxnchen
文章链接: http://yxnchen.github.io/technique/Spark笔记-玩转RDD操作/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 YXN's Blog