When it comes to dealing with massive amounts of data from social media, business, sports, research, healthcare, or any other source, big data analysis is often the most practical option. Technologies like Hadoop, YARN, NoSQL, Hive, and Spark are widely used to extract useful insights hidden inside that data. In this tutorial, we are going to uncover the working of Hadoop's core component: MapReduce. Let's get started.
MapReduce is one of the most widely used programming models for processing large amounts of data, and it is a critical part of Apache Hadoop. Hadoop can execute MapReduce programs written in various languages such as Java, C++, and Python. Since MapReduce tasks run in parallel, they are very effective for analysing data across machine clusters at very large scale; with the MapReduce paradigm, you can scale out to thousands of servers in a Hadoop cluster.
The word MapReduce refers to the two distinct tasks performed by Hadoop programs. First, the map task takes a data set and converts it into an intermediate set of key-value pairs (tuples). Then, the reduce task takes the map output and combines those tuples into a smaller set of aggregated tuples.
MapReduce lets us scale data processing over many computing nodes using two primitives, mappers and reducers. Much of the model's popularity among developers comes from how easy it is to write map and reduce programs, which can then run over clusters of thousands of machines with only a few simple configuration changes.
Organizations have adopted MapReduce rapidly because of its wide range of business applications. Software built on MapReduce can tackle complex business problems by analyzing massive data sets, supporting organizational growth.
Another reason behind the popularity of MapReduce is that developers can run it over massive distributed data sets on parallel computing platforms such as Hadoop or Spark, and then perform advanced statistical and scientific computations over that data to extract useful insights. The resulting data models are very helpful in predictive modeling, pattern analysis, recommendations, classification, regression, and many other tasks.
A MapReduce program executes in two phases, the map phase and the reduce phase, both of which operate on key-value pairs. To perform these operations, you define two functions: a map function and a reduce function.
Let's explore how MapReduce works with an example. Suppose you provide the following data set as input to your MapReduce program:
This model will provide the following output:
Word | Count
---|---
MapReduce | 2
Here | 1
This | 1
is | 3
The | 1
Hadoop | 1
Analysis | 1
For | 1
The input data set goes through the following phases:
Split: The input to a MapReduce program is divided into small, fixed-size chunks known as input splits. Each input split is consumed by a single map task.
Map: Once the data is divided into chunks, the mapping phase produces output values by processing each chunk through the map function. In the scenario above, the map function counts word occurrences in its split, producing a list of pairs in the form (word, occurrences).
Shuffle: The shuffling step takes the records produced by the map operation and groups them by key, so that all values for the same key end up together. This is where the exact frequency of each input word is consolidated; for example, 'is' ends up with a frequency of 3 because it appears 3 times in the data set.
Reduce: The last phase of MapReduce aggregates the shuffled output. It combines the grouped data from the previous step and produces a single consolidated output; you can say it summarizes the whole data set.
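The four phases above can be sketched in a few lines of plain Python. This is an in-memory illustration of the idea, not how Hadoop itself is implemented; the function and variable names are our own:

```python
from itertools import groupby
from operator import itemgetter

def run_mapreduce(lines):
    # Map: emit a (word, 1) pair for every word in every line
    mapped = [(word, 1) for line in lines for word in line.split()]
    # Shuffle: sort by key so equal words become adjacent, then group them
    mapped.sort(key=itemgetter(0))
    # Reduce: sum the counts for each distinct word
    return {word: sum(count for _, count in pairs)
            for word, pairs in groupby(mapped, key=itemgetter(0))}

counts = run_mapreduce(["this is a test", "this is MapReduce"])
# counts["this"] == 2, counts["is"] == 2
```

In a real cluster, the map and reduce steps run on different machines and the shuffle moves data over the network, but the data flow is the same.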
Let’s understand the complete MapReduce journey in a step by step manner:
One map task is assigned to each split. The data in every split is run through the map functions.
Note that very small splits are not suitable for execution, because they require more effort to manage split bookkeeping and task creation, which increases overall running time. For efficient execution, many programmers prefer a split size equal to the HDFS block size, which is 64 MB by default in Hadoop 1.
Cases such as a node failing before its map output has been consumed by a reduce task are handled by Hadoop: it re-executes the map task on a new node and recreates the map output for further processing.
During a MapReduce job execution, Hadoop assigns the map and reduce tasks individually to the servers inside the cluster. It maintains all the relevant details, such as job issuing, verification of job completion, and data replication across the nodes of the cluster.
Most computation happens on the node that holds the data on its local disk, which reduces overall network traffic. Once a job is completed, the cluster collects and reduces the intermediate data into the final outcome, which is returned to the Hadoop server. The result is data in key-value format, which is easy to work with.
How does Hadoop organize its work?
The Hadoop jobs are categorized into two tasks:
Their running operations are handled by two entities:
The JobTracker entity handles the overall execution of submitted jobs, while the TaskTrackers, as the name suggests, are responsible for performing the individual tasks. For each submitted job there is one JobTracker, hosted on the NameNode, and there can be multiple TaskTrackers running on the DataNodes.
A job is divided into multiple tasks so that they can run over the various data nodes of the cluster. It is the JobTracker's responsibility to coordinate this activity by scheduling tasks to execute on different data nodes; you can think of it as the master.
Each task is monitored by the TaskTracker on the data node where it executes; this is the slave side. The TaskTracker reports progress to the JobTracker and also sends a periodic heartbeat signal to confirm the system state.
In this way, the JobTracker holds the overall progress of each job. If a task fails, the JobTracker reschedules it on another TaskTracker.
The MapReduce model is built on (key, value) pairs: it takes input in the form of (key, value) pairs and produces output as (key, value) pairs. The key and value classes must implement the Writable interface, since their values need to be serialized, and the WritableComparable interface is used for sorting keys.
You can see it as: input (a, x) -> map -> (b, y) -> reduce -> (c, z) output, where a, b, c represent keys and x, y, z represent values.
In Hadoop, a job is any task submitted for execution to achieve a desired result. The JobConf interface describes a job to the Hadoop framework for execution. A job points to implementations of the Mapper, Reducer, InputFormat, OutputFormat, and Partitioner, and specifies job-oriented parameters. A few parameters controlled by the administrator cannot be altered. We use the job configuration to assign the required tasks such as mapping, reducing, and partitioning.
The Job interface gives us a view of job facets such as:
Depending on your application's requirements, you can use the Configuration methods set(String, String) and get(String, String) to set and get arbitrary parameters. Remember to use the DistributedCache when dealing with large amounts of read-only data.
Input paths are specified through FileInputFormat methods such as FileInputFormat.setInputPaths(Job, Path...), FileInputFormat.setInputPaths(Job, String...), and FileInputFormat.addInputPath(Job, Path). The output location is set with FileOutputFormat.setOutputPath(Job, Path).
Additionally, the job configuration manages advanced parameters such as the Comparator to use, files to place in the DistributedCache, whether compression is applied, speculative execution through setMapSpeculativeExecution(boolean), and the maximum number of attempts per task through setMaxMapAttempts(int).
A user can leverage Configuration.set(String, String) and the corresponding get methods to set and read the parameters required by the program.
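For streaming jobs, values set through the job configuration are exposed to the child tasks as environment variables, with dots in the property name replaced by underscores. A hedged sketch of reading one from Python (the helper name get_jobconf is our own):

```python
import os

def get_jobconf(name, default=None):
    # Hadoop Streaming exports job configuration entries to child tasks as
    # environment variables, replacing '.' with '_' in the property name.
    return os.environ.get(name.replace('.', '_'), default)

# Inside a running streaming task, this would return the file being mapped:
input_file = get_jobconf('mapreduce.map.input.file', '(not running under Hadoop)')
```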
Memory management during configuration: As the Hadoop administrator, you can specify the maximum virtual memory for each executing child task and its sub-processes using mapreduce.{map|reduce}.memory.mb. The value is specified in megabytes (MB).
Note that the storage available to some parts of the framework is also configurable, and the performance of map and reduce tasks can be tuned with parameters such as task concurrency or disk I/O speed. Monitoring the file system activity of a task is done through counters, which track the byte counts of the map and reduce operations.
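A streaming task can update such counters by writing specially formatted lines to standard error, which the framework picks up. A minimal sketch (the group and counter names here are illustrative):

```python
import sys

def update_counter(group, counter, amount=1):
    # Hadoop Streaming interprets stderr lines of this exact form as
    # counter updates, visible in the job's counter report.
    sys.stderr.write('reporter:counter:%s,%s,%d\n' % (group, counter, amount))

# e.g. inside a mapper, after skipping a malformed record:
# update_counter('WordCount', 'MalformedRecords')
```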
Task Execution and Environment:
Through the MRAppMaster class, Mapper and Reducer tasks run as child processes in separate Java Virtual Machines (JVMs). Each child process inherits the environment of its parent MRAppMaster task.
You can pass additional options to the child JVM through the mapreduce.{map|reduce}.java.opts parameter. If this parameter contains the symbol @taskid@, it is replaced with the task id of the MapReduce task.
Generally, Job is the interface a user employs to develop an application, define the various facets of a job, submit it, and monitor it.
In MapReduce, Job is the main interface for interacting with the ResourceManager. Job provides the facilities for submitting jobs, tracking them, accessing component-task reports and logs, and fetching other status details from the cluster.
The job submission process includes the following phases:
The history of each job is logged to directories configured via mapreduce.jobhistory.intermediate-done-dir and mapreduce.jobhistory.done-dir, which are the default history locations for a job.
To review the history logs, run: $ mapred job -history output.jhist. This displays job details, including failed and killed jobs.
For more detail, such as successful tasks and the attempts made for each task, use: $ mapred job -history all output.jhist
A user may need to chain multiple MapReduce jobs to complete a complex task that cannot be done with a single MapReduce job. This is not difficult, because the output of one job is written to the distributed file system, and that output can then be used as the input to the next job.
This means you keep control over job completion or failure. You can use options such as:
Job.submit(): submits the job to the cluster and returns immediately.
Job.waitForCompletion(boolean): submits the job to the cluster and waits for it to finish.
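Chaining can be sketched in plain Python: the first "job" produces a result, and the second consumes that result as its input. This is an in-memory illustration with hypothetical job functions, not Hadoop API calls:

```python
def word_count_job(lines):
    # Job 1: produce (word -> count), as in the earlier word-count example
    counts = {}
    for line in lines:
        for word in line.split():
            counts[word] = counts.get(word, 0) + 1
    return counts

def top_word_job(counts):
    # Job 2: consume job 1's output as input and find the most common word
    return max(counts.items(), key=lambda kv: kv[1])

intermediate = word_count_job(["a b a", "a c"])
word, count = top_word_job(intermediate)  # ('a', 3)
```

On a real cluster, "intermediate" would be files on HDFS written by the first job and read by the second.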
The InputFormat class is responsible for all input-related handling in a MapReduce job. It validates the input and splits it into logical instances, one per mapper. Based on the total size of the input files in bytes, the FileInputFormat subclass produces the logical splits. You can set a lower bound on the split size through mapreduce.input.fileinputformat.split.minsize.
TextInputFormat is the default InputFormat. It detects compressed input files by their .gz extension; such files are automatically decompressed, each in a single mapper.
Note: The RecordReader provides the complete details of the record boundaries within the logical InputSplit of a task.
An InputSplit presents a byte-oriented view of the input to be processed by a mapper; the RecordReader processes this data. For a FileSplit, the input file path of the logical split is exposed as mapreduce.map.input.file.
The RecordReader reads the InputSplit and turns it into key-value pairs. Its main goal is to convert the byte-oriented input into a record-oriented view that a Mapper can easily process.
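Conceptually, a line-oriented record reader looks like the following Python sketch, which mimics what TextInputFormat's reader does: the key is the byte offset of each line and the value is the line's text. This is an illustration, not the Hadoop class itself:

```python
import io

def line_record_reader(stream):
    # Turn a byte-oriented stream into (key, value) records: the key is the
    # byte offset of each line and the value is the line's decoded text.
    offset = 0
    for raw in stream:
        yield offset, raw.rstrip(b'\n').decode('utf-8')
        offset += len(raw)

records = list(line_record_reader(io.BytesIO(b"first line\nsecond line\n")))
# records == [(0, 'first line'), (11, 'second line')]
```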
The MapReduce framework handles output symmetrically to input: the output details of a job are specified through OutputFormat, which validates the output specification. The RecordWriter it provides is then responsible for writing the final output (key, value) pairs to files.
The Hadoop streaming utility ships with the Hadoop distribution. It helps you create and run MapReduce jobs with any executable or script as the mapper and/or the reducer. Let's have a look.
How does Hadoop Streaming work?
A streaming mapper or reducer communicates with the MapReduce framework over standard input and standard output, which serve as the standard communication protocol between them.
You can also provide a Java class as the mapper or reducer, for example:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc
By default, a streaming task that exits with a non-zero status is treated as a failed task. You can change this by setting the boolean job parameter stream.non.zero.exit.is.failure to true or false, thereby controlling whether a non-zero exit status marks the task as a failure or a success.
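For example, a streaming mapper could deliberately exit non-zero when it sees input that violates an assumed schema. The three-column schema and the function name below are hypothetical, chosen for illustration:

```python
import sys

def validate_and_emit(line, out=sys.stdout):
    fields = line.strip().split('\t')
    if len(fields) != 3:
        # A non-zero exit marks this task attempt as failed by default
        # (stream.non.zero.exit.is.failure is true unless overridden)
        sys.stderr.write('malformed record: %r\n' % line)
        sys.exit(1)
    # Emit the first column as the key with a count of 1
    out.write('%s\t1\n' % fields[0])
```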
Let's explore streaming with a word-count problem. Below are Python programs illustrating the mapper and reducer tasks; you could also write them in a language of your choice, such as Java, Perl, or Ruby.
Mapper program:
Here is the code for the mapper. You must make the file executable, for example with chmod +x /home/expert/hadoop-1.2.0/mapper.py.
#!/usr/bin/python
import sys

# Input comes from standard input
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()
    # Break the line into words
    words = myline.split()
    # Iterate over the words list
    for output in words:
        # Write the results to standard output as word<TAB>1
        print('%s\t%s' % (output, 1))
Reducer Program:
#!/usr/bin/python
import sys

current_word = ""
current_count = 0
word = ""

# Input comes from standard input (the sorted mapper output)
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()
    # Split the input we got from mapper.py
    word, count = myline.split('\t', 1)
    # Convert the count variable to an integer
    try:
        count = int(count)
    except ValueError:
        # Count was not a number, so silently ignore this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # Write result to standard output
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
Save these files as mapper.py and reducer.py and place them in the home directory of Hadoop.
Now, we will execute our wordcount program.
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
    -input input_dirs \
    -output output_dir \
    -mapper
Packaging Files with Job Submissions:
You can provide any executable as the mapper or reducer. These executables do not have to already exist on the cluster machines; if they are not present, you can use the -file option to have the framework package them with the job submission:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myProgram.py \
    -reducer /bin/wc \
    -file myProgram.py
In this command, we use a user-defined executable as the mapper. The -file myProgram.py option ships the executable to the cluster machines as part of the job submission.
You can also package other files, such as configuration files or dictionaries, to be used by the mapper or reducer:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myPythonScript.py \
    -reducer /bin/wc \
    -file myPythonScript.py \
    -file myDictionary.txt
In Hadoop, a slave node is a node where data is stored and processed. You may wonder why slave servers matter: MapReduce is widely used for processing large amounts of data, so it is important to understand how data is placed across distributed nodes, rather than in a single file, in the Hadoop environment. The following services enable slave node processing:
Note: This service is replaced by YARN in Hadoop 2.
Here, the slave node runs DataNode and NodeManager instances. Container processes run individually across the cluster, each holding a task from an application. A dedicated ApplicationMaster is provided for each running application; it keeps track of every task running on the cluster until execution terminates.
To understand how data flows across the multiple nodes of a large distributed environment, we need to walk through a Hadoop installation. Here we demonstrate a Hadoop cluster environment using three machines: one master and two slaves.
Hadoop Master (hadoop-master)
Hadoop slave 1(hadoop-slv-1)
Hadoop slave 2(hadoop-slv-2)
Now, for setting up the multi-node clustering architecture, follow the below described steps:
Prerequisites: Install Java. (Here we assume it is already installed on the machine; if not, you can download and install it from www.oracle.com.)
You can check the installed version by:
$ java -version
Now, for the Hadoop installation, you need to create a hadoop user account on the master as well as on each slave machine:
# useradd hadoop
# passwd hadoop
Edit the /etc/hosts file on all nodes, adding the IP address of each machine along with its host name. Open the file with vi: # vi /etc/hosts, then enter the following lines (replace the IPs with your own):
123.123.123.123 hadoop-master
123.123.123.123 hadoop-slv-1
123.123.123.123 hadoop-slv-2
You need to set SSH for each node so that they can communicate without any hurdle.
# su hadoop
$ ssh-keygen -t rsa
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@hadoop-master
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@hadoop-slv-1
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop@hadoop-slv-2
$ chmod 0600 ~/.ssh/authorized_keys
$ exit
Now we need to download Hadoop from its source to our machine. You can download and install Hadoop with the following commands:
# mkdir /opt/hadoop                  # create a directory at the given path
# cd /opt/hadoop/
# wget http://apache.mesi.com.ar/hadoop/common/hadoop-1.2.0/hadoop-1.2.0.tar.gz   # download the Hadoop archive
# tar -xzf hadoop-1.2.0.tar.gz       # decompress the downloaded file
# mv hadoop-1.2.0 hadoop             # rename the extracted folder
# chown -R hadoop /opt/hadoop        # change ownership
# cd /opt/hadoop/hadoop/
Before going further, we need to alter a few parameters in the downloaded Hadoop configuration files, such as the port number. Make the following changes:
Open the core-site.xml file and make these edits (here we set the NameNode URI and port as required):

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://hadoop-master:9000/</value>
   </property>
   <property>
      <name>dfs.permissions</name>
      <value>false</value>
   </property>
</configuration>
Open the hdfs-site.xml file and make the following edits:
<configuration>
   <property>
      <name>dfs.data.dir</name>
      <value>/opt/hadoop/hadoop/dfs/name/data</value>
      <final>true</final>
   </property>
   <property>
      <name>dfs.name.dir</name>
      <value>/opt/hadoop/hadoop/dfs/name</value>
      <final>true</final>
   </property>
   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
</configuration>
Open the mapred-site.xml file and make these edits:
<configuration>
   <property>
      <name>mapred.job.tracker</name>
      <value>hadoop-master:9001</value>
   </property>
</configuration>
Open the hadoop-env.sh file and set JAVA_HOME, HADOOP_CONF_DIR, and HADOOP_OPTS:
export JAVA_HOME=/opt/jdk1.6.0_17
export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true
export HADOOP_CONF_DIR=/opt/hadoop/hadoop/conf
Installing Hadoop on the Slave Servers:
Type command:
# su hadoop
$ cd /opt/hadoop
$ scp -r hadoop hadoop-slv-1:/opt/hadoop
$ scp -r hadoop hadoop-slv-2:/opt/hadoop
Next, configure the master node. Run these commands:
# su hadoop
$ cd /opt/hadoop/hadoop
$ vi etc/hadoop/masters
hadoop-master
Now we will configure both the slave nodes.
$ vi etc/hadoop/slaves
hadoop-slv-1
hadoop-slv-2
Next, we format the NameNode, which keeps the directory tree of all files in the file system:
# su hadoop
$ cd /opt/hadoop/hadoop
$ bin/hadoop namenode -format
16/01/19 11:37:28 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = hadoop-master/123.123.123.123
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 1.1.0
STARTUP_MSG: build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2 -r 1479473;
************************************************************/
16/01/19 11:37:30 INFO util.GSet: Computing capacity for map BlocksMap
editlog=/opt/hadoop/hadoop/dfs/name/current/edits
………………………………………………….
………………………………………………….
………………………………………………….
16/01/19 11:38:03 INFO common.Storage: Storage directory
/opt/hadoop/hadoop/dfs/name has been successfully formatted.
16/01/19 11:38:35 INFO namenode.NameNode:
SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at hadoop-master/123.123.123.123
************************************************************/
Start Hadoop services by the command:
$ cd $HADOOP_HOME/sbin
$ start-all.sh
On the master machine:
mkdir -p $HOME/.ssh
chmod 700 $HOME/.ssh
ssh-keygen -t rsa -P '' -f $HOME/.ssh/id_rsa
cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
chmod 644 $HOME/.ssh/authorized_keys
Copy the public key to the new slave node, into the hadoop user's $HOME directory:
scp $HOME/.ssh/id_rsa.pub hadoop@123.123.123.123:/home/hadoop/
Log in as the hadoop user on the new node:
su hadoop
ssh -X hadoop@123.123.123.123
Copy the public key into $HOME/.ssh/authorized_keys, then change the permissions:
cd $HOME
mkdir -p $HOME/.ssh
chmod 700 $HOME/.ssh
cat id_rsa.pub >> $HOME/.ssh/authorized_keys
chmod 644 $HOME/.ssh/authorized_keys
Verify SSH authentication by logging in from the master machine. Check it for any newly added slave:
ssh hadoop@123.123.123.123 or hadoop@slave3
Setting up Hostname for New Node
Set the hostname in the /etc/sysconfig/network file on the new slave machine:
NETWORKING=yes
HOSTNAME=slave3.in
Restart the machine for these changes to take effect.
Or, on the slave3 machine, set it immediately with:
hostname slave3.in
Update /etc/hosts on all machines of the cluster by adding:
123.123.123.123 slave3.in slave3
From the new node, check connectivity to the master:
ping master.in
The DataNode daemon can be started manually with the $HADOOP_HOME/bin/hadoop-daemon.sh script. It automatically contacts the NameNode (the master) and registers its presence in the cluster.
If you need to add the new node permanently, also add it to the master's conf/slaves file.
Login into the new node:
su hadoop or ssh -X hadoop@123.123.123.123
Start HDFS on the new slave node:
./bin/hadoop-daemon.sh start datanode
Check output as:
$ jps
6728 DataNode
12673 Jps
The decommissioning feature of HDFS ensures the safe removal of a node; it is essential for preventing data loss while removing a node from the cluster. Follow these steps:
Log in to the master machine:
$ su hadoop
Change configuration of the cluster:
Add key dfs.hosts.exclude in $HADOOP_HOME/etc/hadoop/hdfs-site.xml file.
<property>
   <name>dfs.hosts.exclude</name>
   <value>/home/hadoop/hadoop-1.2.0/hdfs_exclude.txt</value>
   <description>DFS exclude</description>
</property>
Decommissioning host selection:
Each machine to be decommissioned must be added to the hdfs_exclude.txt file, one host name per line; this blocks their connection to the NameNode. For example, if you plan to remove DataNode2, write:
slave2.in
Execute the command:
$ $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes
With this command, the NameNode re-reads its configuration, including the newly updated exclude file. Over a period of time, the excluded nodes automatically stop.
Once decommissioning completes, the hardware can be safely shut down for maintenance. You can check the status with the dfsadmin report command:
$ $HADOOP_HOME/bin/hadoop dfsadmin -report
After the machine has been decommissioned, it can be removed from the exclude file. Running $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes again makes the NameNode re-read the exclude file, and the DataNode can rejoin the cluster once its maintenance is complete.
We have now successfully installed Hadoop across the nodes of a distributed environment and seen how data flows across the multiple nodes of a Hadoop cluster. Here we created a master server and two slave nodes running the Hadoop Distributed File System (HDFS). This lets us distribute our data and perform map and reduce tasks over it. HDFS can process thousands of data files and serve valuable results, and the best part is that you can try it on your local machines.
In this article, we covered how map and reduce tasks work, along with the roles of the slave machines, and we learnt how to install the Hadoop framework for a distributed application across networked nodes. So it's time to become part of this digital growth, where you will find millions of real-world data sources to work with. Once you understand how to work with this data, you will find yourself ahead of the curve.
Ravindra Savaram is a Technical Lead at Mindmajix.com. His passion lies in writing articles on the most popular IT platforms including Machine learning, DevOps, Data Science, Artificial Intelligence, RPA, Deep Learning, and so on. You can stay up to date on all these technologies by following him on LinkedIn and Twitter.