簡介
這個實驗要求我們使用 golang 自行實現一個支持並發和簡單容錯的 分布式 MR ,支持一個 master 多個 wokrer 的工作模式。
我做完實驗之后去看了一下之前網上發布的一些 lab1 的作業。感覺今年是不是改版了?后來我去看了一下 2018 年的 lab1 作業,果然改版了。。。。。。不過本身也沒打算參考網上的作業。就在這篇里面記錄一些關鍵點吧。
因為我本人並不會 golang,是臨時學習了一下。所以整個實驗搞了 3個晚上才通過所有測試。
過程
實驗提供了一些現成的文件,比如 mr 的 application 都是現成的我們可以通過 go build -buildmode=plugin 把他們編譯成動態鏈接庫直接調用他們用以測試我們的 mapreduce,我們只需要重點修改三個文件 master | worker | rpc 里面的內容即可。代碼里面也提供了現成的 master 啟動入口和 woker 啟動入口。
下面我就挑我覺得有必要說一下的地方談一下,不遍歷整個 lab1 的過程了。
首先要實現一個在 master 中的 MakeMaster 的函數,這個 MakeMaster 相當於初始化我們的 master 對象上的值參數,做一些准備。
master 結構體定義
const ( INIT = 0 PROCESS = 1 FINISHED = 2 ) type Master struct { // Your definitions here. Contents map[string]string // MAP task runner control FinishedStateMap map[string]int // REDUCE task runner control FinishedStateReduce map[int]int // FilenameList filenames []string // FilenameOriginMap filenamesOrigin map[string]string IsMapFinished bool IsReduceFinished bool ReduceTaskCounter int CoarsnessMapFailControl int CoarsnessReduceFailControl int MM sync.Mutex }
初始化 master 並開啟 master 服務器監聽
import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/rpc"
"os"
"path"
"strconv"
"sync"
)
func MakeMaster(files []string, nReduce int) *Master { m := Master{ Contents: make(map[string]string), FinishedStateMap: make(map[string]int), FinishedStateReduce: make(map[int]int), IsMapFinished: false, IsReduceFinished: false, filenames: []string{}, filenamesOrigin: make(map[string]string), CoarsnessMapFailControl: 0, CoarsnessReduceFailControl: 0, ReduceTaskCounter: nReduce, MM: sync.Mutex{}, } // init map state for _, filename := range files { baseFilename := path.Base(filename) file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open %v", filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %v", filename) } m.Contents[baseFilename] = string(content) m.FinishedStateMap[baseFilename] = INIT m.filenames = append(m.filenames, baseFilename) m.filenamesOrigin[baseFilename] = filename m.CoarsnessMapFailControl = 0 m.CoarsnessReduceFailControl = 0 file.Close() } // init reduce state maxR := m.ReduceTaskCounter i := 0 for ; i < maxR; i++ { m.FinishedStateReduce[i] = INIT } // print now have something fmt.Printf("Master start listen\n") m.server() return &m }
我分別說下這里 master 結構體里面定義參數以及我的想法。
Contens: key 是文件名,value 是文本。這里主要是建立一個 filename 和 文本文件的映射關系,方便需要的時候對其進行內容的遍歷。因為 master 涉及到下發工作,所以當 woker 來向 master 申請任務的時候我們需要將這些內容下發給 woker 進行處理。
FinishedStateMap: key 是 map 任務的任務名(這里我使用的文件名做 map 任務的任務名),value 是三個狀態 INIT PROCESS FINISHED。這個狀態控制器用於控制 map 任務的狀態情況監控。
FinishedStateReduce: key 是 reduce 任務的任務名(這里因為后面輸出文件測試要求的關系,直接使用了 reduce 任務的編號 0 1 2 3 .....),value 是三個狀態 INIT PROCESS FINISHED。這個狀態控制器用於控制 reduce 任務的狀態情況監控。
Filenames: 用於存儲所有被訪問的 input 文件名稱。因為使用了文件名稱作狀態的 key 所以得維護這么一個東西,其實想想應該給 Map 任務搞個編號的。
FilenamesOrigin: 用於存儲所有被訪問的 input 文件名稱的 basefile 名稱。因為測試文件里面都是使用的相對路徑訪問的文件,文件名稱里面都有 ../../ 這種東西,而且不能修改測試文件和原文件所以就搞了一個這個方便映射。
IsMapFinished: 標記是否所有 Map 任務都已經完成。MR 中必須要等所有 Map 任務都完成了才可以開始 Reduce 任務。
IsReduceFinished: 標記是否所有 Reduce 任務都已經完成。 MR 中必須要等所有的 Reduce 任務都完成了才會結束。
ReduceTaskCounter: 標記一共需要幾個 Reduce 啟動,一共輸出幾個文件。因為調用 MakeMaster 函數的地方有傳入最后使用幾個 Reducer 輸出文件,所以這里存一下。
CoarsnessMapFailControl: 粗粒度 Map 控制器,因為測試有容錯需求。所以這里用於粗略控制一下超時。具體實現下面我會說到。
CoarsnessReduceFailControl: 粗粒度 Reduce 控制器,因為測試有容錯需求。所以這里用於粗略控制一下超時。具體實現下面我會說到。
MM: 一把排它鎖。
所以整個設計的整體思想都圍繞着這一張圖

剛才做的工作就是啟動 Master 讓后把剛服務需要用到的數據都准備好,然后開始監聽 woker 服務。
import "../mr" import "plugin" import "os" import "fmt" import "log" func main() { if len(os.Args) != 2 { fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n") os.Exit(1) } mapf, reducef := loadPlugin(os.Args[1]) mr.Worker(mapf, reducef) }
啟動 woker 服務,並解析動態鏈接庫傳進來的 mapf reducef 傳遞給 woker
worker.go file
import ( "encoding/json" "fmt" "hash/fnv" "io/ioutil" "log" "net/rpc" "os" "strconv" "time" ) const PREFIX = "mr-" const MRPREFIX = "mr-out-" // // Map functions return a slice of KeyValue. // type KeyValue struct { Key string Value string } // // use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. // func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key)) return int(h.Sum32() & 0x7fffffff) } // // main/mrworker.go calls this function. // func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // Your worker implementation here. for { flag := RunTaskCall(mapf, reducef) time.Sleep(1 * time.Second) if flag == 0 { break } } // uncomment to send the Example RPC to the master. // CallExample() } func RunTaskCall(mapf func(string, string) []KeyValue, reducef func(string, []string)string) int { args := RequestJobArgs{} reply := ReplyJobArgs{} rAckargs := RequestAckArgs{} replayAckargs := ReplyAckArgs{} // ask for a task call("Master.AskForJob", &args, &reply) // if task finished then end this program if reply.JobType == "finished" { fmt.Println("No tasks worker exit") return 0 } // get a map task if reply.JobType == "map" { fmt.Println("Worker start a map task") return RunMapTask(args, reply, rAckargs, replayAckargs, mapf) // get a reduce task } else if reply.JobType == "reduce" { fmt.Println("Woker start reduce task") return RunReduceTask(args, reply, rAckargs, replayAckargs, reducef) } return 1 } func RunMapTask(args RequestJobArgs, reply ReplyJobArgs, rAckargs RequestAckArgs, replayAckargs ReplyAckArgs, mapf func(string, string) []KeyValue) int { fileMap := []KeyValue{} filename := reply.Contents[0] filenameOriginMap := reply.ExtraInfo["filenamesOriginMap"] values := reply.Contents[1] target := mapf(filenameOriginMap[filename], values) f, _ := os.OpenFile(PREFIX+filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) defer f.Close() // write json intermediate results into file for _, v := range target { fileMap = append(fileMap, KeyValue{v.Key, v.Value}) } // open file and write file after use buffer _fileMap, err := json.Marshal(fileMap) if err != nil { log.Fatal("%v", err)} _, err = f.Write(_fileMap) if err != nil { log.Fatalf("%v", err)} f.Close() fmt.Println("完成一次 Map") rAckargs.JobType = reply.JobType rAckargs.Filename = filename call("Master.ConfirmState", &rAckargs, &replayAckargs) fmt.Printf("Confirm finished map task %v\n\n", filename) return 1 } func RunReduceTask(args RequestJobArgs, reply ReplyJobArgs, rAckargs RequestAckArgs, replayAckargs ReplyAckArgs, reducef func(string, []string)string) int { _results := make(map[string][]string) filenames := reply.Contents reduceKey := reply.ReduceKey ReduceTaskCounter := reply.ReduceTaskCounter outputFile, _ := os.OpenFile(MRPREFIX + strconv.Itoa(reduceKey), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) defer outputFile.Close() // iter all files match reduce no for _, v := range filenames { f, err := os.Open(PREFIX+v) if err != nil { log.Fatalf("cant open file: %v \n", err) } target := []KeyValue{} contents, err := ioutil.ReadAll(f) err = json.Unmarshal(contents, &target) if err != nil { log.Fatalf("json decode fail %v \n", err) } // combine same key values for _, v := range target { if ihash(v.Key) % ReduceTaskCounter == reduceKey { _results[v.Key] = append(_results[v.Key], v.Value) } } } for k, v := range _results { fmt.Fprintf(outputFile, "%v %v\n", k, reducef(k, v)) } fmt.Println("Finished onece Reduce") rAckargs.JobType = reply.JobType rAckargs.Filename = strconv.Itoa(reduceKey) call("Master.ConfirmState", &rAckargs, &replayAckargs) fmt.Printf("Confirm finished reduce task %v\n\n", reduceKey) return 1 }
master file
func (m *Master) AskForJob(args *RequestJobArgs, reply *ReplyJobArgs) error { fmt.Println("Start AskForJob") m.MM.Lock() defer m.MM.Unlock() reply.ReduceTaskCounter = m.ReduceTaskCounter reply.ExtraInfo = make(map[string]map[string]string) reply.ExtraInfo["filenamesOriginMap"] = m.filenamesOrigin if m.IsReduceFinished == true { reply.Success = 1 reply.JobType = "finished" return nil } if m.IsMapFinished == false { m.CoarsnessMapFailControl++ // crash safe map reset state if m.CoarsnessMapFailControl >= 30 { for k, v := range m.FinishedStateMap { if v == PROCESS { m.FinishedStateMap[k] = INIT continue } } m.CoarsnessMapFailControl = 0 } fmt.Println("give a map task to worker") for k, v := range m.FinishedStateMap { if v == INIT { reply.Success = 1 reply.JobType = "map" reply.Contents = append(reply.Contents, k, m.Contents[k]) m.FinishedStateMap[k] = PROCESS fmt.Printf("assignment %v task key: %v \n", reply.JobType, k) break } } return nil } else if m.IsReduceFinished == false { // crash safe reduce reset state m.CoarsnessReduceFailControl++ if m.CoarsnessReduceFailControl >= 30 { for k, v := range m.FinishedStateReduce { if v == PROCESS { m.FinishedStateReduce[k] = INIT continue } } m.CoarsnessReduceFailControl = 0 } fmt.Println("give a reduce task to worker") for k, v := range m.FinishedStateReduce { if v == INIT { reply.Success = 1 reply.JobType = "reduce" reply.Contents = m.filenames reply.ReduceKey = k m.FinishedStateReduce[k] = PROCESS fmt.Printf("assignment %v reduce key: %v \n", reply.JobType, k) break } } } return nil } func (m *Master) ConfirmState(args *RequestAckArgs, reply *ReplyAckArgs) error { m.MM.Lock() defer m.MM.Unlock() filename := args.Filename if args.JobType == "map"{ m.FinishedStateMap[filename] = FINISHED for _, v := range m.FinishedStateMap { if v == INIT || v == PROCESS { return nil } continue } m.IsMapFinished = true } else if args.JobType == "reduce" { filename, _ := strconv.Atoi(filename) m.FinishedStateReduce[filename] = FINISHED for _, v := range m.FinishedStateReduce { if v == INIT || v == PROCESS { return nil } continue } m.IsReduceFinished = true } return nil }
代碼貼了不少,來說下整個 mr 的設計思路。
1. master 起來之后,worker 需要先去執行 map 任務。那么我們讓 woker 通過 rpc 去找 master 要任務來執行。
call("Master.AskForJob", &args, &reply)
這是申請任務的 rpc 不管是 Map 任務還是 Reduce 任務都是從這里申請。是發放 Map 任務還是 Reduce 任務由 master 來決策和控制。
2. 當 worker 拿到 master 下發的 Map 任務之后將輸入文件 key 對應的 FinishedStateMap 值置為 PROCESS 以保證后面過來申請任務的 woker 不會執行相同的任務。(這里設置時要使用排它鎖進行鎖定)
獲取任務中間還有一些使用粗粒度計數器來進行超時控制的判斷(這些都可以使用更細粒度的超時計算方法,這里只是單純實現一下類似的功能大家知道意思就行)。
3. 我們 worker 拿到被分配的任務之后調用對應 jobType 的 task runner 執行對應任務。
4. woker 將對應的 input 讀入,然后運行動態鏈接庫傳入的 map 函數進行執行,獲得 map 之后的結果,將結果 json 序列化之后寫入到本地存儲。
5. 反復執行步驟4 直到所有 map 任務被 ack 到 FINISHED 狀態。
6. 繼續申請任務 master 將開始分發 reduce 任務。
7. reduce 任務將通過 hash 分為 m.ReduceTaskCounter 數量的桶中,並且執行相同次數的 reduce 輸出。每個被丟進桶里的數據都是單個 reduce 任務拉取所有中間結果進行輸出和合並的(這個中間有個 shuffle 的步驟)。這里有個問題,為什么我們不在 map 任務的時候將結果進行分組到對應數量的 reduce 中。這樣 reduce 在讀取的時候就可以讀取單個文件進行統計排序了。其實也不是不行但是有個很坑的問題是,這樣的話每個被寫入的文件是由多個 map 任務進行執行。如果中間有任何一個 map 任務失敗進行重試了。就可能寫入重復的數據到那些文件里面去了。這樣 failover cover 起來就復雜了。但是如果我們讓 map 任務只是單純 map 完畢並將結果存到對應的文件中。我們就可以無限次使用冪等操作重復這個 map 操作而不用擔心數據混亂。
8. reduce 讀取所有中間結果匯總起來輸出跟自己桶對應的輸出文件。
9. 讀取這些結果匯總輸出最后的結果。
總結
感覺整個過程中還是要牢記論文中的幾個關鍵點:
1. 怎么去 shuffle 數據再交給 reduce 保證交給同一個 Reduce 的數據的冪等性。lab1 中提供給了我們一個 ihash 函數幫助我們對 key 進行 hash 然后根據 reducer 的數量來 shuffle。
func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key))
// 和 0x7fffffff 取 & 是為了處理負數情況。 return int(h.Sum32() & 0x7fffffff) }
if ihash(v.Key) % ReduceTaskCounter == reduceKey { _results[v.Key] = append(_results[v.Key], v.Value) }
2. mapreduce 的本質是
map (k1,v1) → list(k2,v2) => shuffle => reduce (k2,list(v2)) → list(v2)
拿 wordcount 應用來說
K1, V1 是 filename,文章 => list[(word, 1), xxxxx,xx]
下面代碼 if 部分就是在 shuffle
if ihash(v.Key) % ReduceTaskCounter == reduceKey { _results[v.Key] = append(_results[v.Key], v.Value) }
計算出 (k2 => list(v2)) 的部分就是 if 內部的部分
這里的 k2 是 word, v2 是這個文件中的 key 對應出現次數的 list
k2, list(v2) 會是這種數據造型
wrongs:[1 1 1] wry:[1] yearly:[1] yell:[1 1 1 1 1 1 1 1 1]
最終通過 reduce 函數輸出 list(v2) reduce 函數的傳入值就是 k2, list(v2) 然后通過自己定義的 reduce 函數算出自己想要的結果。
wc 實現了一個這樣的函數 只是單純的將各 list(v2) 匯總了
func Reduce(key string, values []string) string { // return the number of occurrences of this word. return strconv.Itoa(len(values)) }
最后結果 list(v2) 應該長這樣 (3, 1, 1, 9 ) 但是我們可以將他打印結合 key 就變成了
wrongs 3
wry 1
這樣的我們想要的輸出了。
我們寫的這個 mr 程序要能完全匹配這一規則,才能做到接口提供好了所有 mrapp 都可以使用。因為 test-case 包含了好幾個 mr 的經典場景包括 wc wordcount | indexer 索引倒排索引統計。
以上
(下一個實驗就 raft 了有點小激動呢)
Refenrece:
http://nil.csail.mit.edu/6.824/2020/labs/lab-mr.html lab1 描述
http://nil.csail.mit.edu/6.824/2020/papers/mapreduce.pdf mapreduce 論文
https://blog.csdn.net/JasonDing1354/article/details/46882597 mapreduce shuffle 和 spark shuffle 機制介紹
