WordCount is the standard example for understanding basic MapReduce functionality. The program reads a text file and counts how often each word occurs in it.
A simple MapReduce program contains:
Driver Class: The driver class is responsible for triggering the MapReduce job in Hadoop. This is where we provide the name of the job, the output key and value data types, and the mapper and reducer classes.
Mapper: Each mapper takes a line as input and breaks it into words. It then emits a key/value pair of the word and 1.
Reducer: Each reducer sums the counts for each word and emits a single key/value with the word and sum.
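The mapper and reducer roles described above can be sketched in plain Java, without any Hadoop classes. This is only an illustration of the data flow, not how Hadoop runs the job: the class name WordCountFlowSketch and the methods mapLine, shuffle, reduce and wordCount are hypothetical names made up for this sketch, and the in-memory TreeMap merely stands in for the grouping and sorting by key that Hadoop performs between the map and reduce phases.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java illustration of the data flow; no Hadoop classes are used.
// All names here are hypothetical and exist only for this sketch.
class WordCountFlowSketch {

    // Map step: split one line on whitespace and emit a (word, 1) pair per token.
    static List<Map.Entry<String, Integer>> mapLine(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            pairs.add(new AbstractMap.SimpleEntry<String, Integer>(tokenizer.nextToken(), 1));
        }
        return pairs;
    }

    // Shuffle step (done by the framework in a real job): group values by key.
    // A TreeMap keeps the keys sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce step: sum all the counts collected for one word.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Runs map, shuffle and reduce in sequence over the given lines.
    static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String line : lines) {
            emitted.addAll(mapLine(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(emitted).entrySet()) {
            result.put(group.getKey(), reduce(group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello World Bye World Hello Hadoop Goodbye Hadoop");
        // Prints one "word<TAB>count" line per distinct word, in sorted key order.
        wordCount(lines).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

Each token becomes one (word, 1) pair, so the eight-word sample line produces eight map outputs that are reduced to five sums.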
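As an optimization, the reducer can also be used as a combiner on the map outputs (registered in the driver with job.setCombinerClass(Reduce.class)). This reduces the amount of data sent across the network, because the repeated occurrences of a word in one mapper's output are merged into a single record before the shuffle.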
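The effect of a combiner can be illustrated with a small plain-Java sketch. It assumes, purely for illustration, that the sample text is processed by two mappers, one per half of the text (a file this small would normally be a single split). CombinerSketch and its methods are hypothetical names, and no Hadoop classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java illustration of local combining; names are hypothetical.
class CombinerSketch {

    // Without a combiner, a mapper sends one (word, 1) record per token.
    static int recordsWithoutCombiner(String split) {
        return new StringTokenizer(split).countTokens();
    }

    // Combiner step: sum the 1s per word within a single mapper's output,
    // leaving one (word, partialSum) record per distinct word.
    static Map<String, Integer> combine(String split) {
        Map<String, Integer> partial = new TreeMap<>();
        StringTokenizer tokenizer = new StringTokenizer(split);
        while (tokenizer.hasMoreTokens()) {
            partial.merge(tokenizer.nextToken(), 1, Integer::sum);
        }
        return partial;
    }

    // Reduce step: add up the partial sums received from all mappers.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, count) -> totals.merge(word, count, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        // Assumed split of the sample text across two mappers.
        List<String> splits = Arrays.asList("Hello World Bye World", "Hello Hadoop Goodbye Hadoop");
        int plain = 0;
        int combined = 0;
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (String split : splits) {
            plain += recordsWithoutCombiner(split);
            Map<String, Integer> partial = combine(split);
            combined += partial.size();
            partials.add(partial);
        }
        System.out.println("records without combiner: " + plain);    // 8
        System.out.println("records with combiner: " + combined);    // 6
        System.out.println(reduce(partials)); // {Bye=1, Goodbye=1, Hadoop=2, Hello=2, World=2}
    }
}
```

Reusing the reducer as the combiner works here because addition is associative and commutative, so summing partial sums gives the same totals as summing the individual 1s. Hadoop treats the combiner as an optional optimization and may run it zero or more times, so the job must produce the same result without it.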
Source Code:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    // Mapper: input key is the byte offset of the line, input value is the line itself.
    public static class Map extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Split the line on whitespace and emit (word, 1) for every token.
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer: receives all counts for one word and emits (word, sum).
    public static class Reduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable t : values) {
                sum += t.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Driver class
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // On Hadoop 2 and later this constructor is deprecated;
        // prefer Job.getInstance(conf, "WordCount") there.
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(WordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Mapper
        job.setMapperClass(Map.class);
        // Combiner: reuse the reducer to pre-aggregate counts on the map side
        job.setCombinerClass(Reduce.class);
        // Reducer
        job.setReducerClass(Reduce.class);
        // Input text format
        job.setInputFormatClass(TextInputFormat.class);
        // Output text format
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input directory
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Output directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Exit with a non-zero status if the job fails.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
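One detail of the mapper is worth seeing in isolation: StringTokenizer with its default delimiters splits on whitespace only, so punctuation stays attached to a word and case is preserved. The sketch below uses only the JDK; TokenizerSketch and tokens are hypothetical names, and the input line is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Shows which tokens the mapper's StringTokenizer loop would produce for a line.
class TokenizerSketch {

    // Same tokenization as the mapper: default delimiters (space, tab, newline, ...).
    static List<String> tokens(String line) {
        List<String> result = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            result.add(tokenizer.nextToken());
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [Hello,, hello, World., World]
        System.out.println(tokens("Hello, hello\tWorld. World"));
    }
}
```

With this tokenization, "Hello," and "hello" would be counted as different words, as would "World." and "World". If that is not wanted, the mapper could normalize each token (for example, lower-case it and strip punctuation) before calling word.set.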
Execute the Program:
Assuming HADOOP_HOME is the root of the installation and HADOOP_VERSION is the Hadoop version installed, compile WordCount.java and create a jar:
- Package the compiled classes (WordCount.class and its nested Map and Reduce classes) into wordcount.jar.
- Assuming that:
- /usr/wordcount/input - input directory in HDFS
- /usr/wordcount/output - output directory in HDFS
- The input directory (listed with $ bin/hadoop dfs -ls /usr/wordcount/input/) contains sample.txt, whose content is "Hello World Bye World Hello Hadoop Goodbye Hadoop"
- Run the application:
$ bin/hadoop jar /usr/wordcount.jar WordCount /usr/wordcount/input /usr/wordcount/output
- Output:
$ bin/hadoop dfs -cat /usr/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2