【MIT 6.824 】分布式系統 課程筆記(一)


Lecture 02 Infrastructure: RPC & threads

一、多線程挑戰

  • 共享數據: 使用互斥信號量、或者避免共享
  • 線程間協作: 使用channels 或者 waitgroup 來等待所有map線程結束
  • 並發粒度:
    • 粗粒度: 簡單,但是並發性不高
    • 細粒度: 更多的並發,但是處理復雜,可能會有更多的沖突和死鎖

以下這段代碼就能說明並發的粒度問題:

	constructTaskArgs := func(phase jobPhase, task int) DoTaskArgs {
		debug("task: %d\n", task)
		var taskArgs DoTaskArgs
		taskArgs.Phase = phase
		taskArgs.JobName = jobName
		taskArgs.NumOtherPhase = n_other
		taskArgs.TaskNumber = task
		if phase == mapPhase {
			taskArgs.File = mapFiles[task]
		}
		return taskArgs
	}

	tasks := make(chan int) // act as task queue
	go func() {
		for i := 0; i < ntasks; i++ {
			tasks <- i
		}
	}()
	successTasks := 0
	success := make(chan int)

loop:
	for {
		select {
		case task := <-tasks:
			go func() {
				worker := <-registerChan
				status := call(worker, "Worker.DoTask", constructTaskArgs(phase, task), nil)
				if status {
					success <- 1
					go func() { registerChan <- worker }()
				} else {
					tasks <- task
				}
			}()
		case <-success:
			successTasks += 1
		default:
			if successTasks == ntasks {
				break loop
			}
		}
	}

里面不僅使用了task的channel, 還使用了success (channel) 來控制 successTask 的共享。

二、爬蟲並發的問題

網絡是一個有環的圖,但是我們設計爬蟲需要避免環。

  • 一方面是因為重復遍歷url,沒有任何意義
  • 另一方面只訪問一次url可以減輕目標服務器負擔

單線程爬蟲:

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
	if fetched[url] {
		return
	}
	fetched[url] = true
	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	for _, u := range urls {
		Serial(u, fetcher, fetched)
	}
	return
}

2.1 並發互斥爬蟲

因此需要維護一張visited表來記錄是否遍歷過url,這里就會出現並發問題。

當T1 檢查visited[url] , T2也檢查visited[url] 兩個線程都會認為沒有訪問過該url,這時候就會發生沖突,發生WW(write + write) 。解決辦法是,維護一個Mutex 互斥信號量來訪問visited這張表。

  • 判斷線程結束

使用sync.WaitGroup來保證線程執行完成

type fetchState struct {
	mu      sync.Mutex
	fetched map[string]bool
}

func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
	f.mu.Lock()
	if f.fetched[url] {
		f.mu.Unlock()
		return
	}
	f.fetched[url] = true
	f.mu.Unlock()

	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	var done sync.WaitGroup
	for _, u := range urls {
		done.Add(1)
		go func(u string) {
			defer done.Done()
			ConcurrentMutex(u, fetcher, f)
		}(u)
	}
	done.Wait()
	return
}

func makeState() *fetchState {
	f := &fetchState{}
	f.fetched = make(map[string]bool)
	return f
}

2.2 並發通道爬蟲

master啟動worker去爬取url, worker將url送到同一個通道里面, master從通道獲取url去爬取內容

共享的數據:

  • 通道
  • 發送到 通道的 slices 和 字符串
  • 從master發送到worker的參數
//
// Concurrent crawler with channels
//

func worker(url string, ch chan []string, fetcher Fetcher) {
	urls, err := fetcher.Fetch(url)
	if err != nil {
		ch <- []string{}
	} else {
		ch <- urls
	}
}

func master(ch chan []string, fetcher Fetcher) {
	n := 1
	fetched := make(map[string]bool)
	for urls := range ch {
		for _, u := range urls {
			if fetched[u] == false {
				fetched[u] = true
				n += 1
				go worker(u, ch, fetcher)
			}
		}
		n -= 1
		if n == 0 {
			break
		}
	}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
	ch := make(chan []string)
	go func() {
		ch <- []string{url}
	}()
	master(ch, fetcher)
}

三、什么時候使用共享空間和鎖 vs 通道

state -- 共享空間和鎖

communication -- 通道

waiting for events -- 通道

使用go 的 race dector

四、Remote Procedure Call(RPC)

4.1 軟件架構:

客戶端 handlers

stubs dispatcher(調度器)

rpc lib rpc lib


網絡 ----- 網絡

4.2 rpc過程:

  • 首先雙方定義發送的參數, 和返回的結構體
  • 客戶端 Dial()創建tcp連接請求 call() 調用rpc庫來執行遠程調用
  • 服務器 聲明一個帶返回方法的對象 作為rpc處理器, 然后使用rpc庫的Register函數來注冊服務, rpc庫:
    • 讀取每一個請求
    • 為每一個請求創建一個goroutine
    • 反序列化請求
    • 調用目標函數
    • 序列化返回值
    • 將返回值通過tcp連接返回

4.3rpc 示例

源碼

client:

//
// Client
//

func connect() *rpc.Client {
	client, err := rpc.Dial("tcp", ":1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}
	return client
}

func get(key string) string {
	client := connect()
	args := GetArgs{"subject"}
	reply := GetReply{}
	err := client.Call("KV.Get", &args, &reply)
	if err != nil {
		log.Fatal("error:", err)
	}
	client.Close()
	return reply.Value
}

func put(key string, val string) {
	client := connect()
	args := PutArgs{"subject", "6.824"}
	reply := PutReply{}
	err := client.Call("KV.Put", &args, &reply)
	if err != nil {
		log.Fatal("error:", err)
	}
	client.Close()
}

server

//
// Server
//

type KV struct {
	mu   sync.Mutex
	data map[string]string
}

func server() {
	kv := new(KV)
	kv.data = map[string]string{}
	rpcs := rpc.NewServer()
	rpcs.Register(kv)
	l, e := net.Listen("tcp", ":1234")
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go func() {
		for {
			conn, err := l.Accept()
			if err == nil {
				go rpcs.ServeConn(conn)
			} else {
				break
			}
		}
		l.Close()
	}()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	val, ok := kv.data[args.Key]
	if ok {
		reply.Err = OK
		reply.Value = val
	} else {
		reply.Err = ErrNoKey
		reply.Value = ""
	}
	return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	kv.data[args.Key] = args.Value
	reply.Err = OK
	return nil
}

4.3 rpc怎么處理失敗

問題:

  • 網絡延遲
  • 丟包
  • 服務器慢或者崩潰

處理辦法:

  • best effort:
    • client調用call( ) 等待響應, 如果過了一會沒收到響應那就再發送一個call( )
    • 這個過程重復幾次,然后放棄並且返回一個錯誤
  • at most once:
    • 針對服務端說的:當服務端收到相同的請求時
      • 根據xid(client id 判斷)如果收到相同請求 返回之前的處理結果
      • xid 怎么保證唯一性
  • exactly once:
    • 無限重試
    • 冗余檢查
    • 容錯服務


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM