Mapreduce Job Flow Through YARN Implementation

This post describes the MapReduce job flow behind the scenes when a job is submitted to Hadoop through the submit() or waitForCompletion() method on the Job object. The job flow is explained with the help of the Word Count MapReduce program described in our previous post, and is presented as per the YARN (MapReduce 2) implementation.

The submit() method submits the job to Hadoop and returns immediately, while the waitForCompletion() method submits the job only if it has not already been submitted and then waits for the submitted job to complete.
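As a rough sketch, a typical driver built around these calls looks like the following (the class name and argument handling here are illustrative stand-ins for the word-count code in the previous post, not copied from it):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCountDriver.class);
            job.setMapperClass(WordcountMapper.class);    // mapper from the previous post
            job.setReducerClass(WordcountReducer.class);  // reducer from the previous post
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // waitForCompletion(true) submits the job (if not already submitted),
            // prints progress, and blocks until the job finishes.
            // job.submit() alone would return immediately after submission.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }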

In the YARN implementation, the run mode of a MapReduce job can be set through the mapreduce.framework.name property in mapred-site.xml. The valid values are local, classic and yarn. local mode submits the job to the local job runner, classic mode submits it through the old MapReduce framework (usually called MapReduce 1), and yarn mode submits it through the new MapReduce framework (MapReduce 2). In this post, we cover only the YARN-specific flow.
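The property normally lives in mapred-site.xml, but for illustration it can also be set on the job's Configuration object; this is only a sketch of the same setting:

    import org.apache.hadoop.conf.Configuration;

    public class FrameworkSelection {
        public static Configuration yarnConf() {
            Configuration conf = new Configuration();
            // Run mode of the MapReduce job: "local", "classic" (MapReduce 1) or "yarn" (MapReduce 2).
            conf.set("mapreduce.framework.name", "yarn");
            return conf;
        }
    }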

Before going into the job flow in YARN, let's have a basic overview of YARN.

YARN Overview

YARN stands for Yet Another Resource Negotiator. It was introduced in the Hadoop 0.23 release to overcome the scalability shortcomings of classic MapReduce (MapReduce 1).

In the classic MapReduce framework there are two major components, the Job Tracker and the Task Tracker, which work in a master-slave fashion. The Job Tracker is responsible for allocating the resources required to run a MapReduce job and for scheduling activities. Task Trackers are assigned individual tasks to process by the Job Tracker. Since the Job Tracker handles both resource management (assigning resources to each job) and job scheduling (assigning tasks to Task Trackers and monitoring task progress) on a single node, scalability becomes an issue in large HDFS clusters with more than about 4,000 nodes.

To overcome this scalability shortcoming of classic MapReduce and to run MapReduce jobs efficiently in large HDFS clusters, YARN (also known as MapReduce 2 or MRv2) was introduced in the hadoop-0.23 release.

The main idea of YARN is to split the responsibilities of the Job Tracker into two separate daemons: the Resource Manager, which is responsible for managing resources across the cluster, and a per-application Application Master. An application is a single job in the classical sense of MapReduce jobs.

The Task Tracker is replaced by the Node Manager in YARN, a per-machine framework agent that is responsible for containers, monitoring their resource usage (CPU, memory, disk, network) and reporting it to the Resource Manager. The Application Master negotiates with the Resource Manager for resources across the cluster and works with the Node Managers to execute and monitor the tasks.

Mapreduce Job Flow

Components of Mapreduce Job Flow:

A MapReduce job flow on YARN involves the components below.

  • A Client node, which submits the Mapreduce job.
  • The YARN Resource Manager, which allocates the cluster resources to jobs.
  • The YARN Node Managers, which launch and monitor the tasks of jobs.
  • The MapReduce Application Master, which coordinates the tasks running in the MapReduce job. The application master and the MapReduce tasks run in containers that are scheduled by the resource manager, and managed by the node managers.
  • The HDFS file system, which is used for sharing job files between the above entities.
Job Startup:
  1. The call to Job.waitForCompletion() in the main driver class is where all the execution starts. The driver is the only piece of code that runs on our local machine, and this call starts the communication with the Resource Manager.
  2. The client retrieves a new job ID (application ID) from the Resource Manager.
  3. The client node copies the job resources specified via the -files, -archives and -libjars command-line arguments, as well as the job JAR file, onto HDFS (see the API sketch after this list).
  4. Finally, the job is submitted by calling the submitApplication() method on the Resource Manager.
  5. The Resource Manager triggers its sub-component, the Scheduler, which allocates a container for the job's Application Master. The Resource Manager then launches the Application Master in the container provided by the Scheduler, and that container is managed by a Node Manager from here onwards.
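For illustration, the -files, -archives and -libjars options also have Java API equivalents on the Job object; the paths below are made up for this sketch:

    import java.net.URI;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class JobResources {
        public static void addResources(Job job) throws Exception {
            // Equivalent of -files: ship a side file with the job.
            job.addCacheFile(new URI("/user/hadoop/lookup.txt"));            // illustrative path
            // Equivalent of -archives: ship and unpack an archive.
            job.addCacheArchive(new URI("/user/hadoop/extra-data.zip"));     // illustrative path
            // Equivalent of -libjars: add an extra JAR to the task classpath.
            job.addFileToClassPath(new Path("/user/hadoop/extra-lib.jar"));  // illustrative path
        }
    }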
Input Split:
  • In this phase, the input files are divided into input splits by the job client (via the InputFormat) rather than by HDFS itself; the split size is derived from the HDFS block size together with the minimum split size (mapreduce.input.fileinputformat.split.minsize) and maximum split size (mapreduce.input.fileinputformat.split.maxsize) properties.
  • Each split is processed by its own map task if the file is splittable. If a file is not splittable, the entire file is provided as input to a single map task.
  • These map tasks are created by the MapReduce Application Master (the MRAppMaster Java class), and the reduce tasks are also created by the Application Master, based on the mapreduce.job.reduces property (a configuration sketch follows this list).
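A hedged configuration sketch for the split size bounds and reducer count mentioned above (the sizes and counts are illustrative, not recommendations):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SplitTuning {
        public static void tune(Job job) {
            // FileInputFormat computes the split size as
            // max(minSize, min(maxSize, blockSize)).
            FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB, illustrative
            FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB, illustrative
            // Number of reduce tasks (the mapreduce.job.reduces property).
            job.setNumReduceTasks(2);                                       // illustrative
        }
    }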
Role of an Application Master:
  • Before starting any task, the job setup method is called so that the job's OutputCommitter can create the job's output directory.
  • As noted above, both map tasks and reduce tasks are created by the Application Master.
  • If the submitted job is small, the Application Master runs it in the same JVM in which the Application Master itself is running. This avoids the overhead of allocating new containers and running the tasks in them in parallel. Such small jobs are called uber tasks.
  • Whether a job qualifies as an uber task is decided by three thresholds: fewer than 10 mappers, at most one reducer, and an input size no larger than one HDFS block. These thresholds can be configured via the mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces and mapreduce.job.ubertask.maxbytes properties in mapred-site.xml, and uber mode itself is switched on with mapreduce.job.ubertask.enable (a configuration sketch follows this list).
  • If the job does not qualify as an uber task, the Application Master requests containers for all map tasks and reduce tasks.
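A configuration sketch for uber mode; the threshold values shown are the usual defaults and are listed only for illustration:

    import org.apache.hadoop.conf.Configuration;

    public class UberTaskConfig {
        public static void enableUberMode(Configuration conf) {
            // Uber mode is off by default and must be enabled explicitly.
            conf.setBoolean("mapreduce.job.ubertask.enable", true);
            conf.setInt("mapreduce.job.ubertask.maxmaps", 9);     // fewer than 10 mappers
            conf.setInt("mapreduce.job.ubertask.maxreduces", 1);  // at most one reducer
            // maxbytes defaults to the HDFS block size when left unset.
            conf.setLong("mapreduce.job.ubertask.maxbytes", 128L * 1024 * 1024); // illustrative
        }
    }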
Task Execution:
  • Once containers are assigned to tasks, the Application Master starts the containers by notifying the corresponding Node Managers.
  • The Node Manager copies the job resources (such as the job JAR file) from HDFS via the distributed cache and runs the map or reduce task.
  • Running tasks keep reporting their progress and status (including counters) to the Application Master, which collects this information from all tasks and propagates the aggregated values to the client node or user (see the sketch after this list).
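As a sketch of how a running task reports progress, a mapper can update counters and its status string through its context (the counter group and name below are made up for this example):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ReportingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Counters and status updates flow from the task to the Application Master,
            // which aggregates them and makes them visible to the client.
            context.getCounter("WordCount", "INPUT_LINES").increment(1);
            context.setStatus("processing line at offset " + key.get());
            // ... the actual map logic would emit (word, 1) pairs here ...
        }
    }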
WordcountMapper Execution:

In our Word Count example, the mapper input is just a one-line text file:

"this is a sample file for word count map job. this job takes this file as input and returns word counts ."

In our WordcountMapper, the key identifies the position of the line within the file (with the default TextInputFormat it is the byte offset of the line, and our file has only one line) and the value is the content of the line.
In our WordcountMapper implementation, the map method discards the key, since we do not care where each line occurs in the file, and splits the provided value into words using the split method of the standard Java String class.
Finally, the mapper writes a (key, value) pair for each input word, where the key is the word itself and the value is 1.
The mapper output, which becomes the reducer input, looks as shown below, followed by a sketch of the mapper itself.
(this, 1)  (is, 1)  (a, 1)  (sample, 1)  (file, 1)  (for, 1)  (word, 1)  .......  (returns, 1)  (word, 1)  (counts, 1)  (., 1)
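A minimal sketch of a mapper along these lines, assuming the default TextInputFormat key/value types (the real WordcountMapper is in the previous post):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The key (position of the line in the file) is discarded; the line is
            // split into words and each word is emitted with a count of 1.
            for (String token : value.toString().split(" ")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }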
WordcountReducer Execution:

The reduce task receives updates from the Application Master about which nodes in the cluster hold map output. The reducer retrieves these map outputs from the various nodes and merges them into a single sorted input that is fed to the reduce function.

In our WordcountReducer implementation, for each word the reduce() method simply counts the grouped values (each of which is a 1) and emits the final (word, count) pair for that word.
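A minimal sketch of a reducer along these lines (the real WordcountReducer is in the previous post); summing the grouped 1s is equivalent to counting them:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the 1s emitted by the mappers for this word and emit (word, total).
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }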

So, the final output of the reducer is as follows:

.        1
a        1
and      1
as       1
count    1
counts   1
file     2
for      1
input    1
is       1
job      1
job.     1
map      1
returns  1
sample   1
takes    1
this     3
word     2
This output data is written to a partition or part file in the output directory, in the format specified by the OutputFormat in the driver class, which is a text file in our example. Each reduce task writes to a single file with the filename part-r-nnnnn, where nnnnn starts at 00000 and is incremented for each reduce task (see the sketch below).
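In the driver, the output format and output directory are set roughly as follows (TextOutputFormat is the default; the path is illustrative):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class OutputSetup {
        public static void configureOutput(Job job) {
            // TextOutputFormat writes each (key, value) pair as a tab-separated line;
            // each reduce task produces one part-r-nnnnn file under this directory.
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/wordcount/output")); // illustrative
        }
    }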

Job Completion:
  • When the job has been submitted by calling the waitForCompletion() method, the client node polls the Application Master for the job completion status at regular intervals, usually every 5 seconds. This interval can be configured via the mapreduce.client.completion.pollinterval property (see the sketch after this list).
  • Once the job is completed, the Application Master and the task containers clean up their working state.
  • The job's OutputCommitter cleanup method is called to handle any remaining cleanup activities.
  • The job is archived by the job history server for future reference.
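A one-line configuration sketch for the polling interval mentioned above (5000 ms is the usual default):

    import org.apache.hadoop.conf.Configuration;

    public class ClientPolling {
        public static void setCompletionPollInterval(Configuration conf) {
            // How often, in milliseconds, waitForCompletion() asks the
            // Application Master for the job's completion status.
            conf.setInt("mapreduce.client.completion.pollinterval", 5000);
        }
    }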

Thus, a MapReduce job flows through the above-mentioned steps when it runs successfully without any failures.

