Deep Dive into the Spark Scheduler

Disclaimer: Most of the content is based on Xingbo Jiang's talk at Spark Summit 2018.

Overview:

Scheduling Process (Credit: Xingbo Jiang)

RDD Objects:

In this phase, the RDD lineage graph is translated into stages. For example:
(Figure: an RDD graph translated into stages. Credit: Xingbo Jiang)
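
To make the figure concrete, here is a minimal word-count job (the input path is a placeholder): reduceByKey introduces a shuffle dependency, so the narrow transformations before it are pipelined into one stage, and the aggregation after it forms a second stage.

// Stage 0: narrow transformations (flatMap, map) are pipelined together.
val pairs = sc.textFile("hdfs://...")        // placeholder input path
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))

// reduceByKey requires a shuffle, so it starts Stage 1.
val counts = pairs.reduceByKey(_ + _)

counts.collect()   // the action submits the job to the scheduler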

DAGScheduler:

  • Implements stage-oriented scheduling
    • Computes a DAG of stages for each submitted job (see the stage-cutting sketch after this list)
    • Keeps track of which RDD/stage outputs are materialized
    • Finds a minimal schedule to run the job
  • Stage -> TaskSet
    • A TaskSet is the set of tasks submitted to compute the missing partitions of a particular stage
    • A stage can correspond to multiple TaskSets (one per stage attempt)
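
As a rough illustration of the stage-cutting rule, here is a simplified sketch; the types are invented for illustration and are not Spark's real classes. Narrow dependencies stay in the current stage, while every shuffle dependency marks the boundary of a parent stage.

// Simplified sketch of stage cutting (invented types, not Spark's API).
sealed trait Dependency { def parent: RddNode }
case class NarrowDep(parent: RddNode) extends Dependency   // same stage
case class ShuffleDep(parent: RddNode) extends Dependency  // stage boundary
case class RddNode(id: Int, deps: Seq[Dependency])

// Walk the lineage of `rdd`: each shuffle dependency found closes off a
// parent stage; narrow dependencies are pipelined into the current stage.
def parentStageBoundaries(rdd: RddNode): Seq[ShuffleDep] =
  rdd.deps.flatMap {
    case s: ShuffleDep => Seq(s)
    case NarrowDep(p)  => parentStageBoundaries(p)
  }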

TaskScheduler:

  • The DAGScheduler submits sets of tasks (TaskSets) to the TaskScheduler
  • Schedules and monitors tasks through a SchedulerBackend
  • Returns events to the DAGScheduler
    • JobSubmitted/JobCancelled
    • MapStageSubmitted/StageCancelled
    • CompletionEvent
  • How are tasks (TaskSets) scheduled?
    • Batch scheduling approach (see the sketch after this list)
      • Get all available slots
      • Schedule tasks with locality preferences
      • Wait until all tasks in the same TaskSet can be scheduled at the same time
      • Retry all tasks in the TaskSet if any task fails
  • How are tasks within a TaskSet scheduled?
    • Try to achieve the best locality for each task (this locality-aware scheduling is implemented via delay scheduling)
      • Less data transferred over the network
      • Higher performance
    • Locality comes at different levels
      • Process locality (cache/memory)
      • Node locality (local disk)
      • Rack locality (same rack)
  • The SchedulerBackend is responsible for resource management
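
A rough Scala sketch of the batch approach described above (Slot, PendingTask, and scheduleTaskSet are invented names, not Spark's real API): gather every free slot first, place each task with a preference for its preferred hosts, and hand back an assignment only when the whole TaskSet fits at once.

// Invented types for illustration, not Spark's real scheduler API.
case class Slot(executorId: String, host: String)
case class PendingTask(id: Int, preferredHosts: Set[String])

def scheduleTaskSet(slots: Seq[Slot],
                    tasks: Seq[PendingTask]): Option[Map[Int, Slot]] = {
  if (tasks.size > slots.size) return None   // not enough slots: keep waiting
  val free = scala.collection.mutable.ArrayBuffer(slots: _*)
  val assigned = scala.collection.mutable.Map.empty[Int, Slot]
  for (t <- tasks) {
    // Prefer a slot on one of the task's preferred hosts, else take any slot.
    val i = free.indexWhere(s => t.preferredHosts.contains(s.host)) match {
      case -1 => 0
      case j  => j
    }
    assigned(t.id) = free.remove(i)
  }
  Some(assigned.toMap)   // the whole TaskSet fits, launch it together
}
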
Illustration of Delay Scheduling (Credit: Xingbo Jiang)
// Delay scheduling in pseudocode
When a heartbeat is received from node n:
  if n has a free slot then:
    compute maxAllowedLocality for pending tasks
    if there exists a task t that can launch on n with locality <= maxAllowedLocality:
      launch t
      update currentLocality
    else if waitTime > maxDelayTime:
      relax maxAllowedLocality one level and launch t at the looser level
    else:
      // wait for the next round of scheduling
    endif
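
The same idea as a heavily simplified runnable Scala sketch (DelayScheduler, findTask, and the single shared delay are inventions of this sketch; Spark actually tracks a separate wait per locality level and per TaskSet): tasks are only handed out at the current locality level, and that level is relaxed one step at a time once the wait exceeds the allowed delay.

// Simplified delay scheduling for a single TaskSet (invented names).
object Locality extends Enumeration {
  val Process, Node, Rack, AnyLevel = Value
}

class DelayScheduler(maxDelayMs: Long) {
  import Locality._
  private var currentLevel = Process
  private var lastLaunchMs = System.currentTimeMillis()

  // findTask returns a pending task runnable on the offered slot at the
  // given locality level (or better), if one exists.
  def offer(findTask: Value => Option[Int]): Option[Int] =
    findTask(currentLevel) match {
      case Some(task) =>
        lastLaunchMs = System.currentTimeMillis()   // reset the wait clock
        Some(task)
      case None if currentLevel != AnyLevel &&
                   System.currentTimeMillis() - lastLaunchMs > maxDelayMs =>
        currentLevel = Locality(currentLevel.id + 1)   // relax one level
        offer(findTask)                                // retry at looser level
      case None =>
        None                                           // wait for next round
    }
}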

Handling Failures:

  • Task Failure
    • Record the failure count of the task
    • Retry the task if its failure count < maxTaskFailures
    • Abort the stage and the corresponding jobs if the count >= maxTaskFailures
  • Fetch Failure
    • Don't count the failure toward the task failure count
    • Retry the stage if the stage failure count < maxStageFailures
    • Abort the stage and the corresponding jobs if the stage failure count >= maxStageFailures
    • Mark the executor/host as lost (optional)
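
Both retry limits map to real Spark configuration properties; the values below are the documented defaults:

import org.apache.spark.SparkConf

// spark.task.maxFailures: attempts per task before aborting the stage.
// spark.stage.maxConsecutiveAttempts: stage retries (e.g. on fetch
// failures) before aborting the stage and its jobs.
val conf = new SparkConf()
  .set("spark.task.maxFailures", "4")
  .set("spark.stage.maxConsecutiveAttempts", "4")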

Worker:

  • A container for executors
  • Each executor provides
    • Threads to run tasks
    • A BlockManager to store and serve blocks
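
A toy sketch of that structure (MiniExecutor and MiniBlockManager are invented for illustration, not Spark's real classes): the executor owns a fixed thread pool sized by its core count, plus a simple block store keyed by block id.

import java.util.concurrent.Executors
import scala.collection.concurrent.TrieMap

// Invented mini versions of an executor and its block manager.
class MiniBlockManager {
  private val blocks = TrieMap.empty[String, Array[Byte]]
  def put(id: String, data: Array[Byte]): Unit = blocks.put(id, data)
  def get(id: String): Option[Array[Byte]] = blocks.get(id)
}

class MiniExecutor(cores: Int) {
  private val pool = Executors.newFixedThreadPool(cores)  // one slot per core
  val blockManager = new MiniBlockManager
  def launchTask(task: Runnable): Unit = pool.execute(task)
  def stop(): Unit = pool.shutdown()
}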