spark的运行原理在大数据开发岗面试过程中是经常被问到的一个问题,我第一次被问到这个问题的时候有点摸不着头脑,这么大的一个问题我究竟应该怎样回答呢?是去描述一下spark的架构组成还是说一下底层的调用细节?后来查找了一些资料,看了一些书之后对这个问题有了一些理解,其实提这个问题的人可能最希望我们回答的是Spark运行的过程细节,简单来说就是把某个Spark程序从提交到执行完成中间经历了哪些步骤描述出来。如果在描述的过程中能够加入一些对Spark底层源码细节的解释会给提问者留下比较好的印象,认为你不仅仅是停留在使用Spark上,还对底层源码的原理有所了解。
简单描述Spark的运行原理
用户使用spark-submit提交一个作业之后,会首先启动一个driver进程,driver进程会向集群管理器(standalone、YARN、Mesos)申请本次运行所需要的资源(这里的资源包括core和memory,可以在spark-submit的参数中进行设置),集群管理器会根据我们需要的参数在各个节点上启动executor。申请到对应资源之后,driver进程就会开始调度和执行我们编写的作业代码。作业会被提交给DAGScheduler,DAGScheduler会根据作业中RDD的依赖关系将作业拆分成多个stage,拆分的原则就是根据是否出现了宽依赖,每个stage当中都会尽可能多的包含连续的窄依赖。每个stage都包含了作业的一部分,会生成一个TaskSet提交给底层调度器TaskScheduler,TaskScheduler会把TaskSet提交到集群当中由executor进行执行。Task的划分是根据数据的partition进行划分,一个partition会划分为一个task。如此循环往复,直至执行完编写的driver程序的所有代码逻辑,并且计算完所有的数据。
简单的运行流程如下图:
图一 spark运行流程
SparkContext
Spark程序的整个运行过程都是围绕spark driver程序展开的,spark driver程序当中最重要的一个部分就是SparkContext,SparkContext的初始化是为了准备Spark应用程序的运行环境,SparkContext主要是负责与集群进行通信、向集群管理器申请资源、任务的分配和监控等。
driver与worker之间的架构如下图,driver负责向worker分发任务,worker将处理好的结果返回给driver。
图二 driver架构
SparkContext的核心作用是初始化Spark应用程序运行所需要的核心组件,包括高层调度器DAGScheduler、底层调度器TaskScheduler和调度器的通信终端SchedulerBackend,同时还会负责Spark程序向Master注册程序等。Spark应用当中的RDD是由SparkContext进行创建的,例如通过SparkContext.textFile()、SparkContext.parallel()等这些API。运行流程当中提及的向集群管理器Cluster Manager申请计算资源也是由SparkContext产生的对象来申请的。接下来我们从源码的角度学习一下SparkContext,关于SparkContext创建的各种组件,在SparkContext类中有这样一段代码来创建这些组件:
### DAGScheduler DAGScheduler是一个高层调度器,能够将DAG的各个RDD划分到不同的Stage,并构建这些Stage之间的父子关系,最后将每个Stage根据partition划分为多个Task,并以TaskSet的形式提交给底层调度器TaskScheduler。Stage的划分按照的是RDD依赖关系中是否出现了宽依赖,宽依赖指的是父RDD中的一个partition被子RDD的多个partition所依赖,简单来说就是父RDD的partition的出度大于1,同理,窄依赖指的就是父RDD的一个partition只被子RDD的一个partition所依赖,也就是父RDD的partition的出度都是1。每个Stage当中都会尽可能包含多的窄依赖,将各个窄依赖的算子形成一整个pipeline进行运行,可以减少各个算子之间RDD的读写,不像MapReduce当中每个job只包含一个Map任务和一个Reduce任务,下一个Map任务都需要等待上一个Reduce任务全部都结束才能执行,pipeline形式的执行过程中没有产生shuffle,放在一起执行明显效率更高。Stage与Stage之间会出现shuffle,这里shuffle也是一个常常考察的点,另外的文章会详细说明。DAGScheduler还需要记录哪些RDD被存入磁盘等物化动作,同时要寻求Task的最优化调度,如在Stage内部数据的本地性等。DAGScheduler还需要监控因为shuffle跨节点输出可能导致的失败,如果发现这个Stage失败,可能就要重新提交该Stage。DAGScheduler具体调用过程
当一个job被提交时,DAGScheduler便会开始其工作,spark中job的提交是由RDD的action触发的,当发生action时,RDD中的action方法会调用其SparkContext的runJob方法,经过多次重载之后会调用到DAGScheduler的runJob方法。
DAGScheduler类当中runJob是提交job的入口函数,其中会调用submitJob方法返回一个JobWaiter来等待作业调度的结果,之后根据作业的成功或者失败打印相关的结果日志信息。
submitJob方法会获取jobId以及校验partitions是否存在,并向eventProcessLoop发送了一个case class JobSubmitted对象,JobSubmitted对象封装了jobId、最后一个RDD,对RDD操作的函数,哪些partition需要被计算等内容。eventProcessLoop当中有一个eventThread线程,是一个deamon线程,用于接收通过post方法发送到该线程的JobSubmitted对象,放入其中的一个eventQueue阻塞队列进行处理,从eventQueue中take出来的event会调用onReceive方法(该方法由eventProcessLoop实现),onReceive方法中又会调用doOnReceive方法,按照不同的event类型进行不同的处理。
这里读源码的时候可能会有一个疑问,为何不直接在DAGScheduler调用submitJob的时候直接调用doOnReceive来处理job,为何要新启一个线程来进行处理,并且自己给自己发消息进行处理(eventProcessLoop是DAGScheduler内部的一个对象)。这里实际上是一个线程的异步通信方式,只是将消息以线程通信的方式post(这里的线程通信方式实际上是用了一个阻塞队列)给另一个线程,submitJob的方法能够立刻返回,不会阻塞在处理event的过程当中。这里我们不要浅显的认为DAGScheduler当中自己在给自己发消息,实际上还有别的组件会给DAGScheduler发消息,这种采用一个守护线程的方式进行消息处理可以将这两者统一起来,两者处理的逻辑都是一致的,扩展性非常好,使用消息循环器,就能统一处理所有的消息,保证处理的业务逻辑都是一致的。这里的eventProcessLoop实际上能够处理多种消息,不仅仅是JobSubmitted,源码当中能看到有如下多种event的处理:
- JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
- MapStageSubmitted(jobId, dependency, callSite, listener, properties)
- StageCancelled(stageId, reason)
- JobCancelled(jobId, reason)
- JobGroupCancelled(groupId)
- AllJobsCancelled
- ExecutorAdded(execId, host)
- ExecutorLost(execId, reason)
- WorkerRemoved(workerId, host, message)
- BeginEvent(task, taskInfo)
- SpeculativeTaskSubmitted(task)
- GettingResultEvent(taskInfo)
- completion
- TaskSetFailed(taskSet, reason, exception)
- ResubmitFailedStages
JobSubmitted会去调用DAGScheduler中的handleJobSubmitted方法,该方法是构建Stage的开始阶段,会创建Stage中的最后一个Stage——ResultStage,而其他Stage为ShuffleMapStage。ResultStage的创建是由createResultStage这个函数完成的,其中的getOrCreateParentStage方法将会获取或创建一个给定RDD的父Stages列表,这个方法就是我们之前所说的具体划分Stage的方法,这个方法的源码很简单,如下:
其中调用了一个函数getShuffleDependencies用来返回给定RDD的父节点中直接的shuffle依赖,其源码如下:
这里有三个主要的数据结构,为两个HashSet——parents和visited,还有一个Stack——waitingForVisit,代码中首先将传入的RDD加入用于进行栈式访问的waitingForVisit中,这里使用栈我们也可以看出这是一个深度优先搜索的策略,visited用于记录访问过的节点保证不会重复访问,接下来对访问的RDD的依赖进行区分,如果是shuffleDep(即宽依赖),就将依赖加入parents,如果是dependency(窄依赖),则将依赖的RDD加入waitingForVisit进行深度优先搜索遍历,这里最终将返回parents,产生的结果就是parents当中记录的都是shuffleDep,即两个Stage之间的依赖。之后根据得到的shuffle dependency来调用getOrCreateShuffleMapStage产生ShuffleMapStage,产生的ShuffleMapStage会存储在shuffleIdToMapStage这个HashMap当中,如果在该数据结构中已经存在创建过的ShuffleMapStage就直接返回,不存在则调用createShuffleMapStage进行创建,创建的时候会调用getMissingAncestorShuffleDependencies去搜索祖先shuffle dependency,先将依赖的Stage进行创建。
Stage创建完毕之后,handleJobSubmitted将会调用submitStage来提交finalStage,submitStage将会递归优先提交父Stage,父Stage是通过getMissingParentStages来获取的,并按照Stage的id进行排序,优先提交id小的Stage。
具体例子说明
如下图所示是5个RDD的转换图,假设RDD E最后出发了一个action(比如collect),接下来按照图中的关系仔细讲解一下DAGScheduler对Stage的生成过程。
- RDD.collect方法会出发SparkContext.runJob方法,之后调用到DAGScheduler.runJob方法,继而调用submitJob方法将这个事件封装成JobSubmitted事件进行处理,调用到handleJobSubmitted,在这个方法中会调用createResultStage。
- createResultStage会基于jobId创建ResultStage(ResultStage中的rdd即是出发action的那个RDD,即finalRDD)。调用getOrCreateResultStages创建所有父Stage,返回parents: List[Stage]作为父Stage,将parents传入ResultStage,实例化生成ResultStage。在示意图中即是RDD E调用createResultStage,通过getOrCreateResultStages获取Stage1、Stage2,然后创建自己的Stage3。
- getOrCreateParentStages方法中的getShuffleDependencies会获取RDD E的所有直接款依赖集合RDD B和RDD D,然后对这两个RDD分别调用getOrCreateShuffleMapStage,由于这两个RDD都没有父Stage,则getMissingAncestorShuffleDependencies会返回为空,会创建这两个ShuffleMapStage,最后再将这两个Stage作为Stage3的父Stage,创建Stage3。
- 之后会调用handleJobSubmitted中的submitStage来提交Stage,提交的时候采用从后往前回溯的方式,优先提交前面的Stage,并且按照Stage的id优先提交Stage的id小的,后面的Stage依赖于前面的Stage,只有前面的Stage计算完毕才会去计算后面的Stage。
SchedulerBackend和TaskScheduler
之前讲到的TaskScheduler和SchedulerBackend都只是一个trait,TaskScheduler的具体实现类是TaskSchedulerImpl,而SchedulerBackend的子类包括有:
- LocalSchedulerBackend
- StandaloneSchedulerBackend
- CoarseGrainedSchedulerBackend
- MesosCoarseGrainedSchedulerBackend
- YarnSchedulerBackend
不同的SchedulerBackend对应不同的Spark运行模式。传给createTaskScheduler不同的master参数就会输出不同的SchedulerBackend,在这里spark实际上是根据master传入的字符串进行正则匹配来生成不同的SchedulerBackend。这里采用了设计模式当中的策略模式,根据不同的需要来创建不同的SchedulerBackend的子类,如果使用的是本地模式,就会创建LocalSchedulerBackend,而standalone集群模式则会创建StandaloneSchedulerBackend。StandaloneSchedulerBackend中有一个重要的方法start,首先会调用其父类的start方法,之后定义了一个Command对象command,其中有个对象成员mainClass为org.apache.spark.executor.CoarseGrainedExecutorBackend,这个类非常重要,我们在运行spark应用时会在worker节点上看到名称为CoarseGrainedExecutorBackend的JVM进程,这里的进程就可以理解为executor进程,master发指令给worker去启动executor所有的进程时加载的Main方法所在的入口类就是这个CoarseGrainedExecutorBackend,在CoarseGrainedExecutorBackend中启动executor,executor通过构建线程池来并发地执行task,然后再调用它的run方法。在start方法中还会创建一个十分重要的对象StandaloneAppClient,会调用它的start方法,在该方法中会创建一个ClientEndpoint对象,这是一个RpcEndPoint,会向Master注册。
SchedulerBackend实际上是由TaskScheduler来进行管理的,createTaskScheduler方法中都会调用TaskScheduler的initialize方法将SchedulerBackend作为参数输入,绑定两者的关系。
initialize方法当中还会创建一个Pool来初始定义资源的分布模式Scheduling Mode,默认是先进先出(FIFO)模式,还有一种支持的模式是公平(FAIR)模式。FIFO模式指的是任务谁先提交谁就先执行,后面的任务需要等待前面的任务执行,FAIR模式支持在调度池中为任务进行分组,不同的调度池权重不同,任务可以按照权重来决定执行的先后顺序。
TaskScheduler的核心任务是提交TaskSet到集群运算运算并汇报结果。我们知道,之前所讲的DAGScheduler会将任务划分成一系列的Stage,而每个Stage当中会封装一个TaskSet,这些TaskSet会按照先后顺序提交给底层调度器TaskScheduler进行执行。TaskScheduler接收的TaskSet是DAGScheduler中的submitMissingTasks方法传递过来的,具体调用的函数为TaskScheduler.submitTasks,TaskScheduler会为每一个TaskSet初始化一个TaskSetManager对其生命周期进行管理,当TaskScheduler得到Worker节点上的Executor计算资源的时候,TaskSetManager便会发送具体的Task到Executor上进行执行。如果Task执行的过程中出现失败的情况,TaskSetManager也会负责进行处理,会通知DAGScheduler结束当前的Task,并将失败的Task再次添加到待执行队列当中进行后续的再次计算,重试次数默认为4次,处理该逻辑的方法为TaskSetManager.handleFailedTask。Task执行完毕,TaskSetManager会将结果反馈给DAGScheduler进行后续处理。
TaskScheduler的具体实现类为TaskSchedulerImpl,其中有一个方法为submitTasks非常重要,该方法源码如下所示:
该方法中会创建TaskSetManager,并通过一个HashMap将stage的id和TaskSetManager进行对应管理。之后会调用SchedulableBuilder的addTaskSetManager方法将创建的TaskSetManager加入其中,SchedulableBuilder会确定TaskSetManager的调度顺序并确定每个Task具体运行在哪个ExecutorBackend中。submitTasks方法的最后会调用backend.reviveOffer,该backend具体类型一般为CoarseGrainedSchedulerBackend,是SchedulerBackend的一个子类,其reviveOffers方法中调用的是driverEndPoint.send方法,这个方法会给DriverEndPoint发送ReviveOffers消息,会触发底层资源调度,即进行TaskSet所需要资源的分配。 driverEndPoint的receive方法匹配到ReceiveOffers消息,就调用makeOffers方法,该方法如下所示:
该方法会获取活动的Executor,根据activeExecutor生成所有可用于计算的workOffers,workOffers在创建时会传入Executor的id,host和可用的core等信息,可用的内存信息在其他地方已经获取。makeOffers方法中还调用了scheduler的resourceOffers方法,这个方法便是给用于计算的workOffers提供资源,均匀的将任务分发给每个workOffer(Executor)进行计算。在这里我曾经有一个疑问,就是任务是不是按顺序发给每个Executor进行计算的,即假设有100个Task,5个Executor,分发任务的时候是不是总是按照0号Executor、1号Executor、2号Executor……这样的顺序进行分发的,也就是0号Executor总是拿到id为TaskId % 5的任务,1号Executor总是拿到id为TaskId % 5 + 1的任务。但阅读源码发现,其中有一个环节是进行shuffle操作,调用的是Random.shuffle(offers),即把workOffers(Executors)在Seq中的顺序进行洗牌,避免总是把任务放在同一组worker节点,这一点我们在后续的resourceOfferSingleTaskSet方法中可以很清楚的看到任务具体分发的过程其实就是按照workerOffers在Seq中的顺序进行的,在源码中就是对workerOffers的一个简单的for遍历进行读取可用的core资源并将可用的资源分发给TaskSetManager用于对应的TaskSet的计算:
源码中按照shuffledOffers的索引进行顺序遍历,因为之前已经进行过shuffle操作,workerOffer的顺序每次都是打乱的,所以在这里分配任务时不会总是按照一定的顺序给workerOffer分配对应id号的Task,而是会以随机乱序的方式给workerOffer分配Task,但是任务的分配还会考虑任务的本地性,在分配任务时会将对应的Executor资源输入给TaskSetManager的resourceOffer方法,该方法会返回需要计算的Task的TaskDescription,这里很重要的一个依据就是尽量给Executor分配计算本地性高的任务。数据的本地优先级从高到低依次为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY,因此对于一些任务,其数据一直处于某个节点上,因而该任务也会一直分配给该节点上的Executor进行计算,之前对workerOffer进行打乱分配的效果可能看起来就不是特别明显了,会发现一些任务一直处于某些节点上进行计算。DAGScheduler也会有本地性的考虑,但是DAGScheduler是从数据的层面进行考虑的,从RDD的层面确定就可以,而TaskScheduler是从具体计算的角度考虑计算的本地性,是更具体的底层调度,满足数据本地性和计算本地性。
在resourceOfferSingleTaskSet方法中我们看到有一个变量CPUS_PER_TASK,之前我一直理解的是一个Task是由一个cpu core进行执行的,但是这个变量实际上来源于配置参数spark.task.cpus,当我们将这个参数设置为2时一个Task会被分配到2个core,从Stack Overflow当中了解到这个参数的设置实际上是为了满足一些特殊的Task的需求,有些Task内部可能会出现多线程的情况,或者启动额外的线程进行其他交互操作,这样设置能够确保对core资源的总需求在按照设置的情况运行时不会超过一定的设置值(但这里并没有强制要求,如果Task启动的线程数大于设置的spark.task.cpus也不会有问题,但可能会因为超过一定的值造成资源抢占,影响效率)。
进行资源分配的taskSet实际上是有一定的顺序的,在TaskSchedulerImpl.resourceOffers方法中调用了rootPool.getSortedTaskSetQueue获取按照一定规则排序后的taskSet进行遍历处理,这里的规则就是之前所说的FIFO或FAIR,指的是属于一个Stage的TaskSet的计算的优先级。resourceOffers函数一开始也会对每一个活着的slave进行标记,记录其主机名并跟踪是否增加了新的Executor,这里可能的情况是有一些Executor挂掉了重新启动了新的,需要在有新的TaskSet计算请求时加入到计算资源信息记录当中。
任务的资源分配好之后,会获得Task的TaskDescription,接下来CoarseGrainedSchedulerBackend调用launchTasks方法把任务发送给对应的ExecutorBackend进行执行。
如果任务序列化之后大小超过了maxRpcMessageSize(默认128M)会丢弃,否则根据TaskDescription中记录的执行该Task的executorId获取executorData,将其中的freeCores减去运行任务需要的core数,并使用executorEndPoint的send方法发送LaunchTask给指定的Executor的ExecutorBackend进行执行,LaunchTask是一个case class,其中存储的内容就是序列化的Task。
参考文献:
- Spark大数据商业实战三部曲/王家林,段智华,夏阳编著. 北京:清华大学出版社,2018
- https://stackoverflow.com/questions/36671832/number-of-cpus-per-task-in-spark