Lecture 02 Infrastructure: RPC & threads
一、多線程挑戰
- 共享數據: 使用互斥信號量、或者避免共享
- 線程間協作: 使用channels 或者 waitgroup 來等待所有map線程結束
- 並發粒度:
- 粗粒度: 簡單,但是並發性不高
- 細粒度: 更多的並發,但是處理復雜,可能會有更多的沖突和死鎖
以下這段代碼就能說明並發的粒度問題:
constructTaskArgs := func(phase jobPhase, task int) DoTaskArgs {
debug("task: %d\n", task)
var taskArgs DoTaskArgs
taskArgs.Phase = phase
taskArgs.JobName = jobName
taskArgs.NumOtherPhase = n_other
taskArgs.TaskNumber = task
if phase == mapPhase {
taskArgs.File = mapFiles[task]
}
return taskArgs
}
tasks := make(chan int) // act as task queue
go func() {
for i := 0; i < ntasks; i++ {
tasks <- i
}
}()
successTasks := 0
success := make(chan int)
loop:
for {
select {
case task := <-tasks:
go func() {
worker := <-registerChan
status := call(worker, "Worker.DoTask", constructTaskArgs(phase, task), nil)
if status {
success <- 1
go func() { registerChan <- worker }()
} else {
tasks <- task
}
}()
case <-success:
successTasks += 1
default:
if successTasks == ntasks {
break loop
}
}
}
里面不僅使用了task的channel, 還使用了success (channel) 來控制 successTask 的共享。
二、爬蟲並發的問題
網絡是一個有環的圖,但是我們設計爬蟲需要避免環。
- 一方面是因為重復遍歷url,沒有任何意義
- 另一方面只訪問一次url可以減輕目標服務器負擔
單線程爬蟲:
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}
2.1 並發互斥爬蟲
因此需要維護一張visited表來記錄是否遍歷過url,這里就會出現並發問題。
當T1 檢查visited[url] , T2也檢查visited[url] 兩個線程都會認為沒有訪問過該url,這時候就會發生沖突,發生WW(write + write) 。解決辦法是,維護一個Mutex 互斥信號量來訪問visited這張表。
- 判斷線程結束
使用sync.WaitGroup來保證線程執行完成
type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}
func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
f.mu.Lock()
if f.fetched[url] {
f.mu.Unlock()
return
}
f.fetched[url] = true
f.mu.Unlock()
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, f)
}(u)
}
done.Wait()
return
}
func makeState() *fetchState {
f := &fetchState{}
f.fetched = make(map[string]bool)
return f
}
2.2 並發通道爬蟲
master啟動worker去爬取url, worker將url送到同一個通道里面, master從通道獲取url去爬取內容
共享的數據:
- 通道
- 發送到 通道的 slices 和 字符串
- 從master發送到worker的參數
//
// Concurrent crawler with channels
//
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}
func master(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}
func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
master(ch, fetcher)
}
三、什么時候使用共享空間和鎖 vs 通道
state -- 共享空間和鎖
communication -- 通道
waiting for events -- 通道
使用go 的 race dector
四、Remote Procedure Call(RPC)
4.1 軟件架構:
客戶端 handlers
stubs dispatcher(調度器)
rpc lib rpc lib
網絡 ----- 網絡
4.2 rpc過程:
- 首先雙方定義發送的參數, 和返回的結構體
- 客戶端 Dial()創建tcp連接請求 call() 調用rpc庫來執行遠程調用
- 服務器 聲明一個帶返回方法的對象 作為rpc處理器, 然后使用rpc庫的Register函數來注冊服務, rpc庫:
- 讀取每一個請求
- 為每一個請求創建一個goroutine
- 反序列化請求
- 調用目標函數
- 序列化返回值
- 將返回值通過tcp連接返回
4.3rpc 示例
client:
//
// Client
//
func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}
func get(key string) string {
client := connect()
args := GetArgs{"subject"}
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}
func put(key string, val string) {
client := connect()
args := PutArgs{"subject", "6.824"}
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}
server
//
// Server
//
type KV struct {
mu sync.Mutex
data map[string]string
}
func server() {
kv := new(KV)
kv.data = map[string]string{}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}
func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
val, ok := kv.data[args.Key]
if ok {
reply.Err = OK
reply.Value = val
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
return nil
}
func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
kv.data[args.Key] = args.Value
reply.Err = OK
return nil
}
4.3 rpc怎么處理失敗
問題:
- 網絡延遲
- 丟包
- 服務器慢或者崩潰
處理辦法:
- best effort:
- client調用call( ) 等待響應, 如果過了一會沒收到響應那就再發送一個call( )
- 這個過程重復幾次,然后放棄並且返回一個錯誤
- at most once:
- 針對服務端說的:當服務端收到相同的請求時
- 根據xid(client id 判斷)如果收到相同請求 返回之前的處理結果
- xid 怎么保證唯一性
- 針對服務端說的:當服務端收到相同的請求時
- exactly once:
- 無限重試
- 冗余檢查
- 容錯服務