mapreduce

30分プログラム、その182。OCamlmapreduce
OCamlのチャネル(Event.channel)を覚えたので、さっそく使ってmapreduceを実装してみよう。正直、これであってるのかあまり自信がない。

自分はmapreduceを次のように理解している。

  • mapreduce f g init xsという形で使う
  • fはf c xという風に使われる
    • cはチャネルでxはxsの任意の要素
    • xから何かを計算してその結果をcに送信する
    • 送信するのは(key,value)という形
    • これがmapに相当する
  • gはg key [value] initのように使われる
    • mapの結果がkeyごとにまとめられている。そしてそれをfoldするのに使われる
    • これがreduce(fold)に相当する
  • 最終的にgでreduceした値が帰る

ちなみにスレッドを使ったプログラムは、次のようにコンパイルする。

$ ocamlc -thread unix.cma threads.cma mapreduce.ml

Unixモジュールと-threadオプションがポイント。

使い方

Programming Erlangのように、複数ファイルのwcっぽいのを書いてみた。
読み込み部分でいんちきしているけれど。

(* map部分。nというファイルから単語を読み取って、cに送信する。 *)
let f c n = 
  (* 偽のread *)
  let file = match n mod 3 with
              0 -> ["This";"is";"pen";"That";"is";"apple"]
            | 1 -> ["less";"is";"more"]
            | _ -> ["Hello";"World"] in
    List.iter (fun word -> sync @@ send c (Some (word,1))) file;
    sync @@ send c None

(* reduce部分。key(単語)ごとに長さを計算する *)
let g key value init = 
  (List.length(value),key)::init

(* mapreduceの結果をソートして、整形して表示する *)
let res = mapreduce f g [] [0;1;2;3;4] in
let sort = List.sort (fun (a,_) (b,_) -> compare a b ) in
 List.iter 
    (fun (x,y)-> Printf.printf "(%d,%s)\n" x y) 
    (List.rev (sort res))

ソースコード

(* ocamlc -thread unix.cma threads.cma mapreduce.ml *)
open Event

let (@@) f g = f g
let ($) f g x = f (g x)

(* 
   xsのすべての要素にfを適用する 。
   そして、その結果のテーブルを返す
*)
let map f xs =
  let c = new_channel () in
  let apply x =  f c x in
  let _ = List.iter (fun x-> ignore @@ Thread.create apply x) xs in
  let n = List.length xs in
  let table = Hashtbl.create n in
  let rec collect = 
    function
      | 0 -> table
      | n -> 
	  match sync @@ receive c with
	    | Some (key,value) ->
		(if Hashtbl.mem table key then
		  Hashtbl.replace table key @@ value::(Hashtbl.find table key)
		else
		  Hashtbl.add table key [value]);
		collect n
	    | None -> collect (n-1) in
    collect n

let mapreduce f g init xs = 
  let table = map f xs in
    Hashtbl.fold g table init

let _ = 
  let f c n = 
    (* n番目のファイルに含まれる単語だと思ってください *)
    let file = match n mod 3 with
	0 -> ["This";"is";"pen";"That";"is";"apple"]
      | 1 -> ["less";"is";"more"]
      | _ -> ["Hello";"World"] in
      List.iter (fun word -> sync @@ send c (Some (word,1))) file;
      sync @@ send c None
  in
  let g key value init = 
    (List.length(value),key)::init in
  let res = mapreduce f g [] [0;1;2;3;4] in
  let sort = List.sort (fun (a,_) (b,_) -> compare a b ) in
    List.iter 
      (fun (x,y)-> Printf.printf "(%d,%s)\n" x y) 
      (List.rev (sort res))