SparkContext

When implementing Spark Job, the entry point is to create SparkSession and SparkContext. SparkContext is the entry point to broadcast variables, create RDDs. There are several ways to create RDDs from SparkContext:

  • parallelize: create a RDD from a Scala Seq
  • range: create a RDD based on a start and end with specified step
  • textFile: create a RDD from a text file
  • wholeTextFiles: create a RDD from all files in a directory
  • hadoopRDD: create a RDD from Hadoop JobConf with InputFormat / Key class / Value class

Another important feature provided by SparkContext is declare accumulators. Accumulators are very useful when we want to make some counters and metrics for Spark Jobs.

After all data transformations have been done, it is the right time to launch Spark Jobs. This is done by runJob in SparkContext.

runJob

If we only define transformations on RDDs, like map, flatMap, Spark Jobs actually will be executed. The following actions will trigger job running:

  • reduce: apply a reduce function over data in each partition
  • fold
  • aggregate
  • count

  • take

  • collect

  • foreach / foreachPartition

Once those actions are called on RDDs, DAGScheduler in SparkContext will run Spark Jobs.

DAG Scheduler

When DAG Scheduler runs a Spark Job, it will take RDD and functions triggering to run Spark Job, submitting the job into an EventLoop and returning a JobWaiter. Then DAG Scheduler will wait on the JobWaiter until the future is set, either the job succeeds or fails. For batch process, once all RDDs finish, the whole job also finishes. Spark Streaming Job is different from this, which will try to generate little batch jobs again and again.

Once the job is pushed into EventLoop, an EventThread will be responsible for executing jobs in the event queue. Current, there is one implementation for EventLoop - DAGSchedulerEventProcessLoop. EventThread will take each event, like Spark Job, from event queue. Event queue is a linked blocking queue with unlimited size in EventLoop. EventThread checks even type. There are several event types:

  • JobSubmitted - run job action event
  • MapStageSubmitted
  • StageCancelled
  • JobCancelled
  • ExecutorAdded
  • ExecutorLost
  • TODO - Add and explain all events later

For job submission, EventThread will invoke handleJobSubmitted in DAGScheduler. A ResultStage and its parent stages will be created based on dependent RDDs and data shuffling. In each stage, there is no data shuffling among RDDs. Data between different stages will be shuffled, may using network to transfer data between two executors.

An ActiveJob is created to represent the submitted job. At the same time, job submitted event is posted to LiveListenerBus, notifying registered Listeners including job progress listener, spark UI listener, etc. In the end, it is fine to submit all tasks for current job. Each jobs consists of several stages, and each stage includes several tasks. While submitting a job, Spark tries to find the parent stage of current stage until there is no parent stage, then begin submitting tasks for all stages from parent to children stages. E.g. submitting stage to read data from source, doing some transformations, then submitting next dependent shuffle stage, until to the last stage carrying out actions, like take().

Before submitting tasks of current stage, its partition will be checked to see whether it has been processed. For example, for cached RDD, the scheduler can avoid submitting tasks generating the cached partitions in this RDD. Locality is considered while generating each Task. Task class represents transformations on each partition in RDD. At last, those tasks are submitted to TaskScheduler.

TaskScheduler

Here is the quote from the comment for TaskScheduler in Spark source code: Each TaskScheduler schedules tasks for a single SparkContext. These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running them, retrying if there are failures, and mitigating stragglers. They return events to the DAGScheduler.

DAGScheduler decides which stage should run first. Not-ready tasks, e.g. not executed or not cached, in each RDD are added into SchedulableBuilder to decide the way to run tasks. There two builders: FIFOSchedulableBuilder and FairSchedulableBuilder. FIFO builder is quite simple - placing tasks into the root pool in TaskScheduler based on the order of all stages. For example, source tasks enter the Pool first, then other tasks in the stage right after the source stage. FAIR builder is much more complicated than FIFO. It will be discussed later.

Each element in the Pool is inherited from Schedulable interface, which contains a priority field. Priority will be used by FIFO comparator or FAIR comparator to decide which TaskSet could run first by sorting all the elements in the Pool. A TaskSet actually represents all tasks in one RDD.

So far so good. All tasks are in the Pool, then they are ready to run. This is done by SchedulerBackend.

SchedulerBackend

SchedulerBackend acts as a role to provide computing resource for Spark Jobs. There are several implementations, for important ones, they are:

  • CoarseGrainedSchedulerBackend
  • StandaloneSchedulerBackend
  • LocalSchedulerBackend

SchedulerBackend lives with Driver. It receives update info from Executors, saving executors in internal map, tracking executors' status. All executors will update Register message to SchedulerBackend while it starts. For TaskScheduler, on submitting tasks, after they have been placed in the Pool, TaskScheduler will also notify SchedulerBackend that I want some resource to run tasks. Then SchedulerBackend search its internal executors through makeOffers call, and offer those executors back to TaskScheduler by call resourceOffers. TaskScheduler will decide which task to run based on locality setting and other issues, returning a TaskDesc back to SchedulerBackend. Then, SchedulerBackend will launch those tasks.

Launch Task

First of all, Spark will check the size of serialized RDD. If it exceeds the max limit, the job will be cancelled. Cancellation takes following actions:

  1. Invoke TaskSetManager.abort()
  2. DAGScheduler will fail this stage and stages depending on this stage.
  3. TaskScheduler will tell SchedulerBackend to send KillTask message to Executors.

If the size is OK, a LaunchTask message will be sent to Executor to run the task.

results matching ""

    No results matching ""