I also wanted to learn Scala; I think it's interesting to be able to use functional programming from Java.
So I thought: hmm, if the map part of a MapReduce is basically applying a function (map) to some data, then functional code should be more suitable for that task in certain situations and, more importantly, it should be more readable.
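To make that concrete, in plain Scala map just applies a function to every element of a collection, which is conceptually what the map phase does with your records:
val words = List("hello", "world", "hello")
val pairs = words.map(w => (w, 1)) // List((hello,1), (world,1), (hello,1))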
=======================================================================
Update: Shadoop is basically a Scala object that wraps your Int, Boolean, and String values in the corresponding Hadoop Writable wrappers. This way you can write
val one = 1 instead of new IntWritable(1);
=======================================================================
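If you're curious how that kind of sugar can work, here is a minimal sketch of implicit conversions in that spirit; the object and method names are mine, not necessarily Shadoop's actual API:
import scala.language.implicitConversions
import org.apache.hadoop.io.{BooleanWritable, IntWritable, Text}

// Illustrative Shadoop-style conversions (names are hypothetical, not Shadoop's API)
object WritableConversions {
  implicit def intToWritable(i: Int): IntWritable = new IntWritable(i)
  implicit def boolToWritable(b: Boolean): BooleanWritable = new BooleanWritable(b)
  implicit def stringToText(s: String): Text = new Text(s)
}
// With WritableConversions._ in scope, val one: IntWritable = 1 compiles.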
So, without further introduction, let's get to the example.
1- Get the Hadoop WordCount example source code
You'll see that they define an inner class
public static class Map extends MapReduceBase implements Mapper
That's the map function that I want to get rid of. To do that, I made a Scala class (I could have used a Scala object and kept the Map class; I'm not sure which is best in terms of performance).
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapred.{MapReduceBase, Mapper, OutputCollector, Reporter}

class WordCountScala extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
  // Reuse the same Writable instances across calls, as the Java example does
  val one = new IntWritable(1)
  val word = new Text()

  def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
    value.toString.split(" ").foreach { w =>
      word.set(w)
      output.collect(word, one)
    }
  }
}
Ok, it's not a clean example of the advantages of functional programming, but the idea is to show that you can use Scala with Hadoop ;)
The code is quite simple: for every word in the value string (a word is defined here the same way as in the sample code from Apache: any string of characters without blanks) we emit a count of 1.
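As an aside, you could write it a bit more functionally by dropping the shared mutable word instance. A small untested variant (same imports as WordCountScala; the class name is mine, and it trades Hadoop's usual object-reuse idiom for clarity):
// Untested variant: fresh Writables per token instead of shared mutable state
class WordCountScala2 extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
  def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit =
    value.toString.split(" ").foreach(w => output.collect(new Text(w), new IntWritable(1)))
}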
You can go back to the WordCount.java from the example, remove the Map inner class, and set the mapper class to WordCountScala.
The code will look like this:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class WordCount {

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // Point the job at the Scala mapper instead of the removed Map inner class
        conf.setMapperClass(WordCountScala.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
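If you wanted to push it further, nothing stops you from writing the reducer in Scala too. Here is a minimal untested sketch that ports the same Reduce logic (the class name is mine):
import java.util.{Iterator => JIterator}
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{MapReduceBase, OutputCollector, Reducer, Reporter}

// Untested sketch: the Java Reduce class above, ported to Scala
class WordCountReduceScala extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {
  def reduce(key: Text, values: JIterator[IntWritable], output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
    var sum = 0
    while (values.hasNext) sum += values.next().get() // sum the partial counts for this key
    output.collect(key, new IntWritable(sum))
  }
}
If you went that way, you'd point setReducerClass (and setCombinerClass) at WordCountReduceScala.class instead of Reduce.class.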
Time to compile this thing. I use the Scala IDE for Eclipse, but here is how to build it from the console, assuming that you have the code for both classes in the $HADOOP_HOME/testScala folder:
1- Compile the Scala code:
scalac -classpath ../lib/commons-logging-1.1.1.jar:../hadoop-common-0.21.0.jar:../hadoop-mapred-0.21.0.jar:../lib/avro-1.3.2.jar:../lib/log4j-1.2.15.jar WordCountScala.scala
2- Compile the Java code:
javac -classpath ../lib/commons-logging-1.1.1.jar:../hadoop-common-0.21.0.jar:../hadoop-mapred-0.21.0.jar:../lib/avro-1.3.2.jar:../lib/log4j-1.2.15.jar:$SCALA_HOME/lib/scala-library.jar:. WordCount.java
3- Run:
java -classpath ../lib/commons-logging-1.1.1.jar:../hadoop-common-0.21.0.jar:../hadoop-mapred-0.21.0.jar:../lib/avro-1.3.2.jar:../lib/log4j-1.2.15.jar:../lib/jackson-core-asl-1.4.2.jar:../lib/jackson-mapper-asl-1.4.2.jar:../lib/commons-httpclient-3.1.jar:$SCALA_HOME/lib/scala-library.jar:. WordCount ../input/ ../output/
4- Check the output:
cat ../output/*