Wednesday, 31 August 2011

Hadoop with Scala WordCount Example

I had some "free" time in the office so I decided to start researching Hadoop. I had ideas on Hadoop concepts, read a couple of articles, but never worked with it (although I worked with similar technologies).
I also wanted to learn Scala, I think it's interesting to be able to use functional programming from Java.

So I thought: mmmm, if the map part of a MapReduce job is basically applying a function (map) to some data... then functional code should be better suited to that task in certain situations, and more importantly, it should be more readable.
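Just to make the analogy concrete, here's word count over a plain in-memory collection using standard Scala collections, no Hadoop involved at all (a minimal sketch):

val lines = List("hello world", "hello scala")
val counts = lines
  .flatMap(_.split(" "))        // the "map" phase: split lines into words
  .map(word => (word, 1))       // emit a (word, 1) pair per word
  .groupBy(_._1)                // the "shuffle": group the pairs by word
  .mapValues(_.map(_._2).sum)   // the "reduce" phase: sum the ones
// counts == Map("hello" -> 2, "world" -> 1, "scala" -> 1)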
=======================================================================
Update: Shadoop is basically a Scala object that wraps your Int, Boolean and String values in the corresponding Hadoop Writable wrappers. This way you can write
val one = 1 instead of val one = new IntWritable(1);
=======================================================================
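The trick behind this kind of library is Scala's implicit conversions. A minimal sketch of the idea (the names here are mine, not Shadoop's actual API):

import org.apache.hadoop.io.{IntWritable, Text}

// Hypothetical conversions in the spirit of Shadoop
object WritableConversions {
  implicit def intToWritable(i: Int): IntWritable = new IntWritable(i)
  implicit def stringToText(s: String): Text = new Text(s)
}

// With WritableConversions._ in scope the compiler inserts the wrapping:
// val one: IntWritable = 1
// val word: Text = "hello"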
So, without further introduction, let's get to the example.
1- Get the Hadoop WordCount example source code


You'll see that they define an inner class
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>

That's the map implementation I want to get rid of. To do that, I wrote a Scala class (I could have used a Scala object and kept the Map class; I'm not sure which is best in terms of performance).

import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapred.{MapReduceBase, Mapper, OutputCollector, Reporter}

class WordCountScala extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
  // Reused across calls to avoid allocating a new object per word
  val one = new IntWritable(1)
  val word = new Text()

  def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
    val line = value.toString
    // Emit (word, 1) for every blank-separated token in the line
    line.split(" ").foreach { w =>
      word.set(w)
      output.collect(word, one)
    }
  }
}
OK, it's not the cleanest showcase of the advantages of functional programming, but the idea is to show that you can use Scala with Hadoop ;)

The code is quite simple: for every word in the value string (a word is defined here the same way as in the Apache sample code: any string of characters without blanks) we emit a count of 1.
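For example, given the input line "to be or not to be", the mapper emits the pairs:

("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 1), ("be", 1)

and the reducer (which also runs as the combiner here) later sums the ones for each word.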

You can now go back to WordCount.java from the example, remove the Map inner class, and set the mapper class to WordCountScala.

The code will look like this:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Sum all the partial counts emitted by the mappers for this word
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // The Scala mapper replaces the original Map inner class
        conf.setMapperClass(WordCountScala.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
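By the way, I only ported the mapper. If you wanted the reducer in Scala too, it could look something like this (just a sketch against the same old mapred API):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{MapReduceBase, OutputCollector, Reducer, Reporter}

class WordCountReduceScala extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {
  def reduce(key: Text, values: java.util.Iterator[IntWritable], output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
    // Sum the partial counts for this word
    var sum = 0
    while (values.hasNext) {
      sum += values.next().get()
    }
    output.collect(key, new IntWritable(sum))
  }
}

You would then pass WordCountReduceScala.class to conf.setReducerClass and conf.setCombinerClass instead of Reduce.class.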



Time to compile this thing. I use "The Scala IDE for Eclipse", but here's how to build it from the console, assuming you have the code for both classes in the $HADOOP_HOME/testScala folder:

1- Compile the Scala class:
scalac -classpath ../lib/commons-logging-1.1.1.jar:../hadoop-common-0.21.0.jar:../hadoop-mapred-0.21.0.jar:../lib/avro-1.3.2.jar:../lib/log4j-1.2.15.jar WordCountScala.scala

2- Compile the Java class:
javac -classpath ../lib/commons-logging-1.1.1.jar:../hadoop-common-0.21.0.jar:../hadoop-mapred-0.21.0.jar:../lib/avro-1.3.2.jar:../lib/log4j-1.2.15.jar:$SCALA_HOME/lib/scala-library.jar:. WordCount.java

3- Run:
java -classpath ../lib/commons-logging-1.1.1.jar:../hadoop-common-0.21.0.jar:../hadoop-mapred-0.21.0.jar:../lib/avro-1.3.2.jar:../lib/log4j-1.2.15.jar:../lib/jackson-core-asl-1.4.2.jar:../lib/jackson-mapper-asl-1.4.2.jar:../lib/commons-httpclient-3.1.jar:$SCALA_HOME/lib/scala-library.jar:. WordCount ../input/ ../output/

4- Check the output:
cat ../output/*
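For instance, if the input contained the single line "to be or not to be", the output (TextOutputFormat writes each key and its value separated by a tab, with keys sorted) would be:

be	2
not	1
or	1
to	2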

1 comment:

  1. I have added a new project on GitHub that has an example of word count in Scala using the new Hadoop APIs: https://github.com/smishra/scala-hadoop
