为什么spark job stage task的taskSet.resourceOffer每次只返回一个task

现在的位置:
Spark的任务调度
本文尝试从源码层面梳理Spark在任务调度与资源分配上的做法。
先从Executor和SchedulerBackend说起。Executor是真正执行任务的进程,本身拥有若干cpu和内存,可以执行以线程为单位的计算任务,它是资源管理系统能够给予的最小单位。SchedulerBackend是spark提供的接口,定义了许多与Executor事件相关的处理,包括:新的executor注册进来的时候记录executor的信息,增加全局的资源量(核数),进行一次makeOffer;executor更新状态,若任务完成的话,回收core,进行一次makeOffer;其他停止executor、remove executor等事件。下面由makeOffer展开。
makeOffer的目的是在有资源更新的情况下,通过调用scheduler的resourceOffers方法来触发它对现有的任务进行一次分配,最终launch新的tasks。这里的全局scheduler就是TaskScheduler,实现是TaskSchedulerImpl,它可以对接各种SchedulerBackend的实现,包括standalone的,yarn的,mesos的。SchedulerBackend在做makeOffer的时候,会把现有的executor资源以WorkerOfffer列表的方式传给scheduler,即以worker为单位,将worker信息及其内的资源交给scheduler。scheduler拿到这一些集群的资源后,去遍历已提交的tasks并根据locality决定如何launch tasks。
TaskScheduler里,resourceOffers方法会将已经提交的tasks进行一次优先级排序,这个排序算法目前是两种:FIFO或FAIR。得到这一份待运行的tasks后,接下里就是要把schedulerBackend交过来的worker资源信息合理分配给这些tasks。分配前,为了避免每次都是前几个worker被分到tasks,所以先对WorkerOffer列表进行一次随机洗牌。接下来就是遍历tasks,看workers的资源"够不够","符不符合"task,ok的话task就被正式launch起来。注意,这里资源"够不够"是很好判断的,在TaskScheduler里设置了每个task启动需要的cpu个数,默认是1,所以只需要做核数的大小判断和减1操作就可以遍历分配下去。而"符不符合"这件事情,取决于每个tasks的locality设置。
task的locality有五种,按优先级高低排:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY。也就是最好在同个进程里,次好是同个node(即机器)上,再次是同机架,或任意都行。task有自己的locality,如果本次资源里没有想要的locality资源,怎么办呢?spark有一个spark.locality.wait参数,默认是3000ms。对于process,node,rack,默认都使用这个时间作为locality资源的等待时间。所以一旦task需要locality,就可能会触发delay scheduling。
到这里,对于任务的分配,资源的使用大致有个了解。实际上,TaskScheduler的resourceOffer里还触发了TaskSetManager的resourceOffer方法,TaskSetManager的resourceOffer是会检查task的locality并最终调用DAGScheduler去launch这个task。这些类的名字以及他们彼此的调用关系,看起来是比较乱的。我简单梳理下。
这件事情要从Spark的DAG切割说起。Spark RDD通过其transaction和action操作,串起来形成了一个DAG。action的调用,触发了DAG的提交和整个job的执行。触发之后,由DAGScheduler这个全局唯一的面向stage的DAG调度器来切分DAG,根据是否shuffle来切成多个小DAG,即stage。凡是RDD之间是窄依赖的,都归到一个stage里,这里面的每个操作都对应成MapTask,并行度就是各自RDD的partition数目。凡是遇到宽依赖的操作,那么就把这一次操作切为一个stage,这里面的操作对应成ResultTask,结果RDD的partition数就是并行度。MapTask和ResultTask分别可以简单理解为传统MR的Map和Reduce,切分他们的依据本质上就是shuffle。所以shuffle之前,大量的map是可以同partition内操作的。每个stage对应的是多个MapTask或多个ResultTask,这一个stage内的task集合成一个TaskSet类,由TaskSetManager来管理这些task的运行状态,locality处理(比如需要delay scheduling)。这个TaskSetManager是Spark层面上的,如何管理自己的tasks,即任务线程,这一层与底下资源管理是剥离的。我们上面提到的TaskSetManager的resourceOffer方法,是task与底下资源的交互,这个资源交互的协调人是TaskScheduler,也是全局的,TaskScheduler对接的是不同的SchedulerBackend的实现(比如mesos,yarn,standalone),如此来对接不同的资源管理系统。同时,对资源管理系统来说,他们要负责的是进程,是worker上起几个进程,每个进程分配多少资源。所以这两层很清楚,spark本身计算框架内管理线程级别的task,每个stage都有一个TaskSet,本身是个小DAG,可以丢到全局可用的资源池里跑;spark下半身的双层资源管理部分掌控的是进程级别的executor,不关心task怎么摆放,也不关心task运行状态,这是TaskSetManager管理的事情,两者的协调者就是TaskScheduler及其内的SchedulerBackend实现。
SchedulerBackend的实现,除去local模式的不说,分为细粒度和粗粒度两种。细粒度只有Mesos(mesos有粗细两种粒度的使用方式)实现了,粗粒度的实现者有yarn,mesos,standalone。拿standalone模式来说粗粒度,每台物理机器是一个worker,worker一共可以使用多少cpu和内存,启动时候可以指定每个worker起几个executor,即进程,每个executor的cpu和内存是多少。在我看来,粗粒度与细粒度的主要区别,就是粗粒度是进程long-running的,计算线程可以调到executor上跑,但executor的cpu和内存更容易浪费。细粒度的话,可以存在复用,可以实现抢占等等更加苛刻但促进资源利用率的事情。这俩概念还是AMPLab论文里最先提出来并在Mesos里实现的。AMPLab在资源使用粒度甚至任务分配最优的这块领域有不少论文,包括Mesos的DRF算法、Sparrow调度器等。所以standalone模式下,根据RDD的partition数,以及每个task需要的cpu数,可以很容易计算每台物理机器的负载量、资源的消耗情况、甚至知道TaskSet要分几批才能跑完一个stage。
EasyQuery的目标是不需要写一行java代码就可以实现非常非常复杂的查询,省时省力,提高效率。
【上篇】【下篇】
您可能还会对这些文章感兴趣!
籍贯山东,落户北京,IT行业。
工作经历:
2014年至今&,自主创业
,传智播客
,超人学院
,亚信科技
教育经历:
,中科院研究生院
,河北大学当前访客身份:游客 [
拥有积分:3
这家伙太懒,还没有签名!
解答题中心
[Spark源码剖析] Task的调度与执行源码剖析
( 15:16:56) &|
&评论(0)&&|
&阅读次数(821)|
人收藏此文章,
本文基于Spark 1.3.1,Standalone模式
一个Spark Application分为stage级别和task级别的调度,stage级别的调度已经用和两片文章进行源码层面的说明,本文将从源码层面剖析task是如何被调度和执行的。
函数调用流程
先给出task调度的总体函数调用流程,并说明每个关键函数是干嘛的。这样一开始就在心里有个大概的流程图,便于之后的理解。
//& DAGScheduler调用该taskScheduler.submitTasks提交一个stage对应的taskSet,一个taskSet包含多个task
TaskSchedulerImpl.submitTasks(taskSet: TaskSet)
//& TaskScheduler(实际上是TaskSchedulerImpl)为DAGScheduler提交的每个taskSet创建一个对应的TaskSetManager对象,TaskSetManager用于调度同一个taskSet中的task
val manager = TaskSchedulerImpl.createTaskSetManager(taskSet, maxTaskFailures)
//& 将新创建的manager加入到调度树中,调度树由SchedulableBulider维护。有FIFO、Fair两种实现
SchedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
//& 触发调用CoarseGrainedSchedulerBackend.reviveOffers(),它将通过发送事件触发makeOffers方法调用
CoarseGrainedSchedulerBackend.reviveOffers()
//& 此处为发送ReviveOffers事件
driverEndpoint.send(ReviveOffers)
//& 此处为接收事件并处理
CoarseGrainedSchedulerBackend.receive
CoarseGrainedSchedulerBackend.makeOffers
//& 查找各个节点空闲资源(这里是cores),并返回要在哪些节点上启动哪些tasks的对应关系,用Seq[Seq[TaskDescription]]表示
TaskSchedulerImpl.resourceOffers
//& 启动对应task
CoarseGrainedSchedulerBackend.launchTasks
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
看了上述流程可能不那么明白,没关系,不明白才要往下看。
TaskSchedulerImpl.submitTasks(…)
在Spark 1.3.1版本中,TaskSchedulerImpl是TaskScheduler的唯一实现。submitTasks函数主要作用如下源码及注释所示:
为taskSet创建对应的TaskSetManager对象。TaskManager的主要功能在于对Task的细粒度调度,比如
决定在某个executor上是否启动及启动哪个task
为了达到Locality aware,将Task的调度做相应的延迟
当一个Task失败的时候,在约定的失败次数之内时,将Task重新提交
处理拖后腿的task
调用SchedulerBackend.makeOffers进入下一步
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
val manager = createTaskSetManager(taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not ac " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
this.cancel()
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
hasReceivedTask = true
backend.reviveOffers()
基于事件模型的调用
下面源码及注释展示了CoarseGrainedSchedulerBackend是如何通过事件模型来进一步调用的。其中ReviveOffers事件有两种触发模式:
周期性触发的,默认1秒一次
reviveOffers被TaskSchedulerImpl.reviveOffers()调用
override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
override def receive: PartialFunction[Any, Unit] = {
case ReviveOffers =&
makeOffers()
CoarseGrainedSchedulerBackend.makeOffers()
该函数非常重要,它将集群的资源以Offer的方式发给上层的TaskSchedulerImpl。TaskSchedulerImpl调用scheduler.resourceOffers获得要被执行的Seq[TaskDescription],然后将得到的Seq[TaskDescription]交给CoarseGrainedSchedulerBackend分发到各个executor上执行
def makeOffers() {
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =& new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq)) }
为便于理解makeOffers调用及间接调用的各个流程,将该函数实现分为三个step来分析,这需要对源码的表现形式做一点点改动,但并不会有任何影响。
Step1: val seq = executorDataMap.map { case (id, executorData) =& new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq
executorDataMap是HashMap[String, ExecutorData]类型,在该HashMap中key为executor id,value为ExecutorData类型(包含executor的host,RPC信息,TotalCores,FreeCores信息)
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
这段代码,返回HashMap[executorId, WorkerOffer]。每个WorkerOffer包含executor的id,host及其上可用cores信息。
Step2: val taskDescs = scheduler.resourceOffers( seq )
拿到集群里的executor及其对应WorkerOffer后,就要开始第二个步骤,即找出要在哪些Worker上启动哪些task。这个过程比较长,也比较复杂。让我来一层层拨开迷雾。
我把val taskDescs = scheduler.resourceOffers( seq )即TaskSchedulerImpl.resourceOffers(offers: Seq[WorkerOffer]),返回的是Seq[Seq[TaskDescription]] 类型,来看看其实现:
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
var newExecAvail = false
val shuffledOffers = Random.shuffle(offers)
val tasks = shuffledOffers.map(o =& new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o =& o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet &- sortedTaskSets) {
if (newExecAvail) {
taskSet.executorAdded()
var launchedTask = false
for (taskSet &- sortedTaskS maxLocality &- taskSet.myLocalityLevels) {
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
if (tasks.size & 0) {
hasLaunchedTask = true
return tasks
结合代码,概括起来说,Step2又可以分为4个SubStep:
【SubStep1】: executor, host, rack等信息更新
【SubStep2】: 随机打乱workers。目的是为了分配tasks能负载均衡,分配tasks时,是从打乱的workers的序列的0下标开始判断是否能在worker上启动task的
【SubStep3】: RootPool对它包含的所有的TaskSetManagers进行排序并返回已排序的TaskSetManager数组。这里涉及到RootPool概念及如何排序,将会在下文展开说明
【SubStep4】: 对于RootPool返回的排序后的ArrayBuffer[TaskSetManager]中的每一个TaskSetManager,取出其包含的tasks包含的所有locality。根据locality从高到低,对于每个locality,遍历所有worker,结合延迟调度机制,判断TaskSetManager的哪些tasks可以在哪些workers上启动。这里比较需要进一步说明的是“延迟调度机制”及如何判断某个TaskSetManager里的tasks是否有可以在某个worker上启动
下面,就对SubStep3及SubStep4进行展开说明
【SubStep3】
SubStep3的职责是”RootPool对它包含的所有的TaskSetManagers进行排序并返回已排序的TaskSetManager数组”。那么什么是RootPool呢?每个Spark Application包含唯一一个TaskScheduler对象,该TaskScheduler对象包含唯一一个RootPool,Spark Application包含的所有Job的所有stage对应的所有未完成的TaskSetManager都会保存在RootPool中,完成后从RootPool中remove。RootPool为org.apache.spark.scheduler.Pool类型,称作调度池。Pool的概念与YARN中队列的概念比较类似,一个队列可以包含子队列,相对的一个Pool可以包含子Pool;YARN队列的叶子节点即提交到该队列的Application,Pool的叶子节点即分配到该Pool的TaskSetManager。Pool根据调度模式的不同,分为FIFO及Fair。FIFO模式下只有一层Pool,不同于YARN的队列可以n多层,Pool的Fair调度模式下,只能有三层:RootPool,RootPool的子Pools,子Pools的叶子节点(即TaskSetManager)。
不同的调度模式添加叶子节点的实现是一样的,如下:
override def addSchedulable(schedulable: Schedulable) {
require(schedulable != null)
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
Schedulable类型的参数schedulable包含成员val parent: Pool,即父Pool,所以在添加TaskSetManager到Pool的时候就指定了父Pool。对于FIFO,所有的TaskSetManager的父Pool都是RootPool;对于Fair,TaskSetManager的父Pool即RootPool的某个子Pool。
不同的模式,除了Pool的层级结构不同,对它包含的TaskSetManagers进行排序时使用的算法也不同。FIFO对应FIFOSchedulingAlgorithm类,Fair对应FairSchedulingAlgorithm()类
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =&
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =&
new FIFOSchedulingAlgorithm()
当Pool.getSortedTaskSetQueue被调用时,就会用到该排序类,如下:
//& 利用排序算法taskSetSchedulingAlgorithm先对以本pool作为父pool的子pools做排序,再对排序后的pool中的每个TaskSetManager排序;
//& 得到最终排好序的 ArrayBuffer[TaskSetManager]
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue =
schedulableQueue.toSeq.parator)
//& FIFO不会调到这里,直接走到下面的return
for (schedulable &- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
sortedTaskSetQueue
FIFO排序类中的比较函数的实现很简单:
1. Schedulable A和Schedulable B的优先级,优先级值越小,优先级越高
2. A优先级与B优先级相同,若A对应stage id越小,优先级越高
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
if (res & 0) {
Pool及TaskSetManager都继承于Schedulable,来看下它的定义:
private[spark] trait Schedulable {
var parent: Pool
// child queues
def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
def schedulingMode: SchedulingMode
def weight: Int
def minShare: Int
def runningTasks: Int
def priority: Int
def stageId: Int
def name: String
//& 省略若干代码
可以看到,Schedulable包含weight(权重)、priority(优先级)、minShare(最小共享量)等属性。其中:
* weight:权重,默认是1,设置为2的话,就会比其他调度池获得2x多的资源,如果设置为-1000,该调度池一有任务就会马上运行
* minShare:最小共享核心数,默认是0,在权重相同的情况下,minShare大的,可以获得更多的资源
对于Fair调度模式下的比较,实现如下:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 & minShare1
val s2Needy = runningTasks2 & minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare:Int = 0
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
compare = pareTo(minShareRatio2)
compare = pareTo(taskToWeightRatio2)
if (compare & 0) {
} else if (compare & 0) {
s1.name & s2.name
结合以上代码,我们可以比较容易看出Fair调度模式的比较逻辑:
1. 正在运行的task个数小于最小共享核心数的要比不小于的优先级高
2. 若两者正在运行的task个数都小于最小共享核心数,则比较minShare使用率的值,即runningTasks.toDouble / math.max(minShare, 1.0).toDouble,越小则优先级越高
3. 若minShare使用率相同,则比较权重使用率,即runningTasks.toDouble / s.weight.toDouble,越小则优先级越高
4. 如果权重使用率还相同,则比较两者的名字
对于Fair调度模式,需要先对RootPool的各个子Pool进行排序,再对子Pool中的TaskSetManagers进行排序,使用的算法都是FairSchedulingAlgorithm.FairSchedulingAlgorithm。
到这里,应该说清楚了整个SubStep3的流程。
【SubStep4】
SubStep4说白了就是已经知道了哪些worker上由多少可用cores了,然后要决定要在哪些worker上启动哪些tasks:
//上launch的 List[workerId, ArrayBuffer[TaskDescription]]。 //& 由于task要使用的cores并不一定为1,所以每个worker上要launch得task并不一定等于可用的cores数 val tasks = shuffledOffers.map(o =& new ArrayBuffer[TaskDescription](o.cores))
var launchedTask = false
for (taskSet
sortedTaskSets; maxLocality &- taskSet.myLocalityLevels) { do { //& 获取tasks,tasks代表要在哪些worker上启动哪些tasks launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) }
从for循环可以看到,该过程对排好序的taskSet数组的每一个元素,从locality优先级从高到低(taskSet.myLocalityLevels返回该taskSet包含的所有task包含的locality,按locality从高到低排列,PROCESS_LOCAL最高)取出locality,以取出的taskSet和locality调用TaskSchedulerImpl.resourceOfferSingleTaskSet,来看下它的实现(为方便阅读及理解,删去一些代码):
private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
//& 获取每个worker上要执行的tasks序列
for (i &- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) &= CPUS_PER_TASK) {
for (task &- taskSet.resourceOffer(execId, host, maxLocality)) {
//& 将获得要在index为i的worker上执行的task,添加到tasks(i)中;这样就知道了要在哪个worker上执行哪些tasks了
tasks(i) += task
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) &= 0)
launchedTask = true
case e: TaskNotSerializableException =&
return launchedTask
return launchedTask
resourceOfferSingleTaskSet拿到worker可用cores,taskSet和locality后
1. 遍历每个worker的可用cores,如果可用cores大于task需要的cores数(即CPUS_PER_TASK),进入2
2. 调用taskSet.resourceOffer(execId, host, maxLocality)获取可在指定executor上启动的task,若返回非空,把返回的task加到最终的tasks: Seq[ArrayBuffer[TaskDescription]]中,该结构保存要在哪些worker上启动哪些tasks
3. 减少2中分配了task的worker的可用cores及更新其他信息
从以上的分析中可以看出,要在某个executor上启动哪个task最终的实现在TaskSetManager.resourceOffer中,由于该函数比较长,我将函数分过几个过程来分析
首先来看第一段:
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality & maxLocality) {
allowedLocality = maxLocality
要判断task能否在worker上启动,除了空闲资源是否达到task要求外,还需要判断本地性,即locality。locality从高到低共分为PROCESS_LOCAL, NODE_LOCAL,RACK_LOCAL及ANY。若taskSet带有locality属性,则通过getAllowedLocalityLevel函数获得该taskSet能容忍的最低界别locality。
getAllowedLocalityLevel中:
1. 如果taskset刚刚被提交,taskScheduler开始第一轮对taskset中的task开始提交,那么当时currentLocalityIndex为0,直接返回可用的最好的本地性;如果是在以后的提交过程中,那么如果当前的等待时间超过了一个级别,就向后跳一个级别
2. getAllowedLocalityLevel方法返回的是当前这次调度中,能够容忍的最差的本地性级别,在后续步骤的搜索中就只搜索本地性比这个级别好的情况
3. 随着时间的推移,撇开maxLocality配置不谈,对于本地性的容忍程度越来越大。
继续返回TaskSetManager.resourceOffer中,获得taskSet能容忍的最差locality后,与maxLocality比较去较差的locality作为最终的
能容忍的最差locality。
进入第二段:
dequeueTask(execId, host, allowedLocality) match {
case Some((index, taskLocality, speculative)) =& {
addRunningTask(taskId)
val taskName = s"task ${info.id} in stage ${taskSet.id}"
sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
taskName, index, serializedTask))
可以看到,第二段首先调用了函数dequeueTask,如果返回不为空,说明为指定的worker分配了task;这之后,进行各种信息更新,将taskId加入到runningTask中,并通知DAGScheduler,最后返回taskDescription。来看看dequeueTask的实现:
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
for (index &- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index &- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
for (index &- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
rack &- sched.getRackForHost(host)
index &- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
return Some((index, TaskLocality.RACK_LOCAL, false))
if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
for (index &- dequeueTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY, false))
dequeueSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) =& (taskIndex, allowedLocality, true)}
从该实现可以看出,不管之前获得的能容忍的最差locality(即allowedLocality)有多低,每次dequeueTask都是以PROCESS_LOCAL-&…-&allowedLocality顺序来判断是否可以以该locality启动task,而并不是必须以allowedLocality启动task。这也增大了启动task的机会。
到这里应该大致说清楚了Step2中的各个流程。
Step3: launchTasks( taskDescs )
得到要在哪些worker上启动哪些task后,将调用launchTasks来启动各个task,实现如下:
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task &- tasks.flatten) {
val ser = SparkEnv.get.closureSerializer.newInstance()
//& 序列化task
val serializedTask = ser.serialize(task)
//& 若序列化后的task的size大于等于Akka可用空间
if (serializedTask.limit &= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =&
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
"spark.akka.frameSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes)
//& 中止taskSet,标记为已完成;同时将该taskSet的状态置为isZombie(Zombie:僵尸)
taskSet.abort(msg)
case e: Exception =& logError("Exception in error callback", e)
//& 若序列化后的task的size小于Akka可用空间,减去对应executor上的可用cores数并向对应的executor发送启动task消息
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
逻辑比较简单,先对task进行序列化,若序列化后的task的size大于等于akka可用空间大小,则taskSet标记为已完成并置为Zombie状态;若序列化后的task的size小于akka可用空间大小,则通过发送消息给对应executor启动task
版权声明:本文为博主原创文章,转载请注明出处
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:(#换为@))}

我要回帖

更多关于 spark tasksetmanager 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信