# Deep Dive into the Spark Scheduler

#### Disclaimer: Most of this content is based on Xingbo Jiang's [talk](https://databricks.com/session/apache-spark-scheduler) at Spark Summit 2018.

### Overview:

![Scheduling Process(Credit: Xingbo Jiang)](https://1271107977-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LkwjM9iFxyVX0PINZ87%2F-Lx8hCv7KRB1r1BpgD95%2F-Lx8iC7EE04C_tG8g5yY%2FScreen%20Shot%202019-12-27%20at%203.36.41%20PM.png?alt=media\&token=1019c2b0-522d-47da-a1a2-630e8de267a5)

### RDD Objects:

In this phase, user operations build a DAG of RDD objects (the lineage graph), which the DAGScheduler later splits into stages. For example:

![Credit: Xingbo Jiang](https://1271107977-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LkwjM9iFxyVX0PINZ87%2F-Lx8hCv7KRB1r1BpgD95%2F-Lx8ikdqg-SkFXEeYUml%2FScreen%20Shot%202019-12-27%20at%203.39.35%20PM.png?alt=media\&token=7375af86-4756-4ef0-8ebd-6dd301ac1fa4)
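Stage boundaries fall at shuffle (wide) dependencies, while narrow dependencies are pipelined into the same stage. A minimal sketch of that splitting rule (`build_stages` and the `deps` structure are invented names for illustration, not the Spark implementation):

```python
def build_stages(deps, rdd):
    """Return a list of stages (each a set of RDD names), parents first.
    deps maps rdd -> list of (parent, is_shuffle) edges. A shuffle edge
    starts a new parent stage; narrow edges stay in the same stage so the
    operations can be pipelined."""
    stage = {rdd}
    parents = []
    for parent, is_shuffle in deps.get(rdd, []):
        sub = build_stages(deps, parent)
        if is_shuffle:
            parents += sub            # parent chain ends at the shuffle
        else:
            parents += sub[:-1]       # pipeline the parent into this stage
            stage |= sub[-1]
    return parents + [stage]

# Example lineage: A -(narrow)-> B -(shuffle)-> C -(narrow)-> D
deps = {
    "B": [("A", False)],
    "C": [("B", True)],
    "D": [("C", False)],
}
build_stages(deps, "D")  # -> [{"A", "B"}, {"C", "D"}]
```

The shuffle edge between B and C is the only stage boundary: A/B pipeline into one stage and C/D into another.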

### DAGScheduler:

* Implements stage-oriented scheduling
  * Computes a DAG of stages for each submitted job
  * Keeps track of which RDD/stage outputs are materialized
  * Finds a minimal schedule to run the job
* Stage -> TaskSet
  * A TaskSet is a set of tasks submitted to compute the missing partitions of a particular stage
  * A stage can correspond to multiple TaskSets (one per attempt, e.g. after a failure)
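The Stage -> TaskSet step can be sketched as follows (`missing_task_set` is a hypothetical helper, not a Spark API): a TaskSet covers only the partitions whose output is not yet materialized, so earlier successful work is not redone.

```python
def missing_task_set(num_partitions, materialized):
    """Return the partitions of a stage whose output is not yet
    available; only these become tasks in the next TaskSet."""
    return [p for p in range(num_partitions) if p not in materialized]

# An 8-partition stage where partitions 0-5 already produced output
# (e.g. in a previous attempt): only 6 and 7 enter the new TaskSet.
missing_task_set(8, {0, 1, 2, 3, 4, 5})  # -> [6, 7]
```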

### TaskScheduler:

* The DAGScheduler submits sets of tasks to the TaskScheduler
* The TaskScheduler schedules and monitors tasks through a SchedulerBackend
* Events are passed back to the DAGScheduler
  * JobSubmitted/JobCancelled
  * MapStageSubmitted/StageCancelled
  * CompletionEvent
* How are TaskSets scheduled?
  * Batch scheduling approach
    * Get all available slots
    * Schedule tasks according to their locality preferences
  * [Barrier scheduling approach](https://issues.apache.org/jira/browse/SPARK-24375)
    * Wait until all tasks in the same TaskSet can be scheduled at the same time
    * Retry all tasks in the TaskSet if any task fails
* How are tasks scheduled within a TaskSet?
  * Try to achieve the best locality for each task (this locality-aware scheduling is implemented via [delay scheduling](https://cs.stanford.edu/~matei/papers/2010/eurosys_delay_scheduling.pdf))
    * Less data transferred over the network
    * Higher performance
  * Locality comes in several levels
    * Process locality (cache/memory)
    * Node locality (local disk)
    * Rack locality (same rack)
* The SchedulerBackend is responsible for resource management
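The contrast between the batch and barrier approaches can be sketched with a hypothetical helper (not the Spark API): a barrier TaskSet launches only when every one of its tasks can start together, while a regular TaskSet launches as many tasks as the free slots allow.

```python
def tasks_to_launch(num_tasks, free_slots, barrier):
    """How many tasks of a TaskSet start in this scheduling round."""
    if barrier:
        # All-or-nothing: wait unless every task can start at once
        return num_tasks if free_slots >= num_tasks else 0
    # Regular batch scheduling: launch whatever fits
    return min(num_tasks, free_slots)

tasks_to_launch(4, 3, barrier=True)   # -> 0 (keep waiting)
tasks_to_launch(4, 3, barrier=False)  # -> 3 (launch what fits)
```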

![Illustration of Delay Scheduling(Credit:Xingbo Jiang)](https://1271107977-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LkwjM9iFxyVX0PINZ87%2F-Lx8hCv7KRB1r1BpgD95%2F-Lx8m4aaQWIwVyuiY_i_%2FScreen%20Shot%202019-12-27%20at%203.54.15%20PM.png?alt=media\&token=428c0bcd-e783-48f8-8e11-611400a66f02)

```
// Delay scheduling in pseudocode
When a heartbeat is received from node n:
    if n has a free slot then:
        compute maxAllowedLocality for pending tasks
        if there exists a task t that can launch on n with locality <= maxAllowedLocality:
            launch t
            update currentLocality
        else if waitTime > maxDelayTime:
            relax maxAllowedLocality by one level and launch the best pending task
        else:
            skip n and wait for the next round of scheduling
    endif
```
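A runnable version of the pseudocode's core idea, with hypothetical names and a made-up per-level delay (Spark's actual waits come from the `spark.locality.wait` settings): the longer the scheduler has waited, the worse the locality level it is willing to accept.

```python
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]

def allowed_level(wait_time, delay_per_level=3.0):
    """Each delay_per_level seconds of waiting unlocks one worse level."""
    return min(int(wait_time // delay_per_level), len(LEVELS) - 1)

def pick_task(pending, node, wait_time):
    """pending: list of (task, {node: level_index}) locality preferences.
    Return the first task whose locality on `node` is within the
    currently allowed level, or None (wait for the next round)."""
    max_level = allowed_level(wait_time)
    for task, prefs in pending:
        if prefs.get(node, len(LEVELS) - 1) <= max_level:
            return task
    return None

pending = [("t1", {"nodeA": 0}),   # t1 is PROCESS_LOCAL on nodeA
           ("t2", {"nodeB": 1})]   # t2 is NODE_LOCAL on nodeB
pick_task(pending, "nodeB", wait_time=0.0)  # -> None: only PROCESS_LOCAL allowed yet
pick_task(pending, "nodeB", wait_time=4.0)  # -> "t2": NODE_LOCAL now allowed
```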

### Handling Failures:

* Task Failure
  * Record the failure count of the task
  * Retry the task if its failure count < maxTaskFailures
  * Abort the stage and the corresponding jobs if the count >= maxTaskFailures
* Fetch Failure
  * Do not count the failure toward the task failure count
  * Retry the stage if the stage failure count < maxStageFailures
  * Abort the stage and the corresponding jobs if the stage failure count >= maxStageFailures
  * Optionally mark the executor/host as lost
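The two retry policies can be sketched with hypothetical helpers (the task limit mirrors `spark.task.maxFailures`, which defaults to 4): task failures are counted per task, while fetch failures are counted against the stage instead.

```python
def should_retry_task(task_failures, max_task_failures=4):
    """Retry a failed task until its own failure count hits the limit."""
    return task_failures < max_task_failures

def handle_fetch_failure(stage_failures, max_stage_failures=4):
    """A fetch failure does not touch the task failure count; the whole
    stage is retried until the stage failure count hits its limit."""
    return "retry stage" if stage_failures < max_stage_failures else "abort job"

should_retry_task(3)     # -> True: 3 < 4, retry the task
handle_fetch_failure(4)  # -> "abort job"
```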

### Worker

* A container for executors
* Executors provide
  * Threads to run tasks
  * A BlockManager to store/serve blocks
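A minimal sketch of this executor model (illustrative only, not Spark code): a pool of task threads plus a block store that caches task outputs.

```python
from concurrent.futures import ThreadPoolExecutor

class Executor:
    """Toy executor: task threads plus a block store."""

    def __init__(self, cores):
        self.pool = ThreadPoolExecutor(max_workers=cores)  # threads to run tasks
        self.block_manager = {}                            # stores/serves blocks

    def launch_task(self, task_id, fn):
        def run():
            result = fn()
            self.block_manager[task_id] = result  # cache the task's output
            return result
        return self.pool.submit(run)

ex = Executor(cores=2)
ex.launch_task("t0", lambda: sum(range(10))).result()  # -> 45
```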
