MapReduce Implementation in Hadoop
Google and its MapReduce framework may rule the roost when it comes to massive-scale data processing, but there’s still plenty of that goodness to go around. This article gets you started with the MapReduce implementation for processing large data sets.
The MapReduce programming model requires a software platform to run on. Popular platforms that support MapReduce, such as Apache Hadoop, Google File System, or Amazon EC2 follow an implementation pattern similar to the figure below. Data and processors are distributed across a utility compute environment. When a user submits a MapReduce program, a “master” process divvies up the tasks based on the location of the data. Parallel processes, or workers, are sent the map() instruction by the master program, based on the proximity of the processor to the data. The master favors workers that are on the same machine or at least the same rack as the data they are processing (Dean & Ghemawat, 2004). The master also attempts to divvy up the task to data ratio, so that individual worker process works in units of blocks of the underlying file system.
As the figure below illustrates, once the “master” program has divvied up the blocks of data to various workers, each worker processes the underlying list of data through the user implementation of the map() procedure. The output of each map() process, is a list of key*value pairs written to the local file system of the work that processed them. The master also coordinates the reduce() processing of the intermediate key*value pairs. The reduce() process will often have to perform remote reads of the intermediate files in order to gather all of the common intermediate key values.
Figure 3 – Execution Overview (Dean & Ghemawat, 2004)
In MapReduce architectures, the “master” program has other critical tasks, including failure detection. The master process polls the status of worker processes and ensures that they are actively processing their blocks. If the master process detects that a worker has failed, it will coordinate the execution of the map() or reduce() tasks on other nodes. The master process can also recognize when particular key/value pairs cause crashes in the map() process, and it will skip those values on re-execution. In other words, the map process can work around bugs in third-party libraries. MapReduce combined with a distributed file system such as Hadoop Distributed File System (HDFS) provides the high availability and fault tolerance (Borthakur, 2009). Hadoop is an open source Java implementation of MapReduce (White, 2010). Because a network bandwidth is scarce, the Map Reduce paradigm, and in particular the Map Reduce Master, attempts to schedule workers on or near the same machines where the distributed shards exist.
MapReduce implementations are optimized for multiple node commodity hardware. The figure below shows a typical MapReduce implementation. “Nodes” are the physical servers which support both the MapReduce processes and the data. These servers are built on inexpensive X486 architectures. Node failures are common and systems that support MapReduce, such as Hadoop Distributed File System (HDFS) have built in redundancy to account for these failures. Commodity PC’s like the ones below, typically run in large data centers with 30-40 nodes per rack.
In an optimal MapReduce commodity hardware farm, large network bandwidth is required to enable efficient passing of data between nodes and routers
Figure 4 – Commodity Hardware (Borthakur, 2009)
Let us have a deep discussion about various implementations in the further coming section.
Google File System
One of the first published MapReduce environments is the Google File System (GFS) (Ghemawat, Gobioff, & Leung, 2003). GFS is a propriety system used by Google to support its large data processing and analytical needs, in particular the crawled web content to be processed by the indexing system. GFS is built to run on many inexpensive commodity servers with compute, memory and storage components that often fail. It constantly monitors itself and recovers from failures promptly, by using replicas of data running on other nodes.
Figure 4 – The Google File System (Ghemawat, Gobioff, and Leung, 2003)
GFS is often defined as a “relaxed consistency model”, in that it recognizes and concedes all replicas will not be consistent at all times (Passing). GFS classifies a file region or chunk as either being consistent, defined or inconsistent. Consistent chunks occur when all replicas are the same and all clients of the file system will see the same data. Defined chunks occur when a change occurs, but even if the replicas are not currently consistent; all clients are directed to the updated chunks and as such are able to see the change. Inconsistent chunks occur when a change fails to be replicated in all copies, causing some of the clients to see different data than others. In GFS, as in many of the distributed file systems that MapReduce runs on, clients must be aware that inconsistencies are possible and handle them accordingly, either through unique identifiers, or other validation techniques.
There are several points of seamless integration with MapReduce and GFS. Because GFS splits files into chunks of fixed size and replicates chunks across machines, this simplifies data access and scheduling of the map() functions. MapReduce also benefits from the fault tolerance of GFS systems and scheduling of computation to be closely located to the data chunks. The GFS system and Google’s approach to leveraging distributed file systems and MapReduce inspired Doug Cutting to create the software that we will review next, the open source Apache Hadoop project.
Hadoop is an open source project for processing large data sets in parallel with the use of low level commodity machines.
Hadoop is built on two main parts: A special file system called Hadoop Distributed File System (HDFS) and the Map Reduce Framework.
Apache Hadoop is an implementation of the MapReduce programming model. Inspired by Google File System, the Apache Hadoop project provides a Java based open source alternative. The Apache Hadoop framework, in addition to implementing the MapReduce programming model, provides additional components and a wide variety for processing needs (Hadoop, 2011). The components of the Apache Hadoop Common project include
- Hadoop Distributed File System
Hadoop common provides components for both data analysis using Online Analytic Processing (OLAP) and fast data retrieval through Online Transaction Processing (OLTP). OLAP programs are used to perform long running queries that provide insights into the underlying data. In contrast, OLTP processing is typically required for fast real time (< 1 second) data retrieval, for applications such as eCommerce shopping experiences. A typical pattern for MapReduce is for analysts to schedule longer running jobs to process the data in the HDFS system, and then organize it in a manner where it can be accessed quickly for front end application requests using HBase. The following sections describe each of the Hadoop projects.
While the Hadoop project includes a distributed file system, Hadoop supports other file systems as well. In fact, Hadoop can work directly with any distributed file system that can be mounted by the operating system through the file:// URL. However, the benefit one gets from using a “bridged” file system rather than a supported file system is locality. When Hadoop knows the “location” of the data, it can and does assign relatively local workers to process it. Hadoop supports Amazon S3 and Kosmos Distributed File System, in addition to HDFS. The section below will focus on HDFS and the other components of Hadoop. The figure below illustrates the different components of Hadoop.
Figure 5 – Hadoop Architecture
Hadoop Distributed File System (HDFS)
The Hadoop Distributed File System (HDFS) is a sub-project of the Apache Hadoop project. The Hadoop Distributed File System (HDFS) is designed to store very large datasets reliably, and to stream those data sets at high bandwidth to user applications. In a large cluster, thousands of servers both hosts directly attached storage and execute user application tasks. By distributing storage and computation across many servers, the resource can grow with demand while remaining economical at every size. We describe the architecture of HDFS and report on experience using HDFS to manage 40 petabytes of enterprise data at Yahoo!
The Hadoop Distributed File System (HDFS) provides a highly fault tolerant storage platform that can run on thousands of nodes in a utility compute environment. HDFS is not dependent upon the underlying physical file system, and can run on NFS, SDD, DAS, and a variety of other storage types. Because the HDFS system exposes a standard API for managing block file data, the developer is not required to write specific code for the different storage technologies listed above. HDSF supports the following functionality:
Asynchronous Replication: Apache HBase Replication is a way of copying data from one HBase cluster to a different and possibly distant HBase cluster. It works on the principle that the transactions from the originating cluster are pushed to another cluster. HDFS has built in redundancy, and shards of data are replicated asynchronously across the system. Pieces of data are located in “shards”, or storage chunks. Shards are replicated to other servers, so that at any time if a server fails, another copy of the data located on it is available in the HDFS system. The introduction of a distributed, redundant file system means that updates to any given shard requires the updates to be applied to replicas where ever they exist throughout the system. With asynchronous replication, these updates are applied to replicas when possible. However, if a situation occurs where the replica cannot be applied, it is discarded. An example of this could occur when two persons simultaneously update different physical shards containing the same logical data. Systems that leverage asynchronous replication are considered to be “eventually consistent”, i.e. not all replicas will be guaranteed to have the same data. Contrast asynchronous replication with synchronous replication, where, when an update is made to a shard must be successfully applied to all replicas before it can be accepted. Systems that use synchronous replication typically require consistent data over availability.
Write Once, Read Many (WORM): Because HDFS uses asynchronous replicate; it is tuned and optimized to provide very fast read access. Hadoop architectures perform most effectively when the applications they support do an initial data load, with subsequent accesses being primarily queries.
Large (64MB) Blocks: HDFS supports very large data blocks. HDFS defaults to 64MB data blocks, which enable it to support a variety of unstructured data in a single block, including blobs, data files, images, etc.
Replication: By default, each block of data is replicated 3 times in an HDFS cluster. The replication is located on a different node to ensure highly fault tolerant architecture. Each update to the HDFS system requires two additional replications to the replicas of that block. That is why Hadoop is tuned to write once, read many times.
The HDFS system is controlled by a master node. The master node contains the primary index of the HDFS system, which includes what data is located in each of the blocks that make up the file system. Within the master node, one or more name nodes contain the metadata containing the block and key mapping required by the MapReduce pattern. The figure below shows an HDFS structure with name nodes and data nodes.
Clients who access HDFS systems have their requests for a specific file relayed to a name node, which returns the addresses of the file and its Data Nodes. The Data nodes provide a block-id to the physical node location on disk. The secondary name node provides a periodic merge of the transaction log. In HDFS, clients can only read or append existing files.
Figure 6 – Hadoop HDFS Architecture (Borthakur, 2009)
The HDFS architecture in the figure above is similar to the GFS. The Name Node stores in memory the list of files in that HDFS cluster, the list of blocks for each file, the list of data nodes for each block, and other file attributes such as creation time, replication factors, etc. The Name Node also stores a transaction log, which is a list of all the activity which has occurred against that HDFS cluster (White, 2010).
The Data Node can be thought of as a block server, or a process within each commodity server that stores data. The Data Node stores data in the local file system, stores the meta-data of that block, serves up data and meta-data to clients, and performs other file system functions such as check-sums. The Data Node communicates with the Name Node, by sending a report of all existing blocks within its realm to the NameNode. The Data Node also facilitates the pipelining of data to other Data Nodes for replication.
HDFS follows a block placement strategy, that for fault tolerance of the commodity hardware it runs on. At most, one unit or block of data will be stored on a single node. Replicas will be stored on remote racks throughout the cluster, to ensure high availability. Clients who access HDFS, such as MapReduce, will always be directed towards the nearest replica of data by the Name Node. This facilitates high performance by reducing the amount of network latency required to facilitate requests.
HBase is a SQL-Like data structure layered over HDFS to provide clients who need high performance, low latency access of data. HBase does not use the MapReduce programming model, but rather data is column oriented, using single keys, and columns. The goal of HBase is to provide fast access to de-normalize data over billions of rows and millions of columns.
Hadoop Map Reduce
The Hadoop MapReduce implementation supports the map() and reduce() functions described previously. As a MapReduce framework, Hadoop is responsible for allocating workers within the infrastructure to process these functions. A special component in Hadoop called “JobTracker” schedules and manages jobs on the NameNode for performing the Map and Reduce functions. In order to keep the processing as close as possible to the data, the NameNode coordinates a variety of “TaskManager” processes which run on each of the Data Nodes.
Hive is an open source-software component of Hadoop common that lets data scientists analyze large data sets on Hadoop, without being familiar with the MapReduce programming model (Overview of Hadoop and Hive). It is a data warehouse infrastructure tool to process structured data in Hadoop. It resides on top of Hadoop to summarize Big Data, and makes querying and analyzing easy. Hive also provides a command line interface (CLI) to write SQL-like queries, using a language called the Hive Query Language (HQL). Hive is better suited for ad-hoc queries, but its main advantage is that it has an engine that stores and partitions data. But its tables can be read from Pig or Standard MapReduce.
Pig is an application that works on top of MapReduce, Yarn or Tez. Pig is written in Java and compiles Pig Latin scripts into to MapReduce jobs. Think of Pig as a compiler that takes Pig Latin scripts and transforms them into Java.
Pig is considered as an Apache project which provides a platform for analyzing large data sets on the Hadoop platform leveraging MapReduce. Pig’s infrastructure consists of a compiler that produces sequences of MapReduce programs. Pig has a 4th generation language and can run in both compiled and interactive mode.
It is a data flow language, the key focus of Pig is to manage the flow of data from the input source to output store. As part of managing this data flow it moves data feeding it to process1, taking output and feeding it to process2. The core features are preventing execution of subsequent stages if previous stage fails, manages temporary storage of data and most importantly compresses and rearranges processing steps for faster processing. While this can be done for any kind of processing tasks, Pig is written specifically for managing data flow of Map reduce type of jobs. Most, if not all jobs in a Pig are map reduce jobs or data movement jobs. Pig allows for custom functions to be added which can be used for processing in Pig, some default ones are like ordering, grouping, distinct, count etc.
Zookeeper is another Hadoop sub-project which provides a centralized service for maintaining configuration information. Including naming, distributed synchronization and name value pairs, Zookeeper provides the type of configuration functionality that are required by distributed applications.
For example, it makes it easier to:
- Manage configuration across nodes.
- Implement reliable messaging.
- Implement redundant services.
- Synchronize process execution.
Chukwa provides open source monitoring for distributed systems. Chukwa is a powerful toolkit for collecting and analyzing different types of application logs.
Chukwa aims to provide a flexible and powerful platform for distributed data collection and rapid data processing. Our goal is to produce a system that’s usable today, but that can be modified to take advantage of newer storage technologies (HDFS appends, HBase, etc) as they mature. In order to maintain this flexibility, Chukwa is structured as a pipeline of collection and processing stages, with clean and narrow interfaces between stages. This will facilitate future innovation without breaking existing code.
Chukwa has four primary components:
- Agents that run on each machine and emit data.
- Collectors that receive data from the agent and write it to stable storage.
- MapReduce jobs for parsing and archiving the data.
- HICC, the Hadoop Infrastructure Care Center; a web-portal style interface for displaying data.
Avro is a data serialization system which provides a compact, fast binary format, a container file to store persistent data, RPC functionality, and system integration with dynamic languages.
Avro provides a convenient way to represent complex data structures within a Hadoop MapReduce job. Avro data can be used as both input to and output from a MapReduce job, as well as the intermediate format.