實現MapReduce


簡介

當我們要統計數億文本的詞頻,單個機器性能一般,況且是數億級數據,處理是十分緩慢的,對於這樣的任務,希望的是多台電腦共同處理,大幅度減少任務時間。聯合多台電腦一起工作的系統就是分布式系統。

最近在學MIT6.824分布式系統課程,第一個Lab就是MapReduce,MapReduce是Google公司2004年發表的一篇論文,介紹很多任務都可以分為兩步操作——Map和Reduce(比如要統計詞頻,Map分別對每個文件生成單詞和單一數目,分不同區塊保存,Reduce對不同區塊進行統計,得到最終結果),可以將這兩個操作之外的包裝起來,只提供Map和Reduce的接口,不同任務只需要替換不同的Map函數和Reduce函數即可。論文中還講述了MapReduce分布式系統的實現細節以及應用場景。本文將以MIT6.824課程的Lab1為例,講述如何完成MapReduce Lab1以及實現過程中遇到一些的困難。

需要的基礎:

  • Go語言基礎 (推薦官網的tour)
  • MIT6.824前兩節的課程(B站鏈接
  • 讀MapReduce(主要看實現那一塊)

筆記中的全部代碼可以在https://github.com/FangYang970206/MIT6.824-2020中查看下載。

環境配置

環境配置可以看Lab1流程,手把手地教怎么配置,主要分兩步:

第一步安裝Go環境

wget -qO- https://dl.google.com/go/go1.13.6.linux-amd64.tar.gz | sudo tar xz -C /usr/local

第二步克隆Lab1倉庫

git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824

克隆出來的倉庫的src文件夾中只有幾個是與MapReduce相關的,分別是:

  • main目錄下的mrmaster.go, mrworker.go, mrsequential.go和test-mr.sh,還有pg*.txt代表的8個文件是要分布式處理的輸入,這個文件內容也不需要變,test-mr.sh有五個任務,運行test-mr.sh可以知道自己是否通過所有任務。
  • mr文件夾,這個是MapReduce主要實現代碼,工作量就在這了
  • mrapps是不同任務的Map和Reduce函數包,這個不需要管

系統框架一覽

MapReduce系統是由一個master進程和多個worker進程組成,master和worker之間是通過RPC(Remote Procedure Call)進行通信,master進程負責給多個worker分配任務,記錄任務完成狀態,並且需要處理worker奔潰或者超時運行等問題,worker需要處理相應的任務,處理完畢發送報告給master,再請求下一個任務。我根據代碼函數調用邏輯畫出了一個系統框圖,可以更好的理解MapReduce系統的工作原理:

代碼詳解

根據上面的系統框圖,現在來從代碼中理解系統。

Master結構

type Flag struct {
	processing bool
	finished   bool
}

type Master struct {
	FileNames      []string
	MapFlags       []Flag
	ReduceFlags    []Flag
	MapTaskCnts    []int
	ReduceTaskCnts []int
	MapAllDone     bool
	ReduceALLDone  bool
	MapNum         int
	ReduceNum      int
	Mut            sync.Mutex
}
  • FileNames:pg*.txt這八個文件名
  • MapFlags:對應八個文件的Map任務狀態,processing代表正在處理,finished表示已完成
  • ReduceFlag:同上
  • MapTaskCnts:這是記錄Map的當前任務序列號,如果某個map任務發生timeout,HandleTimeout函數對這個map任務的processing標志清0,重新分配,當前任務的序列號在上一個任務號中加1,如果之前發生timeout的任務來報告完成,由於小於當前任務號,HandleWorkerReport函數可無需記錄,直接退出
  • ReduceTaskCnts:同上
  • MapAllDone:Map任務全部完成為true
  • ReduceAllDone:Reduce任務全部完成為true
  • MapNum:Map任務數
  • ReduceNum:Reduce任務數
  • Mut:互斥鎖,由於有多個worker,避免條件競爭發生不確定行為,master內部數據需要互斥訪問

Worker結構

type TaskState int

const (
	MapState    TaskState = 0
	ReduceState TaskState = 1
	StopState   TaskState = 2
	WaitState   TaskState = 3
)

type WorkerTask struct {
	MapID          int
	ReduceID       int
	ReduceNum      int
	MapNum         int
	MapTaskCnt     int
	ReduceTaskCnt  int
	State          TaskState
	FileName       string
	MapFunction    func(string, string) []KeyValue
	ReduceFunction func(string, []string) string
}
  • MapID和ReduceID:Map任務ID和Reduce任務ID
  • MapNum和ReduceNum:Map的任務總數和Reduce任務總數
  • MapTaskCnt和ReduceTaskCnt:Map任務序列號和Reduce序列號
  • State:任務有四種狀態,分別是MapState,ReduceState,StopState和WaitState,MapState表示當前需要處理Map任務,ReduceState表示當前需要處理Reduce任務,WaitState表示當前沒有需要處理的任務,開始睡眠等待,StopState代表任務已全部完成,可以退出。
  • FileName:表示Map任務需要的文件名
  • MapFunction和ReduceFunction:任務根據State需要進行的Map函數或者Reduce函數

Master接口

創建Master

func MakeMaster(files []string, nReduce int) *Master {
	m := Master{FileNames: files,
		MapFlags:       make([]Flag, len(files), len(files)),
		ReduceFlags:    make([]Flag, nReduce, nReduce),
		MapNum:         len(files),
		ReduceNum:      nReduce,
		MapAllDone:     false,
		ReduceALLDone:  false,
		MapTaskCnts:    make([]int, len(files)),
		ReduceTaskCnts: make([]int, nReduce),
	}
	m.server()
	args, reply := NoArgs{}, NoReply{}
	go m.HandleTimeOut(&args, &reply)
	return &m
}

這個函數會由mrmaster.go文件的主函數調用,創建一個master對象,需要傳入文件名數組,以及要進行多少個Reduce任務,根據這兩個輸入,可以初始化master參數。m.server()是關於RPC的內容,這里不去談,有興趣可以看看博客最后關於RPC內容, 只需要知道master函數要使用RPC,函數需要是兩個參數(沒參數會有警告),都為指針形式,第一個表示輸入參數,第二個表示輸出參數,返回錯誤,無錯誤返回nil。然后創建一個線程專門處理timeout,然后將master返還給mrmaster的主函數,mrmaster主函數會確認master的MapAllDone和ReduceALLDone是否都為真,都為真則退出,否則睡眠一段時間再確認。

生成worker task

func (m *Master) CreateWorkerTask(args *NoArgs, workerTask *WorkerTask) error {
	m.Mut.Lock()
	defer m.Mut.Unlock()
	if !m.MapAllDone {
		for idx := 0; idx < m.MapNum; idx++ {
			if !m.MapFlags[idx].processing && !m.MapFlags[idx].finished {
				workerTask.ReduceNum = m.ReduceNum
				workerTask.MapNum = m.MapNum
				workerTask.State = MapState
				workerTask.MapID = idx
				workerTask.FileName = m.FileNames[idx]
				m.MapTaskCnts[idx]++
				workerTask.MapTaskCnt = m.MapTaskCnts[idx]
				m.MapFlags[idx].processing = true
				return nil
			}
		}
		workerTask.State = WaitState
		return nil
	}
	if !m.ReduceALLDone {
		for idx := 0; idx < m.ReduceNum; idx++ {
			if !m.ReduceFlags[idx].processing && !m.ReduceFlags[idx].finished {
				workerTask.State = ReduceState
				workerTask.ReduceNum = m.ReduceNum
				workerTask.MapNum = m.MapNum
				workerTask.ReduceID = idx
				m.ReduceTaskCnts[idx]++
				workerTask.ReduceTaskCnt = m.ReduceTaskCnts[idx]
				m.ReduceFlags[idx].processing = true
				return nil
			}
		}
		workerTask.State = WaitState
		return nil
	}
	workerTask.State = StopState
	return nil
}

函數首先會獲得互斥鎖,然后判斷MapAllDone是否為false,為false進入循環遍歷,如果某個任務的processing狀態和finished狀態都為false,說明這個任務可以需要被處理,可以分配,講配置參數寫入到輸出參數中,並標志master中當前任務的狀態processing為true以及序列號。如果沒有任務需要處理,說明map有些任務正在處理,有些已完成。進入等待階段。判斷ReduceALLDone與前面類似。不加以敘述。

處理worker report

func (m *Master) HandleWorkerReport(wr *WorkerReportArgs, task *NoReply) error {
	m.Mut.Lock()
	defer m.Mut.Unlock()
	if wr.IsSuccess {
		if wr.State == MapState {
			if wr.MapTaskCnt == m.MapTaskCnts[wr.MapID] {
				m.MapFlags[wr.MapID].finished = true
				m.MapFlags[wr.MapID].processing = false
			}
		} else {
			if wr.ReduceTaskCnt == m.ReduceTaskCnts[wr.ReduceID] {
				m.ReduceFlags[wr.ReduceID].finished = true
				m.ReduceFlags[wr.ReduceID].processing = false
			}
		}
	} else {
		if wr.State == MapState {
			if m.MapFlags[wr.MapID].finished == false {
				m.MapFlags[wr.MapID].processing = false
			}
		} else {
			if m.ReduceFlags[wr.ReduceID].finished == false {
				m.ReduceFlags[wr.ReduceID].processing = false
			}
		}
	}
	for id := 0; id < m.MapNum; id++ {
		if !m.MapFlags[id].finished {
			break
		} else {
			if id == m.MapNum-1 {
				m.MapAllDone = true
			}
		}
	}
	for id := 0; id < m.ReduceNum; id++ {
		if !m.ReduceFlags[id].finished {
			break
		} else {
			if id == m.ReduceNum-1 {
				m.ReduceALLDone = true
			}
		}
	}
	return nil
}

輸入參數有一個標識位,表示任務是否成功,成功判斷任務狀態以及序列號,如果序列號與master對應上,可以表明這個任務成功,如果對不上,說明這是個timeout任務,無需處理。如果任務標志位為false,進入錯誤處理,判斷任務是否完成,因為可能是timeout任務標志位為false,未完成讓processing置0,CreateWorkerTask可以重新分配。最后判斷Map任務和Reduce任務是否相應全部完成,全部完成可以設置MapALLDone和ReduceALLDone為true。

處理timeout

func (m *Master) HandleTimeOut(args *NoArgs, reply *NoReply) error {
	for {
		m.Mut.Lock()
		if m.MapAllDone && m.ReduceALLDone {
			m.Mut.Unlock()
			break
		}
		time.Sleep(30 * time.Millisecond)
		if !m.MapAllDone {
			for idx := 0; idx < m.MapNum; idx++ {
				if m.MapFlags[idx].finished == false {
					m.MapFlags[idx].processing = false
				}
			}
		} else {
			for idx := 0; idx < m.ReduceNum; idx++ {
				if m.ReduceFlags[idx].finished == false {
					m.ReduceFlags[idx].processing = false
				}
			}
		}
		m.Mut.Unlock()
		time.Sleep(2000 * time.Millisecond)
	}
	return nil
}

處理timeout很簡單,先判斷MapALLDone和ReduceALLDone是否都為true,都為true則退出即可。然后判斷M任務那些還沒有完成,對沒有完成的任務的processing清0,就可以讓CreateWorkerTask重新分配沒有完成的任務了。最后釋放鎖,睡眠2s,可以看到Handletimeout函數是以2s為間隔的,2s內沒有完成的任務視為timeout。

Worker接口

生成worker

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	wt := WorkerTask{
		MapFunction:    mapf,
		ReduceFunction: reducef,
	}
	for {
		wt.GetWorkerTask()
		if wt.State == MapState {
			wt.DoMapWork()
		} else if wt.State == ReduceState {
			wt.DoReduceWork()
		} else if wt.State == StopState {
			break
		} else if wt.State == WaitState {
			time.Sleep(300 * time.Millisecond)
		}
	}
	return
}

func (wt *WorkerTask) GetWorkerTask() {
	cwa := NoArgs{}
	newWt := WorkerTask{}
	call("Master.CreateWorkerTask", &cwa, &newWt)
	if newWt.State == MapState {
		wt.ReduceNum = newWt.ReduceNum
		wt.MapNum = newWt.MapNum
		wt.State = newWt.State
		wt.MapID = newWt.MapID
		wt.FileName = newWt.FileName
		wt.MapTaskCnt = newWt.MapTaskCnt
	} else if newWt.State == ReduceState {
		wt.State = newWt.State
		wt.ReduceID = newWt.ReduceID
		wt.ReduceTaskCnt = newWt.ReduceTaskCnt
		wt.MapNum = newWt.MapNum
		wt.ReduceNum = newWt.ReduceNum
	} else if newWt.State == StopState {
		wt.State = newWt.State
	} else {
		wt.State = newWt.State
	}
}

mrworker會調用worker函數,傳入map函數和reduce函數,根據函數參數創建一個worker,然后進入循環,調用GetWorkerTask函數,這個函數會調用Master.CreateWorkerTask函數,並傳入兩個參數,得到任務分配后,講相應的參數和狀態賦值給worker。worker就可以根據狀態進入處理相應任務或者睡眠,或者退出。

Map work

func (wt *WorkerTask) DoMapWork() {
	file, err := os.Open(wt.FileName)
	content, err := ioutil.ReadAll(file)
	file.Close()
	kvs := wt.MapFunction(wt.FileName, string(content))
	intermediate := make([][]KeyValue, wt.ReduceNum, wt.ReduceNum)
	for _, kv := range kvs {
		idx := ihash(kv.Key) % wt.ReduceNum
		intermediate[idx] = append(intermediate[idx], kv)
	}
	for idx := 0; idx < wt.ReduceNum; idx++ {
		intermediateFileName := fmt.Sprintf("mr-%d-%d", wt.MapID, idx)
		file, err = os.Create(intermediateFileName)
		data, _ := json.Marshal(intermediate[idx])
		_, err = file.Write(data)
		file.Close()
	}
	wt.ReportWorkerTask(nil)
}

func (wt *WorkerTask) ReportWorkerTask(err error) {
	wra := WorkerReportArgs{
		MapID:     wt.MapID,
		ReduceID:  wt.ReduceID,
		State:     wt.State,
		IsSuccess: true,
	}
	if wt.State == MapState {
		wra.MapTaskCnt = wt.MapTaskCnt
	} else {
		wra.ReduceTaskCnt = wt.ReduceTaskCnt
	}
	wrr := NoReply{}
	if err != nil {
		wra.IsSuccess = false
	}
	call("Master.HandleWorkerReport", &wra, &wrr)
}

為了增加可讀性,我將處理錯誤的代碼刪除了,更好看一些,Map work就是讀取相應的文件,調用MapFunction生成KeyValue對,然后根據哈希函數得到要講當前key分配到哪一塊中,總共有ReduceNum塊,最后根據這么塊生成對應map以及reduce塊的文件。然后調用ReportWorkerTask報告成功,傳入nil表示成功。ReportWorkerTask內部會調用Master.HandleWorkerReport函數來匯報這一執行結果。

Reduce work

func (wt *WorkerTask) DoReduceWork() {
	kvsReduce := make(map[string][]string)
	for idx := 0; idx < wt.MapNum; idx++ {
		filename := fmt.Sprintf("mr-%d-%d", idx, wt.ReduceID)
		file, err := os.Open(filename)
		content, err := ioutil.ReadAll(file)
		file.Close()
		kvs := make([]KeyValue, 0)
		err = json.Unmarshal(content, &kvs)
		for _, kv := range kvs {
			_, ok := kvsReduce[kv.Key]
			if !ok {
				kvsReduce[kv.Key] = make([]string, 0)
			}
			kvsReduce[kv.Key] = append(kvsReduce[kv.Key], kv.Value)
		}
	}
	ReduceResult := make([]string, 0)
	for key, val := range kvsReduce {
		ReduceResult = append(ReduceResult, fmt.Sprintf("%v %v\n", key, wt.ReduceFunction(key, val)))
	}
	outFileName := fmt.Sprintf("mr-out-%d", wt.ReduceID)
	err := ioutil.WriteFile(outFileName, []byte(strings.Join(ReduceResult, "")), 0644)
	wt.ReportWorkerTask(nil)
}

同樣把一些錯誤處理刪除了,首先讀取相同塊的所有文件,需要對相同key的內容聚合在一起,然后循環調用ReduceFunction得到reduce的結果,最后生成輸出。

遇到過的坑

主要遇到的兩個坑,一個是關於GetWorkerTask,一個是CreateWorkerTask

首先說GetWorkerTask,最開始代碼是下面這樣子,我把wt作為參數傳入進去,我發現后期調用的時候,wt的參數是不會更新的,一直處於WaitState,導致任務worker無法工作。新創建一個WorkerTask為參數,傳入即可解決問題。

func (wt *WorkerTask) GetWorkerTask() {
	cwa := NoArgs{}
	call("Master.CreateWorkerTask", &cwa, wt)
}

第二個是思維還沒有轉變過來的問題,分布式系統需要有分布式的思想,這是CreateWorkerTask的截取代碼,可以看到少了兩行,沒有對MapNum和ReduceNum進行初始化,為什么會做不初始化呢,因為當時我想的是上面的Map任務已經初始化,沒有必要再進行初始化,這就是錯誤的根源,萬一之前初始化的worker crash掉了,map任務全部完成,那新的worker進入reduce,你不初始化MapNum和ReduceNum就會有bug,最明顯的你運行CrashTest任務時,發現最后生成的結果有的有,有的沒有,有的是之前運行Map任務的,現在運行Reduce任務,沒有的就是新的worker直接進入Reduce任務,默認初始化為0,則循環讀文件直接退出。

if !m.ReduceALLDone {
		for idx := 0; idx < m.ReduceNum; idx++ {
			if !m.ReduceFlags[idx].processing && !m.ReduceFlags[idx].finished {
				workerTask.State = ReduceState
				workerTask.ReduceID = idx
				m.ReduceTaskCnts[idx]++
				workerTask.ReduceTaskCnt = m.ReduceTaskCnts[idx]
				m.ReduceFlags[idx].processing = true
				return nil
			}
		}
		workerTask.State = WaitState
		return nil
	}

RPC

Go語言進行RPC非常簡單,有現成的RPC的包,非常方便。

func masterSock() string {
	s := "/var/tmp/824-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}

func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()
	sockname := masterSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	go http.Serve(l, nil)
}

func call(rpcname string, args interface{}, reply interface{}) bool {
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	defer c.Close()
	c.Call(rpcname, args, reply)
}

刪減了一些錯誤處理代碼,核心代碼就是上面這些,只需要20來行就可以構建好RPC流程,首先master要調用server函數,進行rpc注冊以及rpc處理http,然后得到套接字名,移除系統中套接字名,然后開始監聽,創建線程進行http服務。server函數運行好之后。worker就可以根據套接字名進行撥號,然后調用master的函數。

結語

MapReduce介紹就到這了,推薦自己嘗試實現一遍,收獲還是很大的,包括mapreduce細節實現,更加熟悉Go,分布式調試(可以看看這個commit下的代碼,沒有刪減打印,可以清楚看輸出,特別是Crashtest,可以將test-mr.sh前四個任務注釋掉,看CrashTest輸出)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM