mapreduce
30分プログラム、その182。OCamlでmapreduce。
OCamlのチャネル(Event.channel)を覚えたので、さっそく使ってmapreduceを実装してみよう。正直、これであってるのかあまり自信がない。
自分はmapreduceを次のように理解している。
- mapreduce f g init xsという形で使う
- fはf c xという風に使われる
- cはチャネルでxはxsの任意の要素
- xから何かを計算してその結果をcに送信する
- 送信するのは(key,value)という形
- これがmapに相当する
- gはg key [value] initのように使われる
- mapの結果がkeyごとにまとめられている。そしてそれをfoldするのに使われる
- これがreduce(fold)に相当する
- 最終的にgでreduceした値が帰る
ちなみにスレッドを使ったプログラムは、次のようにコンパイルする。
$ ocamlc -thread unix.cma threads.cma mapreduce.ml
Unixモジュールと-threadオプションがポイント。
使い方
Programming Erlangのように、複数ファイルのwcっぽいのを書いてみた。
読み込み部分でいんちきしているけれど。
(* map部分。nというファイルから単語を読み取って、cに送信する。 *) let f c n = (* 偽のread *) let file = match n mod 3 with 0 -> ["This";"is";"pen";"That";"is";"apple"] | 1 -> ["less";"is";"more"] | _ -> ["Hello";"World"] in List.iter (fun word -> sync @@ send c (Some (word,1))) file; sync @@ send c None (* reduce部分。key(単語)ごとに長さを計算する *) let g key value init = (List.length(value),key)::init (* mapreduceの結果をソートして、整形して表示する *) let res = mapreduce f g [] [0;1;2;3;4] in let sort = List.sort (fun (a,_) (b,_) -> compare a b ) in List.iter (fun (x,y)-> Printf.printf "(%d,%s)\n" x y) (List.rev (sort res))
ソースコード
(* ocamlc -thread unix.cma threads.cma mapreduce.ml *) open Event let (@@) f g = f g let ($) f g x = f (g x) (* xsのすべての要素にfを適用する 。 そして、その結果のテーブルを返す *) let map f xs = let c = new_channel () in let apply x = f c x in let _ = List.iter (fun x-> ignore @@ Thread.create apply x) xs in let n = List.length xs in let table = Hashtbl.create n in let rec collect = function | 0 -> table | n -> match sync @@ receive c with | Some (key,value) -> (if Hashtbl.mem table key then Hashtbl.replace table key @@ value::(Hashtbl.find table key) else Hashtbl.add table key [value]); collect n | None -> collect (n-1) in collect n let mapreduce f g init xs = let table = map f xs in Hashtbl.fold g table init let _ = let f c n = (* n番目のファイルに含まれる単語だと思ってください *) let file = match n mod 3 with 0 -> ["This";"is";"pen";"That";"is";"apple"] | 1 -> ["less";"is";"more"] | _ -> ["Hello";"World"] in List.iter (fun word -> sync @@ send c (Some (word,1))) file; sync @@ send c None in let g key value init = (List.length(value),key)::init in let res = mapreduce f g [] [0;1;2;3;4] in let sort = List.sort (fun (a,_) (b,_) -> compare a b ) in List.iter (fun (x,y)-> Printf.printf "(%d,%s)\n" x y) (List.rev (sort res))