golang實現並發爬蟲三(用隊列調度器實現)


欲看此文,必先可先看:

golang實現並發爬蟲一(單任務版本爬蟲功能)

gollang實現並發爬蟲二(簡單調度器)

 

上文中的用簡單的調度器實現了並發爬蟲。

並且,也提到了這種並發爬蟲的實現可以提高爬取效率。

當workerCount為1和workerCount為10時其爬取效率是有明顯不同的。

然而,文末其實也提到了這個簡單調度器實現的爬蟲有個不可控或者說是控制力太小了的問題。

究其原因就是因為我們實現的方法是來一個request就給創建一個groutine。

為了讓這個程序變得更為可控。得想想怎么可以優化下了。

 

現在,非常明顯,優化點就是我不想要來一個request就創建一個這個實現過程。

那么,我們可以想到隊列。

把request放到request隊列里。

那么,request隊列里一定是會有一個request的頭的,我們就可以把這個request的頭元素給到worker去做實現。

也就是這樣:

 

 

 but,這樣是沒有對worker進行一個控制的。

我們希望request可以選擇我們想要的一個worker。

那么,我們也可以讓scheduler維護一個worker的隊列。

 

 這里用了三個並行的模塊:

1.engine 引擎模塊。

2.scheduler 調度器模塊。

3.worker 工作模塊。

這三者通信都是通過channel來通信的。

 

上圖中可知道調度器模塊實際上是維護了2個channel,一個是request的channel,一個是worker的channel。

//隊列調度器
//這個scheduler與engine和worker之間的通信都是通過channel來連接的。
//故爾它的肚子里應該有request相關的channel和worker相關的channel.
//另外注意這里worker的channel的類型是chan Request。
type QueuedScheduler struct {
    requestChan chan con_engine.Request
    workerChan  chan chan con_engine.Request
}

那么,我們就只需要在這個scheduler調度器的兩個channel里,各取一個元素,即取request和worker(chan con_engine.Request),把request發給worker就可以了。

一直不斷的去取和發送,這就是這個隊列調度器要做的事情了。

那個彎曲的箭頭也就是指的這個事情了。在request的隊列里找到合適的request發給worker隊列里合適的worker就好。

這就是一個整體的思想了。

 

稍微說下關於維護如何兩個隊列的代碼。

重點在於怎么才能做到各讀取一個元素。

channel的讀取是會阻塞的。

如果我先讀取request,如果讀取不到,那么在等待的時候就沒有辦法取到worker了。

解決方案就是用select,因為select會保證一點,select里的每一個case都會被執行到且會很快速的執行。

func (s *QueuedScheduler) Run() {
    s.requestChan = make(chan con_engine.Request) //指針接收者才能改變里面的內容。
    s.workerChan = make(chan chan con_engine.Request)
    go func() {
        var requestQ []con_engine.Request
        var workerQ []chan con_engine.Request
        for {
            var activeRequest con_engine.Request
            var activeWorker chan con_engine.Request
            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeRequest = requestQ[0]
                activeWorker = workerQ[0]
            }
            //收到一個request就讓request排隊,收到一個worker就讓worker排隊。所有的channel操作都放到select里。
            select {
            case r := <-s.requestChan:
                requestQ = append(requestQ, r)
            case w := <-s.workerChan:
                workerQ = append(workerQ, w)
            case activeWorker <- activeRequest:
                requestQ = requestQ[1:]
                workerQ = workerQ[1:]
            }
        }
    }()
}

select就是在做三件事情:

1.從requestChan里收一個request,將這個request存在變量requestQ里。

2.從workerChan里收一個worker,將這個worker存在變量workerQ里。

3.把第一個requestQ里的第一個元素發給第一個workerQ里的第一個元素。  

 

其他代碼就感興趣的同學自己看吧。

作者就先說到這里。

總體調度的思想上面的圖中。

具體的實現在源碼里。

歡迎大家留言指教。

 

源碼:

https://github.com/anmutu/du_crawler/tree/master/04crawler

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM