MapReduce
Introduction
MapReduce is a programming model and processing technique introduced by Google in 2004 to enable large-scale data processing on distributed systems. Hadoop’s implementation of MapReduce abstracts the complexities of parallel execution, data distribution, and fault tolerance, allowing developers to focus on writing map and reduce functions. This guide dives deep into every aspect of MapReduce—architecture, APIs, data flow, advanced customization, debugging, optimization, and real-world case studies—to provide a holistic understanding.
Historical Context
Before MapReduce, batch processing of massive datasets used specialized, expensive hardware or complex MPI-based applications. Google’s seminal paper outlined a simpler model using commodity hardware. Hadoop, created by Doug Cutting and Mike Cafarella in 2006, brought MapReduce to the open-source world. Over the years, MapReduce evolved into a core component of the Hadoop ecosystem, spawning new engines like Spark and Tez that optimize specific workloads but remain conceptually similar.
Architecture and Components
Client: Submits a job configuration to the JobTracker (Hadoop 1) or ResourceManager (Hadoop 2/YARN).
JobTracker / ResourceManager: Centralized service that negotiates resources and tracks job progress.
TaskTracker / NodeManager: Worker processes that launch and monitor map and reduce tasks.
ApplicationMaster: In YARN, a per-application master coordinates resource requests and task scheduling.
InputFormat: Defines how input files are split and read into key/value pairs. Examples: TextInputFormat, KeyValueInputFormat, SequenceFileInputFormat.
RecordReader: Parses raw byte streams from InputSplits into records for mappers.
Partitioner: Dictates how keys are distributed to reducers (default: HashPartitioner); a sketch of the default behavior follows this list.
Shuffle: Transfers intermediate data from mappers to reducers, implemented via HTTP fetches.
Combiner: Optional mini-reducer run on map output to reduce data volume.
OutputFormat: Defines how reducer outputs are written (default: TextOutputFormat).
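To make the Partitioner's role concrete, the following sketch shows a partitioner whose logic is equivalent to the default HashPartitioner behavior mentioned above (the class name HashLikePartitioner is invented for illustration):

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch only: mirrors what the default HashPartitioner does, i.e. hash the key,
// mask off the sign bit, and take the result modulo the number of reduce tasks.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

A custom implementation of getPartition can be plugged in via job.setPartitionerClass when keys need to be routed differently, for example by range.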
Data Flow
1. Split: Hadoop splits input files into InputSplits aligned with HDFS blocks (split sizing can be tuned per job, as sketched after this list).
2. Map: Each split is processed by a map task, emitting intermediate key/value pairs.
3. Spill: When in-memory buffers fill, map outputs are spilled to local disk, optionally combining with a combiner.
4. Partition & Sort: Intermediate data is partitioned by key and sorted within each partition.
5. Shuffle: Reducers pull the partitioned map outputs over the network.
6. Reduce: Reducers iterate over sorted keys and their associated values to produce final results.
7. Output: Final key/value pairs are written to HDFS by the OutputFormat.
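The split step (step 1 above) is the part of this flow that job authors most often adjust. Below is a minimal sketch of constraining split sizes on a job (assuming the new-API FileInputFormat; the size values are illustrative only), which in turn determines how many map tasks step 2 launches:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split size example");
        long mb = 1024L * 1024L;
        // By default splits track HDFS block boundaries; these bounds clamp the split size.
        FileInputFormat.setMinInputSplitSize(job, 64 * mb);
        FileInputFormat.setMaxInputSplitSize(job, 128 * mb);
        // ... the rest of the job setup (mapper, reducer, paths) would follow as usual.
    }
}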
MapReduce APIs
Old API: org.apache.hadoop.mapred (JobConf, Mapper, Reducer)
New API: org.apache.hadoop.mapreduce (Job, Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>, Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).
New API supports configuration via context, better type safety, and clearer method signatures.
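To make the new API concrete, here is a minimal mapper sketch written in the new style; the wordcount.min.length property is a hypothetical job setting, used only to show configuration being read through the context rather than a JobConf:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// New-API style: generics fix the key/value types, and per-job
// configuration is pulled from the task context in setup().
public class MinLengthMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();
    private int minLength;

    @Override
    protected void setup(Context context) {
        // "wordcount.min.length" is a hypothetical property set by the driver.
        minLength = context.getConfiguration().getInt("wordcount.min.length", 1);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (token.length() >= minLength) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}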
Example: Java Word Count
The Java Word Count example demonstrates a complete MapReduce application. It includes three main components: the TokenizerMapper class that emits each word with a count of one, the IntSumReducer class that sums counts per word, and the main method that configures and launches the job.
TokenizerMapper: Reads each line of input as Text, splits it into tokens using StringTokenizer, and writes each word paired with an IntWritable(1). Context.write sends the pair to the framework.
IntSumReducer: Receives a word and its iterable counts, accumulates the total count in a local integer, sets it into an IntWritable, and emits the final (word, total) pair.
Job Setup: The main method initializes a Job instance, sets the mapper, combiner, reducer, and output key/value classes, specifies input/output paths, and calls job.waitForCompletion(true) to block until completion.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Mapper: emits (word, 1) for every token in each input line.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer (also used as combiner): sums all counts for a given word.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Explanation of Java Example Above
This example processes text files stored in HDFS. Each mapper task handles one split of the file, producing intermediate (word, 1) pairs. The combiner runs locally on each mapper's output to sum partial counts. Reducers receive all partial counts for a word, compute the final sum, and write results to the output directory in HDFS. Running hadoop jar with the input and output paths executes the job on the cluster.
Python Streaming Example
Hadoop Streaming enables writing MapReduce jobs in languages like Python by specifying mapper and reducer scripts. Below, mapper.py reads lines from STDIN and emits each word with a count of 1, and reducer.py aggregates these counts by word, emitting final tallies.
#!/usr/bin/env python3
# mapper.py
import sys

for line in sys.stdin:
    for word in line.strip().split():
        # Emit each word with a count of 1, tab-separated
        # (tab is Hadoop Streaming's default key/value separator)
        print(f"{word}\t1")
#!/usr/bin/env python3
# reducer.py
import sys

current_word = None
current_count = 0

# Input arrives sorted by key, so counts for the same word are contiguous.
for line in sys.stdin:
    word, count = line.strip().split('\t', 1)
    count = int(count)
    if word == current_word:
        current_count += count
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")
        current_word = word
        current_count = count

# Emit the last word's count
if current_word is not None:
    print(f"{current_word}\t{current_count}")
# Run Streaming job with Python scripts
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input /input \
-output /output_streaming \
-mapper mapper.py \
-reducer reducer.py \
-file mapper.py \
-file reducer.py
Explanation of Streaming Example Above
The hadoop jar command launches the streaming jar. The -mapper and -reducer options specify the scripts, and the -file flags ensure the scripts are shipped to each task node. During execution, Hadoop handles splitting the input, scheduling map and reduce tasks, shuffling intermediate output, and writing the final results to HDFS.
Advanced Features
Custom InputFormat: Process non-text data or user-defined formats (e.g., XML, JSON).
Custom Partitioner: Control data distribution across reducers (e.g., range partitioner).
Combiner: Reduces volume of intermediate data; should be associative and commutative.
Counters: Instrument code to emit metrics; viewable in the job UI and logs (see the sketch after this list).
Speculative Execution: Launches duplicate copies of straggling tasks on other nodes; can be disabled for network-intensive jobs.
Local Mode vs Cluster Mode: Local mode runs in a single JVM for testing; cluster mode runs on YARN for production.
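As an illustration of the Counters bullet above, the sketch below increments custom counters from inside a mapper; the Records enum and the treatment of empty lines as "malformed" are invented for the example:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CountingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Each enum constant becomes a named counter, grouped under the enum's class name.
    public enum Records { TOTAL, MALFORMED }

    private static final IntWritable ONE = new IntWritable(1);
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.getCounter(Records.TOTAL).increment(1);
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // Skipped records are tallied rather than silently dropped.
            context.getCounter(Records.MALFORMED).increment(1);
            return;
        }
        outKey.set(line);
        context.write(outKey, ONE);
    }
}

After the job finishes, the counter totals appear under the enum's group name in the web UI and in the client's console output.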
Configuration and Tuning
Tune parameters in mapred-site.xml and yarn-site.xml:
mapreduce.job.reduces: Number of reduce tasks; balance between parallelism and overhead.
mapreduce.task.io.sort.mb: Memory buffer size for sorting map outputs.
yarn.nodemanager.resource.memory-mb: Memory available for containers.
mapreduce.map.memory.mb / mapreduce.reduce.memory.mb: Container memory settings.
mapreduce.task.timeout: Timeout for hung tasks; adjust for heavy computation.
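Many of these settings can also be overridden per job rather than cluster-wide. Below is a small sketch using the Java driver API (the numeric values are illustrative, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TunedJobExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Per-job overrides of the properties listed above.
        conf.setInt("mapreduce.task.io.sort.mb", 256);     // larger sort buffer, fewer spills
        conf.setInt("mapreduce.map.memory.mb", 2048);      // map container size
        conf.setInt("mapreduce.reduce.memory.mb", 4096);   // reduce container size

        Job job = Job.getInstance(conf, "tuned job");
        job.setNumReduceTasks(10);                         // equivalent to mapreduce.job.reduces
        // ... set mapper/reducer/input/output as usual, then submit.
    }
}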
Debugging and Monitoring
NameNode/ResourceManager UI: View HDFS status and cluster/application metrics at default ports 9870 (NameNode) and 8088 (ResourceManager).
log4j: Configure logging levels for MapReduce and YARN components in conf/log4j.properties.
yarn logs: Retrieve aggregated logs with yarn logs -applicationId <appId>.
MRUnit: Unit testing framework for MapReduce jobs.
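For example, the TokenizerMapper from the WordCount job above could be unit-tested in memory with an MRUnit MapDriver, roughly as sketched below (assuming the mrunit test dependency is on the classpath):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class TokenizerMapperTest {
    @Test
    public void emitsOnePerToken() throws Exception {
        // Drive the mapper in-memory: no cluster, no HDFS.
        MapDriver<Object, Text, Text, IntWritable> driver =
                MapDriver.newMapDriver(new WordCount.TokenizerMapper());
        driver.withInput(new LongWritable(0), new Text("hello hadoop hello"))
              .withOutput(new Text("hello"), new IntWritable(1))
              .withOutput(new Text("hadoop"), new IntWritable(1))
              .withOutput(new Text("hello"), new IntWritable(1))
              .runTest();
    }
}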
Real-World Use Cases
Data Analytics: PageRank, user behavior analysis, A/B testing.
ETL Pipelines: Log parsing, data cleansing, enrichment.
Indexing: Building inverted indexes for search engines.
Machine Learning: Large-scale feature extraction, model training (e.g., Mahout).
Next Steps
Explore alternative execution engines (Spark, Tez) for iterative and interactive workloads, integrate MapReduce with Apache Hive and Pig for declarative development, and consider migrating to newer paradigms such as Apache Flink for streaming data.