MapReduce

Introduction

MapReduce is a programming model and processing technique introduced by Google in 2004 to enable large-scale data processing on distributed systems. Hadoop’s implementation of MapReduce abstracts the complexities of parallel execution, data distribution, and fault tolerance, allowing developers to focus on writing map and reduce functions. This guide dives deep into every aspect of MapReduce—architecture, APIs, data flow, advanced customization, debugging, optimization, and real-world case studies—to provide a holistic understanding.

Historical Context

Before MapReduce, batch processing of massive datasets used specialized, expensive hardware or complex MPI-based applications. Google’s seminal paper outlined a simpler model using commodity hardware. Hadoop, created by Doug Cutting and Mike Cafarella in 2006, brought MapReduce to the open-source world. Over the years, MapReduce evolved into a core component of the Hadoop ecosystem, spawning new engines like Spark and Tez that optimize specific workloads but remain conceptually similar.

Architecture and Components

Client: Submits a job configuration to the JobTracker (Hadoop 1) or ResourceManager (Hadoop 2/YARN).
JobTracker / ResourceManager: Centralized service that allocates cluster resources; the JobTracker (Hadoop 1) also tracks job progress, a duty that YARN hands off to the per-application ApplicationMaster.
TaskTracker / NodeManager: Worker processes that launch and monitor map and reduce tasks.
ApplicationMaster: In YARN, a per-application master coordinates resource requests and task scheduling.
InputFormat: Defines how input files are split and read into key/value pairs. Examples: TextInputFormat, KeyValueTextInputFormat, SequenceFileInputFormat.
RecordReader: Parses raw byte streams from InputSplits into records for mappers.
Partitioner: Dictates how keys are distributed to reducers (default: HashPartitioner).
Shuffle: Transfers intermediate data from mappers to reducers, implemented via HTTP fetches.
Combiner: Optional mini-reducer run on map output to reduce data volume.
OutputFormat: Defines how reducer outputs are written (default: TextOutputFormat).
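
The snippet below is a minimal sketch of how these pluggable components are attached to a job through the new API. TextInputFormat, HashPartitioner, and TextOutputFormat are the stock Hadoop classes named above; MyMapper and MyReducer stand in for hypothetical user code.

// Wiring the pluggable components onto a Job (new API).
// MyMapper and MyReducer are placeholders for user classes; the rest are stock Hadoop classes.
Job job = Job.getInstance(new Configuration(), "component wiring");
job.setInputFormatClass(TextInputFormat.class);    // how input is split and read into records
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);             // optional mini-reduce on map output
job.setPartitionerClass(HashPartitioner.class);    // the default, shown explicitly here
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);  // how final key/value pairs are written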

Data Flow

1. Split: Hadoop splits input files into InputSplits aligned with HDFS blocks.
2. Map: Each split is processed by a map task, emitting intermediate key/value pairs.
3. Spill: When in-memory buffers fill, map outputs are spilled to local disk, optionally combining with a combiner.
4. Partition & Sort: Intermediate data is partitioned by key and sorted within each partition (a sketch of the default partitioning logic follows this list).
5. Shuffle: Reducers pull the partitioned map outputs over the network.
6. Reduce: Reducers iterate over sorted keys and their associated values to produce final results.
7. Output: Final key/value pairs are written to HDFS by the OutputFormat.
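
To make step 4 concrete, the default HashPartitioner chooses a reduce partition from the key's hash code. The class below is an illustrative restatement of that behavior, not the library source; the bitmask keeps the hash non-negative before the modulo.

// Illustrative partitioner mirroring the behavior of the default HashPartitioner.
public class ModuloPartitioner<K, V> extends org.apache.hadoop.mapreduce.Partitioner<K, V> {
  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask off the sign bit, then map the hash into [0, numReduceTasks).
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}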

MapReduce APIs

Old API: org.apache.hadoop.mapred (JobConf, Mapper, Reducer)
New API: org.apache.hadoop.mapreduce (Job, Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>, Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).
New API supports configuration via context, better type safety, and clearer method signatures.
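
The difference is easiest to see in the map method signatures. The fragments below are illustrative skeletons, one per API, with bodies omitted.

// Old API (org.apache.hadoop.mapred): Mapper is an interface; output goes through OutputCollector.
public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
  // ...
}

// New API (org.apache.hadoop.mapreduce): Mapper is a class; output and configuration go through Context.
@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  // ...
}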

Example: Java Word Count

The Java Word Count example demonstrates a complete MapReduce application. It includes three main components: the TokenizerMapper class that emits each word with a count of one, the IntSumReducer class that sums counts per word, and the main method that configures and launches the job.

TokenizerMapper: Reads each line of input as Text, splits it into tokens using StringTokenizer, and writes each word paired with an IntWritable(1). Context.write sends the pair to the framework.
IntSumReducer: Receives a word and iterable counts, accumulates the total count in a local integer, sets it into an IntWritable, and emits the final (word, total) pair.
Job Setup: The main method initializes a Job instance, sets classes for mapper, combiner, reducer, and output key/value types, specifies input/output paths, and calls job.waitForCompletion(true) to block until completion.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) sum += val.get();
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class); // summation is associative and commutative, so the reducer doubles as a combiner
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Explanation of Java Example Above

This example processes text files stored in HDFS. Each mapper task handles one split of the file, producing intermediate (word, 1) pairs. The combiner runs locally on each mapper's output to sum partial counts. Reducers receive all partial counts for a word, compute the final sum, and write results to the output directory in HDFS. Running hadoop jar with input/output paths executes the job on the cluster.
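
Assuming the class is packaged into a jar named wordcount.jar (the jar name is arbitrary), a typical invocation looks like:

hadoop jar wordcount.jar WordCount /input /output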

Python Streaming Example

Hadoop Streaming enables writing MapReduce jobs in languages like Python by specifying mapper and reducer scripts. Below, mapper.py reads lines from STDIN and emits each word with a count of 1. reducer.py aggregates these counts by word, emitting final tallies.

#!/usr/bin/env python3
# mapper.py
import sys
for line in sys.stdin:
    for word in line.strip().split():
        # Emit each word with a count of 1, separated by a tab
        print(f"{word}\t1")

#!/usr/bin/env python3
# reducer.py
import sys
current_word = None
current_count = 0
for line in sys.stdin:
    word, count = line.strip().split('\t')
    count = int(count)
    if word == current_word:
        current_count += count
    else:
        if current_word is not None:
            print(f"{current_word}	{current_count}")
        current_word = word
        current_count = count
# Emit last word count
if current_word:
    print(f"{current_word}	{current_count}")

# Run Streaming job with Python scripts
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input /input \
  -output /output_streaming \
  -mapper mapper.py \
  -reducer reducer.py \
  -file mapper.py \
  -file reducer.py

Explanation of Streaming Example Above

The hadoop jar command launches the streaming jar. The -mapper and -reducer options specify the scripts. The -file flags ensure the scripts are shipped to each task node. During execution, Hadoop handles splitting input, scheduling map and reduce tasks, shuffling intermediate output, and writing final results to HDFS.
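
Because the reducer assumes its input arrives sorted by key (exactly what the shuffle guarantees), the scripts can be smoke-tested locally with a shell pipeline before submitting to the cluster; sample.txt here is any small text file:

cat sample.txt | ./mapper.py | sort | ./reducer.py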

Advanced Features

Custom InputFormat: Process non-text data or user-defined formats (e.g., XML, JSON).
Custom Partitioner: Control data distribution across reducers (e.g., range partitioner).
Combiner: Reduces volume of intermediate data; should be associative and commutative.
Counters: Instrument code to emit metrics; viewable in the job UI and logs (see the sketch after this list).
Speculative Execution: Run duplicate tasks on straggling nodes; can be disabled for network-intensive jobs.
Local Mode vs Cluster Mode: Local mode runs in a single JVM for testing; cluster mode runs on YARN for production.
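
As an example of counters, the hypothetical mapper fragment below increments a user-defined counter for every record it skips; the enum name and the malformed-record check are placeholders.

// Hypothetical counter enum; Hadoop groups it under the enclosing class name in the job UI.
public static enum ParseCounters { MALFORMED_RECORDS }

@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  if (value.toString().trim().isEmpty()) {
    // Visible in the web UI and in the job's final counter summary.
    context.getCounter(ParseCounters.MALFORMED_RECORDS).increment(1);
    return;
  }
  // ... normal record processing ...
}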

Configuration and Tuning

Tune parameters in mapred-site.xml and yarn-site.xml:
mapreduce.job.reduces: Number of reduce tasks; balance between parallelism and overhead.
mapreduce.task.io.sort.mb: Memory buffer size for sorting map outputs.
yarn.nodemanager.resource.memory-mb: Memory available for containers.
mapreduce.map.memory.mb / mapreduce.reduce.memory.mb: Container memory settings.
mapreduce.task.timeout: Timeout for hung tasks; adjust for heavy computation.
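
Cluster-wide defaults live in those XML files, but most of these settings can also be overridden per job from the driver. The values below are purely illustrative, not recommendations:

Configuration conf = new Configuration();
conf.setInt("mapreduce.task.io.sort.mb", 256);   // larger in-memory sort buffer for map output
conf.set("mapreduce.map.memory.mb", "2048");     // container memory requested per map task
conf.set("mapreduce.reduce.memory.mb", "4096");  // container memory requested per reduce task
Job job = Job.getInstance(conf, "tuned job");
job.setNumReduceTasks(8);                        // programmatic equivalent of mapreduce.job.reduces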

Debugging and Monitoring

NameNode/ResourceManager UI: View job and cluster metrics at default ports 9870/8088.
log4j: Configure logging levels for MapReduce and YARN components in conf/log4j.properties.
yarn logs: Retrieve aggregated logs with yarn logs -applicationId <appId>.
MRUnit: Unit testing framework for MapReduce jobs.
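
A minimal MRUnit test for the TokenizerMapper above might look roughly like the following; package and class names reflect MRUnit 1.x and should be treated as a sketch rather than a drop-in test.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class TokenizerMapperTest {
  @Test
  public void emitsOneCountPerToken() throws Exception {
    MapDriver<Object, Text, Text, IntWritable> driver =
        MapDriver.newMapDriver(new WordCount.TokenizerMapper());
    driver.withInput(new Text("offset"), new Text("hello hello"))
          .withOutput(new Text("hello"), new IntWritable(1))
          .withOutput(new Text("hello"), new IntWritable(1))
          .runTest();
  }
}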

Real-World Use Cases

Data Analytics: PageRank, user behavior analysis, A/B testing.
ETL Pipelines: Log parsing, data cleansing, enrichment.
Indexing: Building inverted indexes for search engines.
Machine Learning: Large-scale feature extraction, model training (e.g., Mahout).

Next Steps

Explore alternative execution engines (Spark, Tez) for iterative and interactive workloads, integrate MapReduce with Apache Hive and Pig for declarative development, and consider migrating to newer paradigms such as Apache Flink for streaming data.

Previous: HDFS | Next: YARN
