我讀《通過Go來處理每分鍾達百萬的數據請求》


我讀《通過Go來處理每分鍾達百萬的數據請求》

原文

原文作者為Malwarebytes公司的首席架構師Marcio Castilho http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

問題描述

當我們的服務端需要處理大量的耗時任務時,我們一般都會考慮將耗時任務異步處理。

簡單粗暴法

golang恰恰給我們的異步處理帶來了很大的便利--go func()。然而,絕大多數的時候,我們不能簡單粗暴的創建協程來處理異步任務,原因是不可控。雖然協程相對於線程占用的系統資源更少,但這並不代表我們可以無休止的創建協程。積水成江,不停創建協程也有壓垮系統的風險。這里引用原作者的demo,一個執行耗時任務的handler。

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    if r.Method != "POST" {
             w.WriteHeader(http.StatusMethodNotAllowed)
             return
    }
    // Read the body into a string for json decoding
     var content = &PayloadCollection{}
     err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
             w.Header().Set("Content-Type", "application/json; charset=UTF-8")
             w.WriteHeader(http.StatusBadRequest)
             return
     }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }
    w.WriteHeader(http.StatusOK)
}

這就是我們遇到的第一個問題,簡單粗暴起協程處理耗時任務導致的系統不可控性。我們自然而然就會想,怎么能讓系統更可控呢。

優雅的方法

一個很自然的思路是,建立任務隊列。golang提供了線程安全的任務隊列實現方式--帶緩沖的channal。但是這樣只是延后了請求的爆發。作者意識到,要解決這一問題,必須控制協程的數量。如何控制協程的數量?Job/Worker模式!這里我將作者的代碼修改了一下,單文件執行,以記錄這一模式。

package main

import (
	"fmt"
	"reflect"
	"time"
)

var (
	MaxWorker = 10
)

type Payload struct {
	Num int
}

//待執行的工作
type Job struct {
	Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
	WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
	JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
	quit       chan bool     //退出信號
	no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
	fmt.Println("創建一個新工作者單元")
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
		no:         no,
	}
}

//循環  監聽任務和結束信號
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool <- w.JobChannel
			fmt.Println("w.WorkerPool <- w.JobChannel", w)
			select {
			case job := <-w.JobChannel:
				fmt.Println("job := <-w.JobChannel")
				// 收到任務
				fmt.Println(job)
				time.Sleep(100 * time.Second)
			case <-w.quit:
				// 收到退出信號
				return
			}
		}
	}()
}

// 停止信號
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

//調度中心
type Dispatcher struct {
	//工作者池
	WorkerPool chan chan Job
	//工作者數量
	MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
	// starting n number of workers
	for i := 1; i < d.MaxWorkers+1; i++ {
		worker := NewWorker(d.WorkerPool, i)
		worker.Start()
	}
	go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			fmt.Println("job := <-JobQueue:")
			go func(job Job) {
				//等待空閑worker (任務多的時候會阻塞這里)
				jobChannel := <-d.WorkerPool
				fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
				// 將任務放到上述woker的私有任務channal中
				jobChannel <- job
				fmt.Println("jobChannel <- job")
			}(job)
		}
	}
}

func main() {
	JobQueue = make(chan Job, 10)
	dispatcher := NewDispatcher(MaxWorker)
	dispatcher.Run()
	time.Sleep(1 * time.Second)
	go addQueue()
	time.Sleep(1000 * time.Second)
}

func addQueue() {
	for i := 0; i < 20; i++ {
		// 新建一個任務
		payLoad := Payload{Num: 1}
		work := Job{Payload: payLoad}
		// 任務放入任務隊列channal
		JobQueue <- work
		fmt.Println("JobQueue <- work")
		time.Sleep(1 * time.Second)
	}
}

/*
一個任務的執行過程如下
JobQueue <- work  新任務入隊
job := <-JobQueue: 調度中心收到任務
jobChannel := <-d.WorkerPool 從工作者池取到一個工作者
jobChannel <- job 任務給到工作者
job := <-w.JobChannel 工作者取出任務
{{1}} 執行任務
w.WorkerPool <- w.JobChannel 工作者在放回工作者池
*/

這樣,我們已經能夠主動的控制worker的數量。這時候,我們不妨設想一下,我們完全解決問題了么?如果有大量的任務同時涌入,會發生什么樣的結果。程序會阻塞等待可用的workerjobChannel := <-d.WorkerPool

//調度
func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			fmt.Println("job := <-JobQueue:")
			go func(job Job) {
				//等待空閑worker (任務多的時候會阻塞這里)
				jobChannel := <-d.WorkerPool
				fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
				// 將任務放到上述woker的私有任務channal中
				jobChannel <- job
				fmt.Println("jobChannel <- job")
			}(job)
		}
	}
}

不要忘記,這個調度方法也是在不斷的創建協程等待空閑的worker。我們改一下代碼如下:

package main

import (
	"fmt"
	"reflect"
	"time"
	"runtime"
)

var (
	MaxWorker = 10
)

type Payload struct {
	Num int
}

//待執行的工作
type Job struct {
	Payload Payload
}

//任務channal
var JobQueue chan Job

//執行任務的工作者單元
type Worker struct {
	WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
	JobChannel chan Job      //每個工作者單元包含一個任務管道 用於獲取任務
	quit       chan bool     //退出信號
	no         int           //編號
}

//創建一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
	fmt.Println("創建一個新工作者單元")
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
		no:         no,
	}
}

//循環  監聽任務和結束信號
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool <- w.JobChannel
			fmt.Println("w.WorkerPool <- w.JobChannel", w)
			select {
			case job := <-w.JobChannel:
				fmt.Println("job := <-w.JobChannel")
				// 收到任務
				fmt.Println(job)
				time.Sleep(100 * time.Second)
			case <-w.quit:
				// 收到退出信號
				return
			}
		}
	}()
}

// 停止信號
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

//調度中心
type Dispatcher struct {
	//工作者池
	WorkerPool chan chan Job
	//工作者數量
	MaxWorkers int
}

//創建調度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
	// starting n number of workers
	for i := 1; i < d.MaxWorkers+1; i++ {
		worker := NewWorker(d.WorkerPool, i)
		worker.Start()
	}
	go d.dispatch()
}

//調度
func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			fmt.Println("job := <-JobQueue:")
			go func(job Job) {
				fmt.Println("等待空閑worker (任務多的時候會阻塞這里")
				//等待空閑worker (任務多的時候會阻塞這里)
				jobChannel := <-d.WorkerPool
				fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
				// 將任務放到上述woker的私有任務channal中
				jobChannel <- job
				fmt.Println("jobChannel <- job")
			}(job)
		}
	}
}

func main() {
	JobQueue = make(chan Job, 10)
	dispatcher := NewDispatcher(MaxWorker)
	dispatcher.Run()
	time.Sleep(1 * time.Second)
	go addQueue()
	time.Sleep(1000 * time.Second)
}

func addQueue() {
	for i := 0; i < 100; i++ {
		// 新建一個任務
		payLoad := Payload{Num: i}
		work := Job{Payload: payLoad}
		// 任務放入任務隊列channal
		JobQueue <- work
		fmt.Println("JobQueue <- work", i)
		fmt.Println("當前協程數:", runtime.NumGoroutine())
		time.Sleep(100 * time.Millisecond)
	}
}

執行結果如下:

...
...

JobQueue <- work 97
當前協程數: 100
job := <-JobQueue:
等待空閑worker (任務多的時候會阻塞這里
JobQueue <- work 98
當前協程數: 101
job := <-JobQueue:
等待空閑worker (任務多的時候會阻塞這里
JobQueue <- work 99
當前協程數: 102

我們發現,我們依然沒能控制住協程數量,我們只是控制住了worker的數量。這種情況下,如果worker數量設置的合理且異步任務耗時不是特別長的情況下一般沒有問題。但是出於安全的考慮,我要把這個阻塞的協程數做一個控制,如果達到限制時候拒絕服務以保護系統該怎么處理?

真正控制協程數量(並發執行的任務數)

我們可以控制並發執行(包括等待執行)的任務數。我們加入任務使用如下判斷。用一個帶緩沖的Channel控制並發執行的任務數。當任務異步處理完成的時候執行<- DispatchNumControl釋放控制即可。用這種方法,我們可以根據壓測結果設置合適的並發數從而保證系統能夠盡可能的發揮自己的能力,同時保證不會因為任務量太大而崩潰(因為達到極限的時候,系統會告訴調用方--我很忙)。

//用於控制並發處理的協程數
var DispatchNumControl = make(chan bool, 10000)

func Limit(work Job) bool {
   select {
   case <-time.After(time.Millisecond * 100):
      fmt.println("我很忙")
      return false
   case DispatchNumControl <- true:
   // 任務放入任務隊列channal
      jobChannel <- work
      return true
   }
}

總結

總結一波,協程是個好的設計,但任何東西都不能過度使用。我們做系統設計的時候,一定也要時刻想着控制--要對自己設計的系統有着足夠的控制力。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM