6.5840 実験一 —— MapReduce
はじめに
実験一は MapReduce システムの実装が課題で、基本的には master プログラムと worker プログラムの二つの部分から成り立っています。この実験はかなり難易度が高く、まず golang の RPC と並行処理の使い方に慣れている必要があり、さらに MapReduce の全体的な流れと仕組みをよく理解していることが求められます。実は小さなコツがあり、それは論文中のこの図をひたすら眺め、下のフロー説明を繰り返し読むことです:

この実験では二つのバージョンを実装しました。主に並行制御の方法が異なります。最初は mutex ロックベースのバージョンで、後に channel を使ったロックフリーのバージョンにリファクタリングしました。ロックフリー版の実装はよりエレガントなので、説明も主にこちらを基にしています。
実験の説明
実験を始める前に、まずは実験内容を理解する必要があります。説明書はこちらです:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html。この実験は Linux 環境で行う必要があります。なぜならプロセス間通信は unix socket を使っているためです。MacOS でも原理的には可能ですが、いくつか小さな問題があると言われています。
コードにはすでに単一スレッドの直列版 MapReduce が用意されており、src/main/mrsequential.go
にあります。このバージョンは非常に重要で、まず一度読んで全体の流れを大まかに把握することをお勧めします。いくつかの処理はここからそのままコピーしても問題ありません。
並行版の master プログラムのエントリポイントは main/mrcoordinator.go
、worker プログラムのエントリポイントは main/mrworker.go
にあります。実装すべきファイルは三つで、mr/coordinator.go
、mr/worker.go
、mr/rpc.go
です。これらはそれぞれ master の処理コード、worker の処理コード、そして両者間の通信に使う RPC 構造を表しています。
mrcoordinator は mr/coordinator.go
内の MakeCoordinator 関数を呼び出して master の構造体を構築し、ソケットのリスニングを開始します。戻った後、メインゴルーチンは繰り返し Coordinator.Done メソッドを呼び出し、MapReduce タスクが完了したかをチェックし、完了を確認してからメインゴルーチンを終了します。したがって、MakeCoordinator 内で関数の戻りを妨げる操作は避けるべきで、リスニングなどの処理は新しいゴルーチンで行う必要があります。
mrworker の処理は非常にシンプルで、メインゴルーチン一つだけで mr/worker.go
の Worker 関数を直接呼び出して処理します。基本的には単一ゴルーチンのプログラムとして実装できます。
テストスクリプトは src/main/test-mr.sh
にあり、これは既存の二つの MapReduce プログラム、wc と indexer をあなたのフレームワークで実行し、直列実行の結果と比較します。また、同じ Map または Reduce タスクを並行で実行した場合や、worker がタスク実行中にクラッシュした場合でも、最終的に正しい結果が得られるかを検証します。通常は master プロセス一つと worker プロセス三つを起動します。実行中にエラーが発生して終了しない場合は、ps -A
コマンドで mrcoordinator プロセスの PID を探し、kill で終了させてください。普通の ctrl + c
では完全に終了しないことがあり、後続のテストに影響を与えます。
最後に、実験指導書を何度も読み返すことを強くお勧めします。
実装の考え方
全体の流れ
worker はまず map タスクを完了し、多数の中間ファイル “mr-X-Y” を生成します。ここで X は map タスクの ID、Y は対応する reduce タスクの ID です。次に reduce は Y が自身の reduce タスク ID と等しいすべてのファイルを集め、読み込んで reduce 処理を行い、結果を “mr-out-Y” に出力します。
master の実装
ロックフリーの考え方
ロックフリー実装のため、複数ゴルーチン間のデータ競合を避けるために、主要なデータ構造への操作は一つのゴルーチンに集約します。これをスケジューラゴルーチンと呼びます。worker が RPC で master にタスク取得や完了報告を要求すると、master は自動生成された RPC ゴルーチンでリクエストを処理しますが、主要データ構造への操作はすべてスケジューラゴルーチンに channel 経由で依頼します。これによりデータ競合を防ぎます。worker と master 間のメッセージは複数種類あるため、スケジューラは複数の channel を同時に管理する必要があり、golang の select 構文が活用されます。
// この goroutine のみで構造体を操作
func (c *Coordinator) schedule() {
for {
select {
case msg := <-c.getTaskChan:
c.getTaskHandler(msg)
case msg := <-c.doneTaskChan:
c.doneTaskHandler(msg)
case msg := <-c.timeoutChan:
c.timeoutHandler(msg)
case msg := <-c.doneCheckChan:
c.doneCheckHandler(msg)
}
}
}
例えば worker がタスクを取得したい場合、master の GetTask は以下のように処理します:
func (c *Coordinator) GetTask(_ *GetTaskReq, resp *GetTaskResp) error {
msg := GetTaskMsg{
resp: resp,
ok: make(chan struct{}),
}
c.getTaskChan <- msg
<-msg.ok
return nil
}
getTaskChan に送るのは resp(GetTask はリクエストパラメータなし)だけでなく、chan struct{} 型のチャネルも渡します。これはスケジューラゴルーチンが処理完了を RPC ゴルーチンに通知するためのもので、処理完了時に msg.ok に struct{} を送ることで RPC ゴルーチンが戻ります。
Coordinator の構造体
Coordinator の全体構造は以下の通りです:
type Coordinator struct {
nMap int
nReduce int
phase TaskPhase
allDone bool
taskTimeOut map[int]time.Time
tasks []*Task
getTaskChan chan GetTaskMsg
doneTaskChan chan DoneTaskMsg
doneCheckChan chan DoneCheckMsg
timeoutChan chan TimeoutMsg
}
phase は現在のタスク実行フェーズを示します。reduce タスクはすべての map タスク完了後に実行されるため、TaskPhase は Map と Reduce の二段階に分かれています。各フェーズで tasks スライスには該当フェーズのタスクのみが入ります。
taskTimeOut は現在実行中のタスクの開始時刻を記録し、別ゴルーチンが定期的にこの map をスキャンして、10 秒以上経過したタスク(タイムアウト)を検出し、該当タスクの状態を未開始に戻して再スケジューリング可能にします。このスキャンもスケジューラゴルーチン経由で行います。タイムアウト map には現在のフェーズのタスクのみが含まれ、フェーズ切り替え時にクリアされます。
tasks スライスは現在フェーズのすべての Task とその状態を保持します:
type ReduceTask struct {
NMap int
}
type MapTask struct {
FileName string
NReduce int
}
type TaskStatus int
var (
TaskStatus_Idle TaskStatus = 0
TaskStatus_Running TaskStatus = 1
TaskStatus_Finished TaskStatus = 2
)
type Task struct {
TaskId int
MapTask MapTask
ReduceTask ReduceTask
TaskStatus TaskStatus
}
ここではタスク状態を三つに分類しています:未実行、実行中、完了済み。また MapTask と ReduceTask の両方を冗長に保持し、現在のフェーズに応じて使い分けます。
具体的な処理
Coordinator の channel からわかるように、四種類の操作がスケジューラゴルーチンと通信されます。
worker がタスクを要求すると、返されるタスクタイプは四種類あります:
type TaskType int
var (
TaskType_Map TaskType = 0
TaskType_Reduce TaskType = 1
TaskType_Wait TaskType = 2
TaskType_Exit TaskType = 3
)
master はまず tasks を走査し、未実行のタスクを探し、現在のフェーズに応じて Map または Reduce タスクを返します。空きタスクがない場合は二つの状況に分かれます。Map フェーズなら TaskType_Wait を返して worker に待機を促し、Map フェーズ終了後に Reduce タスクが続きます。Reduce フェーズならすべてのタスクが完了しているため、TaskType_Exit を返して worker に終了を指示します。
worker がタスク完了を報告すると、master はタスクタイプと ID を受け取ります。master は現在のフェーズと異なるタスクは無視し、taskId に該当する tasks の状態を強制的に完了に変更し、timeout マップから該当エントリを削除します。
func (c *Coordinator) doneTaskHandler(msg DoneTaskMsg) {
req := msg.req
if req.TaskType == TaskType_Map && c.phase == TaskPhase_Reduce {
// 現フェーズと異なるタスクの報告は無視
msg.ok <- struct{}{}
return
}
for _, task := range c.tasks {
if task.TaskId == req.TaskId {
// 状態に関わらず完了に変更
task.TaskStatus = TaskStatus_Finished
break
}
}
// timeout マップから削除
delete(c.taskTimeOut, req.TaskId)
allDone := true
for _, task := range c.tasks {
if task.TaskStatus != TaskStatus_Finished {
allDone = false
break
}
}
if allDone {
if c.phase == TaskPhase_Map {
func (c *Coordinator) doneTaskHandler(msg DoneTaskMsg) {
req := msg.req
if req.TaskType == TaskType_Map && c.phase == TaskPhase_Reduce {
// 現フェーズと異なるタスクの報告は無視
msg.ok <- struct{}{}
return
}
for _, task := range c.tasks {
if task.TaskId == req.TaskId {
// 状態に関わらず完了に変更
task.TaskStatus = TaskStatus_Finished
break
}
}
// timeout マップから削除
delete(c.taskTimeOut, req.TaskId)
allDone := true
for _, task := range c.tasks {
if task.TaskStatus != TaskStatus_Finished {
allDone = false
break
}
}
if allDone {
if c.phase == TaskPhase_Map {
c.initReducePhase()
} else {
c.allDone = true
}
}
msg.ok <- struct{}{}
}
Reduce フェーズで全タスク完了を検出した場合は allDone フラグを立てます。
Coordinator 初期化時に別ゴルーチンを起動し、1 秒ごとにスケジューラにタイムアウトチェックを依頼します。タイムアウトしたタスクは状態を未開始に戻し、次回の worker タスク要求時に再スケジューリングされます。
func (c *Coordinator) timeoutHandler(msg TimeoutMsg) {
now := time.Now()
for taskId, start := range c.taskTimeOut {
if now.Sub(start).Seconds() > 10 {
for _, task := range c.tasks {
if taskId == task.TaskId {
if task.TaskStatus != TaskStatus_Finished {
task.TaskStatus = TaskStatus_Idle
}
break
}
}
delete(c.taskTimeOut, taskId)
break
}
}
msg.ok <- struct{}{}
return
}
最後に、完了状態のチェックはメインスレッドが Coordinator.Done を呼び、スケジューラに allDone フラグを確認するだけです。
worker の実装
worker は単一ゴルーチンで master からタスクを取得し、ループで実行します:
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
resp := callGetTask()
switch resp.TaskType {
case TaskType_Map:
handleMapTask(resp.Task, mapf)
case TaskType_Reduce:
handleReduceTask(resp.Task, reducef)
case TaskType_Wait:
time.Sleep(time.Second)
case TaskType_Exit:
return
}
}
}
map と reduce の処理は直列単一スレッド版の実装を参考にできます。注意点として、複数プロセスが同一タスクを同時に実行したり、途中でクラッシュした場合に残されたファイルが再実行時に問題を引き起こすことがあります。したがって出力ファイルは、ioutil.TempFile
関数で一時ファイルを作成し、書き込み完了後に os.Rename
で目的のファイル名にリネームする方法を用いると、最終出力ファイルが必ず完全な状態で保存されることが保証されます。