Spark Core
最佳答案 问答题库548位专家为你答疑解惑
Spark Core
本文来自 B站 黑马程序员 - Spark教程 :原地址
第一章 RDD详解
1.1 为什么需要RDD
分布式计算需要
- 分区控制
- shuffle控制
- 数据存储、序列化、发送
- 数据计算API
- 等一系列功能
这些功能,不能简单的通过Python内置的本地集合对象(如List,字典等)去完成。
我们在分布式框架中,需要有一个统一的数据抽象对象,来实现上述分布式计算所需功能
这个抽象对象,就是RDD
1.2 什么是RDD
Spark起源
在spark开山之作**R
**esilient **D
**istributed **D
**atasets:A Fault-Tolerant Abstraction for In-Memory Cluster Computing这篇paper中 (以下简称 RDD Paper),Matei等人提出了RDD这种数据结构,文中开头对RDD的定义是:
RDD定义
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变,可分区,里面的元素可并行计算的集合。
-
Dataset:一个数据集合,用于存放数据的。
-
Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
-
Resilient:RDD中的数据可以存储在内存中或者磁盘中。
RDD定义
- RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变,可分区,里面的元素可并行计算的集合。
- 所有的运算以及操作都建立在RDD数据结构的基础之上。
- 可以认为RDD是分布式的列表list或者数组Array,抽象的数据结构,RDD是一个抽象类Abstract Class和泛型Generic Type
1.3 RDD的五大特性
RDD数据结构内部的五个特性
前三个特征每个RDD都具备的,后两个特征可选的
特性1: RDD是有分区的
RDD的分区是RDD数据存储的最小单位
一份RDD的数据,本质上是分隔成了多个分区
特性2: RDD的方法会作用在其所有的分区上
特性3: RDD之间是有依赖关系(RDD有血缘关系)
sc = SparkContext(conf = conf)rdd1 = sc.textFile("t.txt")
rdd2 = rdd1.flatMap(lambda x:x.split(' '))
rdd3 = rdd2.map(lambda x: (x ,1))
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())
RDD之间是有依赖的,如rdd2会产生rdd3,但是rdd2依赖rdd1,同样rdd3会产生rdd4,但rdd3依赖rdd2
…
会形成一个依赖链条。
这个链条称之为RDD的血缘关系。
特性4: Key-Value型的RDD可以有分区器
默认分区器:Hash分区规则,可以手动设置一个分区器(rdd.partitionBy的方法来设置)
这个特性是可能的,因为不是所有RDD都是key-value型
key-value RDD:RDD中存储的是二元元组,这就是key-value型RDD
二元元组:只有2个元素的元组,比如:(“hadoop”,6)
特性5: RDD的分区规划,会尽量靠近数据所在的服务器
在初始RDD(读取数据的时候)规划的时候,分区会尽量规划到存储数据所在的服务器上
因为这样可以走本地读取,避免网络读取
本地读取:Executor所在的服务器,同样是一个DataNode,同时这个DataNode上有它要读的数据,所以可以直接读取机器硬盘即可 无需走网络传输
网络读取:读取数据,需要经过网络传输才能读取到
本地读取 性能>网络读取的
总结:
spark会在确保并行计算能力的前提下,尽量确保本地读取
这里是尽量确保,而不是100%确保,所以这个特性也是:可能的
总结
如何正确理解RDD
弹性分布式数据集,分布式计算的实现载体(数据抽象)
RDD五大特点
Internally, each RDD is characterized by five main properties
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute eash split on (e.g block locations for an HDFS file)
在内部,每个RDD都有五个主要特性
- 分区列表
- 用于计算每次拆分的函数
- 与其他RDD的依赖关系列表
- 可选地,键值RDD的Partitioner(例如,说RDD是散列分区的)
- 可选地,计算eash分割的首选位置列表(例如HDFS文件的块位置)
第二章 RDD编程入门
2.1 程序执行入口 SparkContext对象
Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言)
只有构建出SparkContextt,基于它才能执行后续的API调用和计算
本质上,SparkContext对编程来说,主要功能就是创建第一个RDD出来
from pyspark import SparkConf, SparkContextif __name__ == '__namin__':# 构建SparkConf对象conf = SparkConf().setAppName("helloword").setMaster("local[*]")# 构建SparkContext执行入口对象sc = SparkContext(conf = conf)
2.2 RDD的创建
RDD的创建主要有2种方式:
- 通过并行化集合创建(本地对象 转 分布式RDD)
- 读取外部数据源(读取文件)
并行化创建
概念:并行化创建是指:将本地集合—>转向分布式RDD
这一步就是,分布式的开端:本地转分布式
API:
rdd = sparkcontext.parallelize(参数1,参数2)
# 参数1 集合对象即可,比如list
# 参数2 分区数
获取RDD分区数
getNumPartitions API
获取RDD分区数量 返回值是Int数字
用法:
rdd.getNumPartitions()
读取文件创建
testFile
这个API可以读取 本地数据,也可以读取hdfs数据
使用方法:
sparkcontext.textFile(参数1,参数2)
# 参数1,必填,文件路径 支持本地文件 支持HDFS 也支持一些比如S3协议
# 参数2,可填,表示最小分区数量
# 注意:参数2 话语权不足,spark有自己的判断,在它允许范围内,参数2有效果,超过spark允许的范围,参数2失败
wholeTextFile
读取文件的API,有个适用场景:适合读取一堆小文件
这个API是小文件读取专用
用法
sparkcontext.wholeTextFiles(参数1,参数2)
# 参数1,必填,文件路径支持本地文件,支持HDFS 也支持一些比如S3协议
# 参数2,可选,表示最小分区数量
# 注意:参数2 话语权不足,这个api 分区数量最多也只能开到文件数量
这个API偏向于少量分区读取数据
因为,这个API表明了自己是小文件读取专用,那么文件的数据很小
分区很多,导致shuffle的几率更高,所以尽量少分区读取数据
2.3 RDD算子
算子是什么
算子:分布式集合对象上的API称之为算子
方法\函数:本地对象的API,叫做方法\函数
算子:分布式对象的API,叫做算子
算子分类
RDD的算子 分成2类
-
Transformation:转换算子
定义:RDD的算子,返回值仍旧是一个RDD的,称之为转换算子
特性:这类算子是
lazy 懒加载
的,如果没有action算子,Transformation算子是不工作的 -
Action:动作(行动)算子
定义:返回值不是RDD
的就是action算子
对于这两类算子来说,Transformation 算子,相当于在构建执行计划,action是一个指令让这个执行计划开始工作
如果没有action,Transformation算子之间的迭代关系,就是一个没有通电的流水线,只有action到来,这个数据处理的流水线才开始工作
2.4 常用的TransforMation算子
map算子
功能:map算子,是将RDD的数据一条条处理(处理的逻辑 基于map算子中接收的处理函数),返回新的RDD
语法:
rdd.map(func)
# func : f:(T) -> U
# f:表示这是一个函数(方法)
# (T)-> U表示的是方法的定义:
# ()表示传入参数,(T)表示传入1个参数,()表示没有传入参数
# T 是泛型的代称,在这里表示 任意类型
# U 也是泛型代称,在这里表示 任意类型# ->U 表示返回值# (T) -> U 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入参数类型不限,返回一个返回值,返回值类型不限。# (A) -> A 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入参数类型不限,返回一个返回值,返回值和传入参数类型一致
flatMap算子
功能:对rdd执行map操作,然后进行解除嵌套
操作
解除嵌套
# 嵌套的list
list = [[1,2,3],[4,5,6],[7,8,9]]# 如果解除了嵌套
list = [1, 2, 3, 4, 5, 6, 7, 8, 9]print(rdd.flatMap(lambda x: x.split(" ")).collect())
reduceByKey算子
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)
的聚合操作
用法:
rdd.reduceByKey(func)# func: (V,V) -> V
# 接受2个传入参数(类型一致),返回一个返回值,类型和传入要求一致
比如,有【1,2,3,4,5】,然后聚合函数是:lambda a, b: a + b
注意:reduceByKey中接收的函数,只负责聚合,不理会分组
分组是自动
by key
来分级的
groupBy算子
功能:将rdd的数据进行分组
语法:
rdd.groupBy(func)# func 函数
# func: (T) ->K
# 函数要求传入一个参数,返回一个返回值,类型无所谓
# 这个函数是 拿到你的返回值后,将所有相同返回值的放入一个组中
# 分组完成后, 每一个组是一个二元元组,key就是返回值,所有同组的数据放入一个迭代器对象中作为value
Filter算子
功能:过滤想要的数据进行保留
语法:
rdd.filter(func)
# func:(T) ->bool 传入1个参数进来随意类型,返回值必须是True or False
返回是True的数据被保留,False的数据被丢弃
distinct算子
功能:对RDD数据进行去重,返回新RDD
语法:
rdd.distinct(参数1)
# 参数1,去重分区数量,一般不用传
union算子
功能:2个rdd合并成1个rdd返回
用法:
rdd.union(other_rdd)
注意,只合并,不会去重
注意,不同类型的rdd依旧可以混合
join算子
功能:对两个RDD执行join操作(可实现SQL的内\外连接)
注意:join算子只能用于二元元组
语法:
rdd.json(other_rdd) 内连接
rdd.leftOuterJoin(other_rdd)左外
rdd.rightOuterJoin(other_rdd)右外
intersection算子
功能:求2个rdd的交集,返回一个新rdd
用法:
rdd.intersection(other_rdd)
glom算子
功能:将rdd的数据,加上嵌套,这具嵌套按照分区
来进行
比如rdd数据【1,2,3,4,5】有2个分区
那么,被glom后,数据变成:【【1,2,3】,【4,5】】
使用方法:
rdd.glom()
groupByKey算子
功能:针对KV型RDD,自动按照key分组
用法:rdd.groupBYKey(自动按照key分组)
sortBy算子
功能:对rdd数据进行排序,基于你指定的排序依据
语法:
rdd.sortBy(func,ascending=False,numPartitions=1)# func:(T) -> U :告知按照rdd中的哪个数据进行排序,比如 lambda x: x[1] 表示按照rdd中的第二列元素进行排序
# ascending True升序 False降序
# numPartitions: 用多少分区排序
sortByKey算子
功能:针对KV型RDD,按照key进行排序
语法:
sortByKey(ascending=True,numPartitions=None,KeyFunc=<function RDD >)
- ascending:升序or降序,True升序,False降序,默认是升序
- numPartitions:按照几个分区进行排序,如果全局有序,设置1
- keyfunc:在排序前对key进行处理,语法是(k) -> U,一个参数传入,返回一个值
2.5 常用Action算子
countByKey算子
功能:统计key出现的次数(一般适用kv型RDD)
collect算子
功能:将RDD各个分区内的数据,统一收集Driver中,形成一个list对象
用法:
rdd.collect()
这个算子,是将rdd各个分区数据,都拉取到Driver
注意的是,rdd是分布式对象,其数据量可以很大,所以用这个算子之前
要心知肚明的了解,结果数据集不会太大
不然,会把Driver内存撑爆
reduce算子
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:
rdd.reduce(func)
# func:(T,T) -> T
# 2参数传入 1个返回值,返回值和参数要求类型一致
fold算子
功能:和reduce一样,接受传入逻辑进行聚合,聚合是带有初始值的
这个初始值聚合,会作用在:
- 分区内聚合
- 分区间聚合
比如:【【1,2,3】,【4,5,6】,【7,8,9】】
数据分布在3个分区
分区1 123聚合的时候带上10作为初始值得到16
分区2 456聚合的时候带上10作为初始值得到25
分区3 789聚合的时候带上10作为初始值得到34
3个分区的结果做聚合也带上初始值10,所以结果是:10+16+25+34=85
first算子
功能:取出rdd的每一个元素
用法:
>>> sc.parallelize([3,2,1]).first()
3
take算子
功能:取RDD的前N个元素,组合成list返回给你
用法:
>>> sc.parallelize([3,2,1,4,5,6]).take(5)
[3,2,1,4,5]
top算子
功能:对RDD数据集进行降序排序,取前N个
用法:
>>> sc.parallelize([3,2,1,4,5,6]).top(3) # top 3表示降序取前3个
【6,5,4】
conunt算子
功能:计算rdd有多少条数据,返回值是一个数字
用法:
>>> sc.parallelize([3,2,1,4,5,6]).count()
6
takeSample算子
功能:随机抽样rdd的数据
用法:
takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子)
- 参数1 true表示运行同一个数据,false表示不允许取同一个数据,和数据无关,是否重复表示的是同一个位置的数据
- 参数2 抽样要几个
- 参数3 随机数种子,这个参数传入一个数字即可,随意给
随机数种子 数字可以随便传,如果传同一个数字 那么取出的结果是一致的
一般参数3 我们不传,Spark会自动给与随机的种子
takeOrdered算子
功能:对RDD进行排序取前N个
用法:
rdd.takeOrddered(参数1,参数2)
- 参数1 要几个数据
- 参数2 对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子)
这个方法使用安装元素自然顺序升序排序,如果你想玩倒叙,需要用参数2 来对排序的数据进行处理
foreach算子
功能:对rdd的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个方法没有返回值
用法:
rdd.foreach(func)
# func: (T) ->None
saveAsTextFile算子
功能:将rdd的数据写入文本文件中
支持 本地写出,hdfs等文件系统
注意:保存文件API,是分布式执行的
这个api的执行数据是 不经过
driver
的
如图,写出的时候,每个分区所在的Executor直接控制数据写出到目标文件系统中
所以才会一个分区产生1个结果文件
注意点
我们学习的action中
- foreach
- saveAsTextFile
这两个算子是分区(Executor)直接执行的
跳过Driver,由分区所在的Executor直接执行
反之
其余的Action算子都会将结果发送至Driver
2.6 分区操作算子
mapPartitions算子
图解代码
如果,mapPartition 一次被传递的是一整个分区的数据
作为一个迭代器(一次性list)对象传入过来
foreachPartition算子
功能:和普通foreach一致,一次处理的是一整个分区数据
foreachPartition就是一个没有返回值的mapPartitions
PartitionBy算子
功能:对RDD进行自定义分区操作
用法:
rdd.partitionBy(参数1,参数2)
- 参数1 重新分区后有几个分区
- 参数2 自定义分区规则,函数传入参数2:(k) ->int
一个传入参数进来,类型无所谓,但是返回值一定是int类型将key传给这个函数,你自己写逻辑,决定返回一个分区编号分区编号从0开始,不要超过分区数-1
分区号不要超标,你设置3个分区,分区号只能是 0 1 2
设置5个分区 分区号只能是 0 1 2 3 4
repartition算子
功能:对RDD的分区执行重新分区(仅数量)
用法:
rdd.repartition(N)
传入N 决定新的分区数
注意:对分区的数量进行操作,一定要慎重
一般情况下,我们写spark代码 除了要求全局排序设置为1个分区外 多数时候,所有API中关于分区相关的代码我们都不太理会
因为,如果你改分区了
- 会影响并行计算(内存迭代的并行管道数量)
- 分区如果增加,极大可能会导致shuffle
coalesce算子
功能:对分区进行数量增减
用法:
rdd.coalesce(参数1,参数2)
- 参数1,分区数
- 参数2,True or FlaseTrue 表示允许shuffle,也就是可以加分区False表示不允许shuffle,也就是不能加分区,False是默认
对比repartition,一般使用coalesce较多,因为加分区要写参数2
这样避免写repartition的时候手抖了加分区了
mapValues算子
功能:针对二元元组RDD
,对其内部的二元元组的value执行map操作
语法:
rdd.mapValues(func)
# func: (V) ->U
# 注意,传入的参数,是二元元组的,value值
# 我们这个传入的方法,只有对value进行处理
join算子
功能:对两个rdd执行join操作(可实现sql的内\外连接)
注意:join算子只能用于二元元组
语法:
rdd.join(other_rdd) # 内连接
rdd.leftOuterJoin(other_rdd) # 左外
rdd.rightOuterJoin(other_rdd) # 右外
2.7 groupByKey和reduceByKey的区别
在功能上的区别
- groupByKey仅仅有分组功能而已
- reduceByKey除了有ByKey的分组功能外,还有reduce聚合功能所以是一分组+聚合一体化的算子
如果对数据执行 分组+聚合,那么使用这2个算子的性能差别是很大的
reduceByKey的性能是远大于:groupByKey + 聚合逻辑的
因为:
如图,这是groupByKey + 聚合逻辑的执行流程
因为,groupByKey只能分组,所以,执行上是先分组(shuffle)后聚合
如图,reduceByKey由于自带聚合逻辑,所以可以完成
- 先在分区内做预聚合
- 然后再走分组流程(shuffle)
- 分组后再做最终聚合
对于groupByKey,reduceByKey最大的提升在于,分组前进行了预聚合,那么在shuffle分组节点,被shuffle的数据可以极大的减少
这就极大的提升了性能
分组+聚合,首选reduceByKey,数据越大,对groupByKey的优势就越高
总结
-
RDD创建有哪几种方法?
通过并行化集合的方式(本地集合转分布式集合)
或者读取数据的方式创建(TextFile\WholdTextFile)
-
RDD分区数如何查看?
通过getNumPartitions API查看,返回值int
-
Transformation 和 Action 的区别?
转换算子的返回值100%是RDD,而Action算子的返回值100%不是RDD
转换算子是懒加载的,只有遇到Action才会执行,Action就是转换算子处理链条的开关
-
哪两个Action算子不经过Driver,直接输出?
foreach 和 saveAsTextFile 直接由Executor执行后输出
不会将结果发送到Driver上去
-
reduceByKey和groupByKey的区别?
reduceByKey自带聚合逻辑,groupByKey不带
如果做数据聚合reduceByKey的效率更好,因为可以先聚合后shuffle再最终聚合,传输的IO小
-
mapPartitions 和 foreachPartition 的区别?
mapPartitions带有返回值foreachPartition不带
-
对于分区操作有什么要注意的地方?
尽量不要增加分区,可能破坏内存迭代的计算管道
第三章 RDD持久化
3.1 RDD的数据是过程数据
RDD的数据是过程数据
RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消失
RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了
这个特性可以最大化的利用资源,老旧的RDD没用了 就从内存中清理,给后续的计算腾出内存空间
如上图,rdd3被2次使用,第一次使用之后,其实rdd3就不存在了
第二次用的时候,只能基于rdd的血缘关系,从RDD1重新执行,构建出来rdd3,供rdd5使用
3.2 rdd的缓存
对于,上述的场景,肯定要执行优化,优化就是:
RDD3如果不消失,那么RDD1->RDD2->RDD3这个链条就不会执行2次,或者更多次
RDD的缓存技术:Spark提供了缓存API,可以让我们通过调用API,将指定的RDD数据保留在内存或者硬盘上缓存的API
# RDD3 被2次使用,可以加入缓存进行优化
# 缓存到内存中
rdd3.cache()
# 仅内存缓存
rdd3.persist(StorageLevel.MEMORY_ONLY)
# 仅内存缓存,2个副本
rdd3.persist(StorageLevel.MEMORY_ONLY_2)
# 仅缓存硬盘上
rdd3.persist(StorageLevel.DISK_ONLY)
# 仅缓存硬盘上,2个副本
rdd3.persist(StorageLevel.DISK_ONLY_2)
# 仅缓存硬盘上,3个副本
rdd3.persist(StorageLevel.DISK_ONLY_3)
# 先放内存,不够放硬盘
rdd3.persist(StorageLevel.MEMORY_AND_DISK)
# 先放内存,不够放硬盘, 2个副本
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)
# 堆外内存(系统内存)
rdd3.persist(StorageLever.OFF_HEAP)# 如上API,自行选择使用即可
# 一般建议使用rdd3.persist(StorageLevel.MEMORY_AND_DISK)
# 如果内存比较小的集群,建议使用rdd3.persist(StorageLevel.DISK.ONLY)或者就别用缓存了 用CheckPoint# 主动清理缓存的API
rdd.unpersist()
- 缓存技术可以将过程RDD数据,持久化保存到内存或者硬盘上
- 但是,这个保存在设定上是认为不安全的
缓存的数据在设计上是认为有丢失风险的
所以,缓存有一个特点就是:其保留在RDD之间的血缘(依赖)关系
一旦缓存丢失,可以基于血缘关系的记录,重新计算这个RDD的数据
缓存如何丢失
- 在内存中的缓存是不安全的,比如断电\计算任务内存不足,把缓存清理给计算让路
- 硬盘中因为硬盘损坏也是可能丢失的
缓存到内存:
Executor:缓存到Executor的内存空间
缓存到硬盘
Executor:缓存到Executor所在服务器的硬盘
3.3 RDD的CheckPoint
CheckPoint技术,也是将RDD的数据,保存起来
但是它仅支持硬盘存储
并且:
- 它被设计认为是安全的
- 不保留血缘关系
CheckPoint是如何保存数据的
这个RDD的数据将被CheckPoint到HDFS
中
CheckPoint存储RDD数据,是集中收集
各个分区数据进行存储,而缓存是分散存储
缓存和CheckPoint的对比
- CheckPoint 不管分区数量多少,风险是一样的,缓存分区越多,风险越高
- CheckPoint 支持写入HDFS,缓存不行,HDFS是高可靠存储,CheckPoint被认为是安全的
- CheckPoint 不支持内存,缓存可以,缓存如果写内存性能比CheckPoint要好一些
- CheckPoint 因为设计认为是安全的,所以
不保留血缘关系
,而缓存因为设计上认为不安全,所以保留
注意
CheckPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用CheckPoint比较合适
或者数据量很大,用CheckPoint比较合适
如果数据量小,或者RDD重新计算是非常快的,用CheckPoint没啥必要,直接缓存即可
Cache和CheckPoint两个APi都不是Action类型
所以,想要它俩工作,必须在后面接上Action
接上Action的目的,是让RDD有数据,而不是为了让CheckPoint和Cache工作
总结
-
Cache和CheckPoint区别
cache是轻量化保存RDD数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留RDD血缘关系)
checkPoint是重量级保存rdd数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留rdd血缘关系)
-
Cache和CheckPoint的性能对比
cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快
checkpoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本)
第四章 RDD共享变量
4.1 广播变量
给每个 Executor 来一份数据,而不是像原本那样,每一个分区的处理线程都来一份,节省内存
使用方法:
# 1. 将本地list 标记成广播变量即可
broadcast = sc.broadcast(stu_info_list)# 2. 使用广播变量,从broadcast对象中取出本地list对象即可
value = broadcast.value# 也就是 先放进去broadcast内部,然后从broadcast内部在取出来用,中间传输的是broadcast这个对象了
# 只要中间传输的是broadcast对象,spark就会留意,只会给每个Executor发一份了,而不是傻傻的哪个分区都要给
4.2 累加器
不使用累加器:不管Executor中累加到多少,都和Driver中无关,计算结果统计不会累加
sc.accumulator(初始值)即可构建
也就是,使用累加器的时候,要注意,因为rdd是过程数据,如果rdd被多次使用
可能会重新构建此rdd
如果累加器累加代码,存在重新构建的步骤中
累加器累加的代码就可能被多次执行
如何解决:加缓存或者CheckPoint即可
总结
-
广播变量解决了什么问题?
分布式集合RDD和本地集合进行关联使用的时候,降低内存占用以及减少网络IO传输,提高性能
-
累加器解决了什么问题?
分布式代码执行中,进行全局累加
第五章 Spark内核调度
5.1 DAG
Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的资源高效地完成任务计算。
以词频统计WordCount程序为例,DAG图:
DAG:有向无环图
有向:有方向
无环:没有闭环
DAG:有方向没有形成闭环的一个执行流程图
此图,就是一个典型的DAG图
有方向:RDD1->RDD2->…->collect结束
无闭环:以action(collect)结束了,没有形成闭环循环
作用:标识代码的
逻辑
执行流程
📝 Job和Action
Action:返回值不是RDD的算子
它的作用是一个触发开关,会将action算子之前的一串rdd依赖链条执行起来,也就是一个Action会产生一个DAG图。
一个Action会产生一个DAG,如果代码中有3个Action就产生3个DAG
一个Action产生的一个DAG,会在程序运行中,产生一个Job
所以: 1个Action = 1个DAG = 1个Job
如果一个代码中,写了3个Action,那么这个代码运行起来产生3个Job,每个Job有自己的DAG
一个代码运行起来,在Spark中称之为:Application
📝层级关系:
1个Application中,可以有多个Job,每一个Job内含一个DAG,同时每一个Job都是由一个Action产生的。
📝DAG和分区:
Dag是Spark代码的逻辑执行图,这个Dag的最终作用是;为了构建物理上的
Spark详细执行计划而生
所以,由于Spark是分布式(多分区)的,那么Dag和分区之间也是有关联的
5.2 DAG的宽窄依赖和阶段划分
在SparkRDD前后之间的关系,分为:
- 窄依赖
- 宽依赖
窄依赖:父RDD的一个分区,全部
将数据发给子RDD的一个
分区
宽依赖:父RDD的一个分区,将数据发给子RDD的多个
分区
宽依赖还有一个别名:shuffle
在stage的内部,一定都是窄依赖
5.3 内存迭代计算
如图,基于带有分区的Dag
以及阶段划分,可以从图中得到,逻辑上最优的task分配,一个task是一个线程来具体执行
那么如上图,task1 中 rdd1 rdd2 rdd3 的迭代计算,都是由一个task(线程完成),这一阶段的这一条线,是纯内存计算。
如上图,task1,task2,task3,就形成了三个并行的内存计算管道
Spark默认受到全局并行度的限制,除了个别算子有特殊分区情况,大部分的算子,都会遵循全局并行度的要求,来规划自己的分区数
如果全局并行度是3,其实大部分算子分区都是3
注意:spark我们一般推荐只设置全局并行度,不要再算子上设置并行度
除了一些排序算子外,计算算子就让他默认开分区就可以了
Spark是怎么做内存计算的?DAG的作用?stage阶段划分的作用?
- spark会产生DAG图
- DAG图会基于分区和宽窄依赖划分阶段
- 一个阶段的内部都是
窄依赖
,窄依赖内,如果形成前后1:1的分区对应关系,就可以产生许多内存迭代计算的管道 - 这些内存迭代计算的管道,就是一个个具体的执行task
- 一个task是一个具体的线程,任务跑在一个线程内,就是走内存计算了
Spark为什么比MapReduce快
-
Spark的算子丰富,MapReduce算子匮乏(Map和Reduce),MapReduce这个编程模型,很难在一套Mr中处理复杂的任务。
很多的复杂任务,是需要写多个MapReduce进行串联,多个MR串联通过磁盘交互数据
-
Spark可以执行内存迭代,算子之间形成DAG基于依赖划分阶段后,在阶段内形成内存迭代管道,但是MapReduce的Map和Reduce之间的交互依旧是通过硬盘来交互的
总结
- 编程模型上Spark占优(算子够多)
- 算子交互上,和计算上可以尽量多的内存计算而非磁盘迭代
5.4 Spark并行度
在同一时间内,有多个task在同时运行
并行度:并行能力的设置
比如设置并行度6,其实就是要6个task并行在跑
在有了6个task并行的前提下,rdd的分区就被规划成6个分区了
如何设置并行度
可以在代码中和配置文件中以及提交程序的客户端参数中设置
优先级从高到低:
- 代码中
- 客户端提交参数中
- 配置文件中
- 默认(1,但是不会全部以1来跑,多数时候基于读取文件的分片数量来作为默认并行度)
全局并行度配置的参数:
spark.default.parallelism
全局并行度 - 推荐
配置文件中
conf/spark-defaults.conf中设置
spark.default.parallelism 100
在客户端提交参数中
bin/spark-submit --conf "spark.default.parallelism=100"
在代码中设置
conf = SparkConf()
conf.set("spark.default.parallelism","100")
全局并行度是推荐设置,不要针对RDD改分区,可能会影响内存迭代管道的构建,或者会产生额外的shuffle
针对RDD的并行度设置 - 不推荐
只能在代码中写,算子:
- repartition算子
- coalesce算子
- partitionBy算子
集群中如何规划并行度
结论:设置cpu总核心的2~10倍
比如集群可用cpu核心是100个,我们建议并行度是200~1000
确保cpu核心的整数倍即可,最小是2倍,最大一般10倍或者更高(适量)均可
为什么设置最少2倍?
cpu的一个核心同一时间只能干一件事情
所以,在100个核心的情况下,设置100个并行,就能让cpu 100%出力
这种设置下,如果task的压力不均衡,某个task先执行完了,就导致某个cpu核心空闲
所以,我们将task(并行)分配的数量变多,比如800个并行,同一时间只有100个在运行,700个在等待
但是可以确保,某个task运行完了,后续有task补上,不让cpu闲下来,最大程度利用集群的资源
规划并行度,只看
集群总CPU核心
5.5 Spark的任务调度
Spark的任务,由Driver进行调试,这个工作包含:
- 逻辑DAG产生
- 分区DAG产生
- Task划分
- 将Task分配给Executor并监控其工作
如图,Spark程序的调度流程如图:
- Driver被构建出来
- 构建SparkContext(执行环境入口对象)
- 基于DAG Scheduler(DAG调度器)构建逻辑Task分配
- 基于TaskScheduler(Task调度器)将逻辑Task分配到各个Executor上干活,并监视它们
- worker(Executor),被TaskScheduler管理监控,听从它们的指令干活,并定期汇报进度
1,2,3,4 都是Driver的工作
5 是worker的工作
Driver内的两个组件
Dag调度器
工作内容:将逻辑的Dag图进行处理,最终得到逻辑上的Task划分
Task调度器
工作内容:基于Dag scheduler的产出,来规划
这些逻辑
的task,应该在哪些物理
的Executor上运行,以及监控管理它们的运行
5.6 拓展 - Spark运行中的概念名词大全
层级关系梳理
- 一个spark环境可以运行多个Application
- 一个代码运行起来,会成为一个Application
- Application内部可以有多个job
- 每个job由一个Action产生,并且每个job有自己的DAG执行图
- 一个Job的DAG图 会基于宽窄依赖划分成不同的阶段
- 不同阶段内基于分区数量,形成多个并行的内存迭代管道
- 每一个内存迭代管道形成一个Task(DAG调度器划分将job内划分出具体的task任务,一个job被划分出来的task在逻辑上称之为这个job的taskset)
5.7 Spark Shuffle
简介
Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化,跨节点网络IO以及磁盘读写IO等
Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的stage,前者是Parent Stage的最后一步,后者是Child Stage的每一步。
执行Shuffle的主体是Stage中的并发任务,这些任务分ShuffleMapTask和ResultTask两种,ShuffleMap Task要进行Shuffle, ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。如果要按照map端和reduce端来分析的话,shuffleMapTask可以即是map端任务,又是reduce端任务,因为Spark中的shuffle是可以串行的,resultTask则只能充当reduce端任务的角色。
Spark在1.1以前的版本一直是采用Hash Shuffle的实现方式,到1.1版本时参考Hadoop MapReduce的实现开始引入Sort Shuffle, 在1.5版本时开始Tungsten钨丝计划,引入Unsafe Shuffle优化内存及Cpu的使用,在1.6中将Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式,到的2.0版本,Hash Shuffle已被删除,所有shuffle方式全部统一到Sort Shuffle一个实现中
在Spark的中,负责shuffle过程的执行,计算和处理的组件主要就是ShuffleManager,也即shuffle管理器,shuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能
因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
Hash Shuffle了解
Shuffle 阶段划分:
shuffle write: mapper阶段,上一个stage得到最后的结果写出
shuffle read: reduce阶段,下一个stage拉取上一个stage进行合并
-
未经优化的hashShuffleManager:
HashShuffle是根据task是计算结果的key值的hashcode%ReduceTask来决定放入哪一个区分,这样保证相同的数据一定放入一个分区,Hash Shuffle过程如下:
根据下游的task决定生成几个文件,先生成缓冲区文件在写入磁盘文件,再将block文件进行合并。
未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。提出如下解决方案
-
经过优化的hashShuffleManager:
在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,每一个group磁盘文件的数量与下游stage的task数量是相同的。
未经优化:
上游的task数量:m
下游的task数量:n
上游executor数量:k (m>=k)
总共的磁盘文件:m*n
优化之后的
上游的task数量:m
下游的task数量:n
上游的executor数量:k(m>=k)
总共的磁盘文件:k*n
Sort Shuffle Manager了解
SortShuffleManager的运行机制主要分成两种,一种普通运行机制,另一种bypass运行机制,当shuffle write task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制
-
该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存,如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存
-
接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构
-
排序
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序
-
溢写
排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
-
merge
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并成1个磁盘文件,这就是merge过程
由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset
Sort Shuffle bypass机制
bypass运行机制的触发条件如下:
- shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值
- 不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。
- 此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件
- 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
面该机制与普通sortShuffleManager运行机制的不同在于:
第一,磁盘写机制不同
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
总结
- SortShuffle也分为普通机制和bypass机制
- 普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件
- 而当shuffle map task 数量小于Spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,sortShuffle的bypass机制不会进行排序,极大的提高了其性能。
Shuffle的配置选项
shuffle阶段划分:
shuffle write: mapper阶段,上一个stage得到最后的结果写出
shuffle read: reduce阶段,下一个stage拉取上一个stage进行合并
配置选项说明
spark的shuffle调优:主要是调整缓冲的大小,拉取次数重试次数与等待时间,内存比例分配,是否进行排序操作等等
spark.shuffle.file.buffer
参数说明: 该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32k)。将数据琯到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘
调优建议:如果作业可用的内存资源较为充足的话,只可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO资次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升
spark.reducer.maxSizeInFlight
参数说明:该参数用于设置shuffle read task的buffer缓冲大小 ,而这个buffer缓冲决定了每次能够拉取多少数据(默认48M)
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小 (比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现 ,合理调节该参数,性能会有1%~5%的提升
spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait
spark.shuffle.io.maxRetries: shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数(默认是3次)
spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔(默认5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔
spark.shuffle.menoryFraction
参数说明:该参数代表Executor内存中,分配给shuffle read task进行聚合操作内存比例
spark.shuffle.manager
参数说明:该参数用于设置shufflemanager的类型(默认sort)spark1.5x以后有三个可选项
Hash:spark1.x版本的默认值,HashShuffleManager
Sort: Spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass机制
spark.shuffle.sort.bypassMergeThreshold
参数说明:当shuffleManager为sortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作
调优建议:当你使用sortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
总结
-
DAG是什么有什么用?
DAG有向无环图,用以描述任务执行流程,主要作用是协助DAG调度器构建Task分配用以做任务管理
-
内存迭代\阶段划分?
基于DAG的宽窄依赖划分阶段,阶段内部都是窄依赖可以构建内存迭代的管道
-
DAG调度器是?
构建Task分配用以做任务管理
到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘
调优建议:如果作业可用的内存资源较为充足的话,只可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO资次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升
spark.reducer.maxSizeInFlight
参数说明:该参数用于设置shuffle read task的buffer缓冲大小 ,而这个buffer缓冲决定了每次能够拉取多少数据(默认48M)
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小 (比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现 ,合理调节该参数,性能会有1%~5%的提升
spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait
spark.shuffle.io.maxRetries: shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数(默认是3次)
spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔(默认5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔
spark.shuffle.menoryFraction
参数说明:该参数代表Executor内存中,分配给shuffle read task进行聚合操作内存比例
spark.shuffle.manager
参数说明:该参数用于设置shufflemanager的类型(默认sort)spark1.5x以后有三个可选项
Hash:spark1.x版本的默认值,HashShuffleManager
Sort: Spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass机制
spark.shuffle.sort.bypassMergeThreshold
参数说明:当shuffleManager为sortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作
调优建议:当你使用sortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
总结
-
DAG是什么有什么用?
DAG有向无环图,用以描述任务执行流程,主要作用是协助DAG调度器构建Task分配用以做任务管理
-
内存迭代\阶段划分?
基于DAG的宽窄依赖划分阶段,阶段内部都是窄依赖可以构建内存迭代的管道
-
DAG调度器是?
构建Task分配用以做任务管理
99%的人还看了
相似问题
猜你感兴趣
版权申明
本文"Spark Core":http://eshow365.cn/6-34875-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!