Sunday, August 3, 2014

Hadoop WordCount Program with MapReduce v2 (MRv2)



WordCount is the typical example for understanding simple MapReduce functionality. The program reads a text file and counts how often each word occurs in it.


A Simple MapReduce Job Contains
Input and output directories: The input directory holds text files; the output directory receives text files, each line of which contains a word and the count of how often it occurred, separated by a tab.

Driver class: The driver class is responsible for submitting the MapReduce job to Hadoop. It is in the driver that we set the job name, the output key/value data types, and the mapper and reducer classes.

Mapper: Each mapper takes a line as input and breaks it into words. It then emits a key/value pair of the word and 1.

Reducer: Each reducer sums the counts for each word and emits a single key/value with the word and sum.
As an optimization, the reducer is also used as a combiner on the map outputs. This reduces the amount of data sent across the network by combining each word into a single record.  
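Before looking at the Hadoop code, the map-then-reduce flow can be sketched in plain Java with no Hadoop dependency: tokenize a line the way the mapper does, then sum the per-word counts the way the reducer does. The class and method names here are illustrative only, not part of the Hadoop API:

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LocalWordCount {

    // Local simulation of WordCount: the "map" step tokenizes the line and
    // emits (word, 1); merge() plays the reducer, summing the 1s per word.
    public static Map<String, Integer> count(String line) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys, like reducer output
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            counts.merge(tokenizer.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same sample line as the HDFS example further below
        count("Hello World Bye World Hello Hadoop Goodbye Hadoop")
                .forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

Running this prints each word with its count, tab-separated, which is exactly the record format the real job writes to its output files.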

Source Code:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
   
    //Mapper
    public static class Map extends
            Mapper<LongWritable, Text, Text, IntWritable> {
       
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
           
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
           
        }
    }

    // Reducer
    public static class Reduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable t : values) {
                sum += t.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    //Driver class
    public static void main(String[] args) throws Exception {
           
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCount");
       
        job.setJarByClass(WordCount.class);
       
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //Mapper
        job.setMapperClass(Map.class);
        //Combiner (the reducer doubles as a combiner, as described above)
        job.setCombinerClass(Reduce.class);
        //Reducer
        job.setReducerClass(Reduce.class);
       
        //Input Text Format
        job.setInputFormatClass(TextInputFormat.class);
        //Output Text Format
        job.setOutputFormatClass(TextOutputFormat.class);
        //Input directory
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //Output directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
   
    }
}

Execute the Program:
Assuming HADOOP_HOME is the root of the installation and HADOOP_VERSION is the Hadoop version installed, compile WordCount.java and create a jar:
  1. Compile WordCount.java against the Hadoop classpath and package the classes into wordcount.jar:
     $ mkdir wordcount_classes
     $ javac -classpath $(bin/hadoop classpath) -d wordcount_classes WordCount.java
     $ jar -cvf wordcount.jar -C wordcount_classes/ .
  2. Assuming that:
     • /usr/wordcount/input - input directory in HDFS
     • /usr/wordcount/output - output directory in HDFS
  3. Verify the input: $ bin/hdfs dfs -ls /usr/wordcount/input/ shows sample.txt, which contains "Hello World Bye World Hello Hadoop Goodbye Hadoop"
  4. Run the application:
    $ bin/hadoop jar /usr/wordcount.jar WordCount /usr/wordcount/input /usr/wordcount/output
  5. Output:
    $ bin/hdfs dfs -cat /usr/wordcount/output/part-r-00000
    Bye 1
    Goodbye 1
    Hadoop 2
    Hello 2
    World 2
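The same map-shuffle-reduce flow can be mimicked locally with a Unix pipeline, which is a handy sanity check for the expected output before touching the cluster (purely illustrative, no Hadoop involved):

```shell
# map:     tr splits the line into one word per line
# shuffle: sort groups identical words together
# reduce:  uniq -c counts each group
echo "Hello World Bye World Hello Hadoop Goodbye Hadoop" \
  | tr ' ' '\n' | sort | uniq -c
```

The pipeline prints the same counts as the job output above (1 Bye, 1 Goodbye, 2 Hadoop, 2 Hello, 2 World).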
