ScalaでHadoopを使ってみた。

大名古屋ことGoogle グループの宿題で「Hadoopを動かしてみること」というのがあったので、試してみました。Javaを使う気にはならないので、Scalaで。

コード

Hadoopの2章のコードを素直に写経する。一ファイル一クラスの制限がないのはいいね。

import java.util.Iterator
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io._
import org.apache.hadoop.mapred._

class MapExample extends MapReduceBase with Mapper[LongWritable,Text,Text,IntWritable] {
  def map(key      : LongWritable,
	  value    : Text,
	  output   : OutputCollector[Text,IntWritable],
	  reporter : Reporter ) {
    val xs = value.toString.split(":")
    val year = xs(0)
    val temp = xs(1).toInt

    output.collect(new Text(year),new IntWritable(temp))
  }
}

class ReduceExample extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {
  implicit def j2s(value: java.util.Iterator[IntWritable]) : scala.Iterator[Int] =
    new scala.Iterator[Int] {
      def hasNext = value.hasNext
      def next = value.next.get
    }

  def reduce(key      : Text,
	     values   : Iterator[IntWritable],
	     output   : OutputCollector[Text,IntWritable],
	     reporter : Reporter) {
    val max : Int = values.reduceLeft(Math.max(_,_))
    output.collect(key, new IntWritable(max))
  }
}

object HadoopExample {
  def main(args : Array[String]){
    if(args.length != 2){
      System.err.println("Usage: max <input path> <output path>")
      System.exit(-1)
    }

    val conf : JobConf = new JobConf( Class.forName("HadoopExample") )
    conf.setJobName("max")

    FileInputFormat .addInputPath ( conf, new Path(args(0)) )
    FileOutputFormat.setOutputPath( conf, new Path(args(1)) )

    conf.setMapperClass(classOf[MapExample])
    conf.setReducerClass(classOf[ReduceExample])

    conf.setOutputKeyClass(classOf[Text])
    conf.setOutputValueClass(classOf[IntWritable])

    JobClient.runJob(conf)
  }
}

サンプルファイル

入力ファイルは、Hadoop本のやつを単純化しました。年とその年の気温のペアのつもり。

2010:30
2010:31
2010:32
2010:33
2010:34
2010:30
2009:25
2009:24
2009:23
2009:26
2008:10

コンパイル

#!/bin/sh
HADOOP_HOME=/opt/manual/hadoop
rm *.jar
mkdir -p classes

scalac -classpath ${HADOOP_HOME}/lib/commons-logging-1.0.4.jar:${HADOOP_HOME}/lib/commons-cli-1.2.jar:$HADOOP_HOME/hadoop-0.20.2-core.jar -d classes hadoop_example.scala
jar -cvf hadoop-example.jar -C classes .

実行

scala-object.jarを$HADOOP_HOME/libにコピーしてやる。

#!/bin/sh
rm -rf output
hadoop jar ./hadoop-example.jar HadoopExample sample.txt output
cat output/*