Mindmajix

MapReduce Tutorial


MapReduce in depth

We are living in the era of big data, where the rapid growth of the web, social networking, smartphones, and similar technologies produces petabytes of data every day. Gaining insights from analyzing these very large amounts of data has become a must-have competitive advantage for many industries. However, the size and the often unstructured nature of these data sources make it impractical to store and analyze them with traditional solutions such as relational databases.

Storing, processing, and analyzing petabytes of data in a meaningful and timely manner requires many compute nodes with thousands of disks and thousands of processors, together with the ability to efficiently communicate massive amounts of data among them. At such a scale, disk failures, compute node failures, network failures, and so on become a common occurrence, which makes fault tolerance a very important aspect of such systems. Other common challenges include the significant cost of resources, communication latencies, heterogeneous compute resources, synchronization across nodes, and load balancing. As you can infer, developing and maintaining distributed parallel applications that process massive amounts of data while handling all these issues is not an easy task. This is where Apache Hadoop comes to our rescue.

 

Google was one of the first organizations to face the problem of processing massive amounts of data. Google built a framework for large-scale data processing that borrowed the map and reduce paradigms from the functional programming world, and named it MapReduce. At the foundation of Google MapReduce was the Google File System, a high-throughput parallel filesystem that enables the reliable storage of massive amounts of data using commodity computers. The seminal research publications that introduced the Google MapReduce and Google File System concepts can be found at http://research.google.com/archive/mapreduce.html and http://research.google.com/archive/gfs.html.

Apache Hadoop MapReduce is the most widely known and widely used open source implementation of the Google MapReduce paradigm. Apache Hadoop Distributed File System (HDFS) provides an open source implementation of the Google File System concept.

Apache Hadoop MapReduce, HDFS, and YARN provide a scalable, fault-tolerant, distributed platform for storage and processing of very large datasets across clusters of commodity computers. Unlike traditional High Performance Computing (HPC) clusters, Hadoop uses the same set of compute nodes both to store the data and to perform the computations, which allows Hadoop to improve the performance of large-scale computations by collocating computations with the storage. Also, the hardware cost of a Hadoop cluster is orders of magnitude cheaper than HPC clusters and database appliances due to the usage of commodity hardware and commodity interconnects. Together, Hadoop-based frameworks have become the de facto standard for storing and processing big data.

Hadoop Distributed File System – HDFS

HDFS is a block-structured distributed filesystem that is designed to store petabytes of data reliably on compute clusters made out of commodity hardware. HDFS overlays the existing filesystem of the compute nodes and stores files by breaking them into coarse-grained blocks (for example, 128 MB). HDFS performs better with large files. HDFS distributes the data blocks of large files across the nodes of the cluster to provide a very high aggregate read bandwidth when the data is processed in parallel. HDFS also stores redundant copies of these data blocks on multiple nodes to ensure reliability and fault tolerance. Data processing frameworks such as MapReduce exploit these distributed sets of data blocks and the redundancy to maximize the data-local processing of large datasets, where most of the data blocks get processed on the same physical node where they are stored.
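The block arithmetic described above can be made concrete with a small calculation. The following is only an illustrative sketch in plain Java, not HDFS code: the class and method names are hypothetical, the 1000 MB file size is made up, and the replication factor of 3 is an assumption (the text only says that redundant copies are stored).

```java
class HdfsBlockMath {
    static final long MB = 1024L * 1024L;

    // Number of blocks needed to hold a file: ceil(fileSize / blockSize).
    static long blockCount(long fileSizeBytes, long blockSizeBytes) {
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    // Raw bytes stored across the cluster once every block is replicated.
    static long rawStorageBytes(long fileSizeBytes, int replication) {
        return fileSizeBytes * replication;
    }

    public static void main(String[] args) {
        long fileSize = 1000 * MB;   // hypothetical 1000 MB input file
        long blockSize = 128 * MB;   // example block size from the text
        int replication = 3;         // assumed replication factor

        // prints "blocks = 8"
        System.out.println("blocks = " + blockCount(fileSize, blockSize));
        // prints "raw MB = 3000"
        System.out.println("raw MB = " + rawStorageBytes(fileSize, replication) / MB);
    }
}
```

Under these assumptions, the file is split into eight blocks that can be read in parallel from different nodes, at the cost of three times the raw storage.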

HDFS consists of the NameNode and DataNode services, which together provide the basis for the distributed filesystem. The NameNode stores, manages, and serves the metadata of the filesystem. The NameNode does not store any real data blocks. The DataNode is a per-node service that manages the actual data block storage on its node. When retrieving data, client applications first contact the NameNode to get the list of locations where the requested data resides, and then contact the DataNodes directly to retrieve the actual data. The following diagram depicts a high-level overview of the structure of HDFS:

Hadoop v2 brings in several performance, scalability, and reliability improvements to HDFS. One of the most important among those is the High Availability (HA) support for the HDFS NameNode, which provides manual and automatic failover capabilities for the HDFS NameNode service. This solves the widely known NameNode single point of failure weakness of HDFS. Automatic NameNode high availability of Hadoop v2 uses Apache ZooKeeper for failure detection and for active NameNode election. Another important new feature is the support for HDFS federation. HDFS federation enables the usage of multiple independent HDFS namespaces in a single HDFS cluster. These namespaces would be managed by independent NameNodes, but share the DataNodes of the cluster to store the data. The HDFS federation feature improves the horizontal scalability of HDFS by allowing us to distribute the workload of NameNodes. Other important improvements of HDFS in Hadoop v2 include the support for HDFS snapshots, heterogeneous storage hierarchy support (Hadoop 2.3 or higher), in-memory data caching support (Hadoop 2.3 or higher), and many performance improvements.

Almost all the Hadoop ecosystem data processing technologies utilize HDFS as the primary data storage. HDFS can be considered as the most important component of the Hadoop ecosystem due to its central nature in the Hadoop architecture.

Hadoop YARN

YARN (Yet Another Resource Negotiator) is the major new improvement introduced in Hadoop v2. YARN is a resource management system that allows multiple distributed processing frameworks to effectively share the compute resources of a Hadoop cluster and to utilize the data stored in HDFS. YARN is a central component in the Hadoop v2 ecosystem and provides a common platform for many different types of distributed applications.

The batch-processing-based MapReduce framework was the only natively supported data processing framework in Hadoop v1. While MapReduce works well for analyzing large amounts of data, MapReduce by itself is not sufficient to support the growing number of other distributed processing use cases such as real-time data computations, graph computations, iterative computations, and real-time data queries. The goal of YARN is to allow users to run multiple distributed application frameworks that provide such capabilities side by side, sharing a single cluster and the HDFS filesystem. Some examples of current YARN applications include the MapReduce framework, the Tez high-performance processing framework, the Spark processing engine, and the Storm real-time stream processing framework. The following diagram depicts the high-level architecture of the YARN ecosystem:

The YARN ResourceManager process is the central resource scheduler that manages and allocates resources to the different applications (also known as jobs) submitted to the cluster. The YARN NodeManager is a per-node process that manages the resources of a single compute node. The Scheduler component of the ResourceManager allocates resources in response to the resource requests made by the applications, taking into consideration the cluster capacity and the other scheduling policies that can be specified through the YARN policy plugin framework.

YARN has a concept called containers, which is the unit of resource allocation. Each allocated container has the rights to a certain amount of CPU and memory in a particular compute node. Applications can request resources from YARN by specifying the required number of containers and the CPU and memory required by each container.
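To illustrate the idea of containers as units of CPU and memory on a particular node, here is a deliberately simplified toy model in plain Java. It does not use the YARN API; the `Node` class, the `allocate` method, and the first-fit policy are all hypothetical and stand in for the much richer scheduling logic of a real ResourceManager.

```java
import java.util.ArrayList;
import java.util.List;

class ContainerAllocationSketch {
    // Hypothetical stand-in for the free resources of one compute node.
    static final class Node {
        final String name;
        int freeMemoryMb;
        int freeVcores;

        Node(String name, int freeMemoryMb, int freeVcores) {
            this.name = name;
            this.freeMemoryMb = freeMemoryMb;
            this.freeVcores = freeVcores;
        }
    }

    // Grants up to `count` containers of the requested size, first-fit over the nodes.
    // Returns the name of the node hosting each granted container.
    static List<String> allocate(List<Node> nodes, int count, int memoryMb, int vcores) {
        List<String> granted = new ArrayList<>();
        for (Node node : nodes) {
            while (granted.size() < count
                    && node.freeMemoryMb >= memoryMb
                    && node.freeVcores >= vcores) {
                // Reserve the container's share of this node's resources.
                node.freeMemoryMb -= memoryMb;
                node.freeVcores -= vcores;
                granted.add(node.name);
            }
        }
        return granted;
    }

    public static void main(String[] args) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("node1", 4096, 4));
        nodes.add(new Node("node2", 2048, 2));

        // Request 4 containers with 1536 MB of memory and 1 virtual core each.
        // prints "[node1, node1, node2]": only 3 of the 4 requested containers fit.
        System.out.println(allocate(nodes, 4, 1536, 1));
    }
}
```

In this toy run the request is only partially satisfied because memory runs out before the fourth container can be placed, which is the kind of capacity constraint a resource manager has to account for.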

ApplicationMaster is a per-application process that coordinates the computations for a single application. The first step of executing a YARN application is to deploy the ApplicationMaster. After an application is submitted by a YARN client, the ResourceManager allocates a container and deploys the ApplicationMaster for that application. Once deployed, the ApplicationMaster is responsible for requesting and negotiating the necessary resource containers from the ResourceManager. Once the resources are allocated by the ResourceManager, ApplicationMaster coordinates with the NodeManagers to launch and monitor the application containers in the allocated resources. The shifting of application coordination responsibilities to the ApplicationMaster reduces the burden on the ResourceManager and allows it to focus solely on managing the cluster resources. Also having separate ApplicationMasters for each submitted application improves the scalability of the cluster as opposed to having a single process bottleneck to coordinate all the application instances. The following diagram depicts the interactions between various YARN components, when a MapReduce application is submitted to the cluster:

While YARN supports many different distributed application execution frameworks, our focus in this book is mostly on traditional MapReduce and related technologies.

Hadoop MapReduce

Hadoop MapReduce is a data processing framework that can be utilized to process massive amounts of data stored in HDFS. As we mentioned earlier, distributed processing of a massive amount of data in a reliable and efficient manner is not an easy task. Hadoop MapReduce aims to make it easy for users by providing a clean abstraction for programmers, automatic parallelization of the programs, and framework-managed fault tolerance.

The MapReduce programming model consists of Map and Reduce functions. The Map function receives each record of the input data (lines of a file, rows of a database, and so on) as a key-value pair and outputs key-value pairs as the result. By design, each Map function invocation is independent of the others, which allows the framework to use divide and conquer to execute the computation in parallel. This also allows duplicate executions or re-executions of the Map tasks in case of failures or load imbalances without affecting the results of the computation. Typically, Hadoop creates a single Map task instance for each HDFS data block of the input data. The number of Map function invocations inside a Map task instance is equal to the number of data records in the input data block of that particular Map task instance.
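The relationship between a Map task and its Map function invocations can be sketched without Hadoop at all. The following plain-Java model is an illustration only: the class and method names are hypothetical, the record key is a made-up offset that the function ignores, and the word-splitting logic is just one possible Map function.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class MapPhaseSketch {
    // One Map function invocation: receives a single record as a key-value pair
    // (offset, line) and emits zero or more key-value pairs. The key is unused here.
    static List<Map.Entry<String, Integer>> map(long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    // One Map task: invokes the Map function once per record of its input block.
    static List<Map.Entry<String, Integer>> runMapTask(List<String> records) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        long offset = 0;
        for (String record : records) {
            out.addAll(map(offset, record));
            offset += record.length() + 1; // +1 for the line separator
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> block = Arrays.asList("to be or", "not to be");
        // prints "[to=1, be=1, or=1, not=1, to=1, be=1]"
        System.out.println(runMapTask(block));
    }
}
```

Because `map` depends only on its own input record, running the same task again (for example, after a failure) produces exactly the same pairs, which is the property that makes re-execution safe.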

Hadoop MapReduce groups the output key-value records of all the Map tasks of a computation by the key and distributes them to the Reduce tasks. This distribution and transmission of data to the Reduce tasks is called the Shuffle phase of the MapReduce computation. Input data to each Reduce task would also be sorted and grouped by the key. The Reduce function gets invoked for each key and the group of values of that key (reduce <key, list_of_values>) in the sorted order of the keys. In a typical MapReduce program, users only have to implement the Map and Reduce functions and Hadoop takes care of scheduling and executing them in parallel. Hadoop will rerun any failed tasks and also provide measures to mitigate any unbalanced computations. Have a look at the following diagram for a better understanding of the MapReduce data and computational flows:

In Hadoop 1.x, the MapReduce (MR1) components consisted of the JobTracker process, which ran on a master node managing the cluster and coordinating the jobs, and TaskTrackers, which ran on each compute node launching and coordinating the tasks executing on that node. Neither of these processes exists in Hadoop 2.x MapReduce (MR2). In MR2, the job coordinating responsibility of JobTracker is handled by an ApplicationMaster that gets deployed on demand through YARN. The cluster management and job scheduling responsibilities of JobTracker are handled in MR2 by the YARN ResourceManager. JobHistoryServer has taken over the responsibility of providing information about the completed MR2 jobs. YARN NodeManagers provide functionality that is somewhat similar to MR1 TaskTrackers by managing resources and launching containers (which in the case of MapReduce 2 house Map or Reduce tasks) on the compute nodes.

Hadoop installation modes

Hadoop v2 provides three installation choices:

  • Local mode: The local mode allows us to run MapReduce computation using just the unzipped Hadoop distribution. This nondistributed mode executes all parts of Hadoop MapReduce within a single Java process and uses the local filesystem as the storage. The local mode is very useful for testing/debugging the MapReduce applications locally.
  • Pseudo distributed mode: Using this mode, we can run Hadoop on a single machine emulating a distributed cluster. This mode runs the different services of Hadoop as different Java processes, but within a single machine. This mode is well suited to experimenting with Hadoop.
  • Distributed mode: This is the real distributed mode that supports clusters that span from a few nodes to thousands of nodes. For production clusters, we recommend using one of the many packaged Hadoop distributions as opposed to installing Hadoop from scratch using the Hadoop release binaries, unless you have a specific use case that requires a vanilla Hadoop installation. Refer to the Setting up Hadoop ecosystem in a distributed cluster environment using a Hadoop distribution recipe for more information on Hadoop distributions.

Project files for the IntelliJ IDEA IDE can be generated by running the gradle idea command in the main folder of the code repository.

Setting up Hadoop v2 on your local machine

This recipe describes how to set up Hadoop v2 on your local machine using the local mode. Local mode is a non-distributed mode that can be used for testing and debugging your Hadoop applications. When running a Hadoop application in local mode, all the required Hadoop components and your applications execute inside a single Java Virtual Machine (JVM) process.

Getting ready

Download and install JDK 1.6 or a higher version, preferably the Oracle JDK 1.7. Oracle JDK can be downloaded from http://www.oracle.com/technetwork/java/javase/downloads/index.html.

How to do it…

Now let’s start the Hadoop v2 installation:

  1. Download the most recent Hadoop v2 branch distribution (Hadoop 2.2.0 or later) from http://hadoop.apache.org/releases.html.
  2. Unzip the Hadoop distribution using the following command. You will have to replace the x.x in the filename with the actual release you have downloaded. From this point onward, we will call the unpacked Hadoop directory {HADOOP_HOME}:

$ tar -zxvf hadoop-2.x.x.tar.gz

  3. Now, you can run Hadoop jobs through the {HADOOP_HOME}/bin/hadoop command, and we will elaborate on that further in the next recipe.

How it works…

Hadoop local mode does not start any servers but does all the work within a single JVM. When you submit a job to Hadoop in local mode, Hadoop starts a JVM to execute the job. The output and the behavior of the job is the same as a distributed Hadoop job, except for the fact that the job only uses the current node to run the tasks and the local filesystem is used for the data storage. In the next recipe, we will discover how to run a MapReduce program using the Hadoop local mode.

Writing a WordCount MapReduce application, bundling it, and running it using the Hadoop local mode

This recipe explains how to implement a simple MapReduce program to count the number of occurrences of words in a dataset. WordCount is famous as the HelloWorld equivalent for Hadoop MapReduce.

To run a MapReduce job, users should supply a map function, a reduce function, input data, and a location to store the output data. When executed, Hadoop carries out the following steps:

  1. Hadoop uses the supplied InputFormat to break the input data into key-value pairs and invokes the map function for each key-value pair, providing the key-value pair as the input. When executed, the map function can output zero or more key-value pairs.
  2. Hadoop transmits the key-value pairs emitted from the Mappers to the Reducers (this step is called Shuffle). Hadoop then sorts these key-value pairs by the key and groups together the values belonging to the same key.
  3. For each distinct key, Hadoop invokes the reduce function once while passing that particular key and list of values for that key as the input.
  4. The reduce function may output zero or more key-value pairs, and Hadoop writes them to the output data location as the final result.
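The four steps above can be traced end to end in a small in-memory model. This is a sketch in plain Java rather than Hadoop code: all names are hypothetical, the word-count logic is just an example, and assigning keys to Reduce tasks by hashing the key is an assumption modelled on hash partitioning (the exact partition a key lands in would differ in a real job).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MapReduceFlowSketch {
    // Shuffle helper: picks the Reduce task for a key by hashing the key.
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Step 3: the reduce function receives one key and all values of that key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static List<TreeMap<String, Integer>> run(List<String> lines, int numReducers) {
        // One sorted map per Reduce task, holding key -> grouped values.
        List<TreeMap<String, List<Integer>>> shuffled = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            shuffled.add(new TreeMap<>());
        }

        // Steps 1 and 2: map every record to (word, 1) pairs, then route each
        // pair to its Reduce task, where values are grouped by key in sorted order.
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                if (word.isEmpty()) {
                    continue;
                }
                shuffled.get(partition(word, numReducers))
                        .computeIfAbsent(word, k -> new ArrayList<>())
                        .add(1);
            }
        }

        // Steps 3 and 4: invoke reduce once per distinct key and collect the output.
        List<TreeMap<String, Integer>> output = new ArrayList<>();
        for (TreeMap<String, List<Integer>> partitionData : shuffled) {
            TreeMap<String, Integer> part = new TreeMap<>();
            for (Map.Entry<String, List<Integer>> e : partitionData.entrySet()) {
                part.put(e.getKey(), reduce(e.getKey(), e.getValue()));
            }
            output.add(part);
        }
        return output;
    }

    public static void main(String[] args) {
        List<TreeMap<String, Integer>> parts =
                run(Arrays.asList("to be or", "not to be"), 2);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("reducer " + i + ": " + parts.get(i));
        }
    }
}
```

Each distinct word ends up in exactly one reducer's output, so the per-reducer results can simply be concatenated to obtain the full word count.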

Getting ready

Select the source code for the first chapter from the source code repository for this book. Export the $HADOOP_HOME environment variable pointing to the root of the extracted Hadoop distribution.

How to do it…

Now let’s write our first Hadoop MapReduce program:

  1. The WordCount sample uses MapReduce to count the number of word occurrences within a set of input documents. The sample code is available in the chapter1/Wordcount.java file of the source folder of this chapter. The code has three parts—Mapper, Reducer, and the main program.
  2. The Mapper extends the org.apache.hadoop.mapreduce.Mapper class. The Hadoop InputFormat provides each line in the input files as an input key-value pair to the map function. The map function breaks each line into substrings using whitespace characters as the separator, and for each token (word) emits (word,1) as the output.

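public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    // Reused across invocations to avoid allocating new objects per token
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the input text value into words
        StringTokenizer itr = new StringTokenizer(value.toString());
        // Emit (word, 1) for every word in the input text value
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);
        }
    }
}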

  3. Each reduce function invocation receives a key and all the values of that key as the input. The reduce function outputs the key and the number of occurrences of the key.

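public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused across invocations to avoid allocating a new object per key
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // Sum all the occurrences of the word (key)
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}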

  4. The main driver program configures the MapReduce job and submits it to the Hadoop YARN cluster:

Configuration conf = new Configuration();
// ...
// Create a new job
Job job = Job.getInstance(conf, "word count");
// Use the WordCount class to locate the job JAR
job.setJarByClass(WordCount.class);

// Set the Mapper and Reducer classes and the output key and value types
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// Set the input and output locations
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// Submit the job and wait for its completion
System.exit(job.waitForCompletion(true) ? 0 : 1);

  5. Compile the sample using the Gradle build as mentioned in the introduction of this chapter by issuing the gradle build command from the chapter1 folder of the sample code repository. Alternatively, you can also use the provided Apache Ant build file by issuing the ant compile command.
  6. Run the WordCount sample using the following command. In this command, chapter1.WordCount is the name of the main class, wc-input is the input data directory, and wc-output is the output path. The wc-input directory of the source repository contains a sample text file. Alternatively, you can copy any text file to the wc-input directory.

$ $HADOOP_HOME/bin/hadoop jar \
hcb-c1-samples.jar \
chapter1.WordCount wc-input wc-output

  7. The output directory (wc-output) will have a file named part-r-XXXXX, which will have the count of each word in the document. Congratulations! You have successfully run your first MapReduce program.

$ cat wc-output/part*
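If you want to post-process the result programmatically rather than reading it with cat, the output lines can be parsed with a few lines of Java. The sketch below assumes that each line has the form word, a tab character, then the count, which is how the default text output of a job is typically laid out; the class name and the sample lines are made up for illustration and are not output from the book's dataset.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WordCountOutputSketch {
    // Parses lines of the assumed form "<word>\t<count>" into a sorted map.
    static Map<String, Integer> parse(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            int tab = line.lastIndexOf('\t');
            if (tab < 0) {
                continue; // skip blank or malformed lines
            }
            String word = line.substring(0, tab);
            int count = Integer.parseInt(line.substring(tab + 1).trim());
            counts.put(word, count);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample lines standing in for the contents of a part-r-XXXXX file.
        List<String> sample = Arrays.asList("be\t2", "not\t1", "or\t1", "to\t2");
        // prints "{be=2, not=1, or=1, to=2}"
        System.out.println(parse(sample));
    }
}
```

To apply this to a real run, the sample list could be replaced with the lines read from the part files in wc-output, for example with java.nio.file.Files.readAllLines.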



Copy Rights Reserved © Mindmajix.com All rights reserved. Disclaimer.