NSQ之粗讀淺談


回顧:

  以前一直是C++開發(客戶端),最近聽同事講go語言不錯,隨后便決定先從go語法開始投向go的懷抱。由於歷史原因學習go語法時,用了半天的時間看完了菜鳥教程上相關資料,后來又看了易百教程上的一些實例代碼,感覺都比較簡單,畢竟還是有C++基礎存在的。。。但是找工作大多都是需要工作經驗的,那么怎么辦才好呢!后來在知乎上看到有一位大神推薦看NSQ和skynet開源框架,權衡之下我決定從NSQ開始學習,進入我的go學習之路。

  要學習NSQ,首先就是上www查找相關NSQ的資料,沒想到百度一下相關資料還是挺多,有好幾個網友的博客都寫的不錯,思路比較清晰,但是基本都沒有完全的把NSQ分析下來,只能后邊繼續等待了。文章末尾我會把自己覺着可以幫助理解的文章鏈接貼上。

  本篇文章我不打算直接開始分析NSQ框架代碼,而是想從一些零散的地方着手,針對性的講NSQ,這樣有利於后期我們逐模塊分析NSQ源碼。

  下邊就開始我們文本的中心內容,文章結構是按點划分,會比較零散但都是個人在看NSQ源碼時的心得,也可以說是個人覺得比較難理解的地方吧,看NSQ之前,只學習了go語法2天時間,因此文中可能會涉及到一些基礎性錯誤,歡迎大家指正

一、NSQ部署

  我個人電腦是win10 64位,因此在這兒我就給出一個網友寫好的Windows下NSQ部署文章NSQ如何在windows上安裝 ,安裝網友的文章說明我的測試結果圖1所示,當啟動nsqd連接nsqlookupd時,會有相應的提示,啟動nsq_to_file進程時,會往nsqd寫入消息都有相應提示,如圖1中用紅色矩形框選中的是對於的關鍵提示信息。

圖1 NSQ部署測試

  打開瀏覽器直接輸入http://127.0.0.1:4171/就可以查看NSQ運行情況

二、NSQ拓撲圖

  通過第一小節,我們簡單的把NSQ部署起來,並看到了NSQ的運行情況,還記得我們啟動各個進程的步驟嗎,不記得沒關系,看圖2所示,該圖是出自nsq源碼分析之概述,個人覺着這幅圖對NSQ總結的非常好,從圖中我們可以了解到下面幾個點

  • nsqlookupd進程同時開啟tcp和http兩個監聽服務,TCP監聽是用於NSQD進程連接,http監聽是用於提供給nsqadmin獲取集群信息
  • nsqadmin進程只開啟http服務,其實就是一個web服務,提供給客戶端查詢集群信息
  • nsqd進程同時開啟tcp和http服務,TPC監聽和http監聽都提供給生產者和消費者連接,http服務還提供給nsqadmin獲取該nsqd本地信息
  • nsqd連接到nsqlookupd的tcp監聽上,通過心跳告訴nsqlookupd自己在線
  • writer是生產者,直接連接nsqd
  • reader是消費者,直接連接nsqd

圖2 NSQ拓撲圖

  了解了nsq的整體結構后,我們就可以開始按模塊分析nsq的源碼

 三、NSQ啟動與退出

  nsq優雅的啟動與退出使用了SVC包,推薦閱讀Nsq源碼閱讀(1) 啟動和優雅退出,這篇文章講解的非常詳細。

四、全局唯一messageid

  Nsq源碼閱讀(6) 全局唯一messageid

五、NSQ同步

  NSQ中簡單包裝了sync.WaitGroup,包裝后的待執行函數都會在輕量級的線程中執行,代碼如下所示。主線程中啟動了多個子線程后,只有等啟動的多個子線程結束后主線程才能結束,方法Wrap中的Add和Done調用分別會維護一個引用計數,只有當該引用計數為0時,主線程才會結束等待

 1 //如果結構體S,包含一個匿名字段T,那么這個結構體S 就有了T的方法。
 2 type WaitGroupWrapper struct {
 3     sync.WaitGroup
 4 }
 5 
 6 func (w *WaitGroupWrapper) Wrap(cb func()) {
 7     w.Add(1) //sync.WaitGroup結構中方法
 8     go func() {
 9         cb()
10         w.Done() //sync.WaitGroup結構中方法
11     }()
12 }

六、Go接口

  Go語言式的接口,就是不用顯示聲明類型T實現了接口I,只要類型T的公開方法完全滿足接口I的要求,就可以把類型T的對象用在需要接口I的地方。比如nsqlookupd.go文件中Main函數最后啟動http監聽服務時代碼

 1 l.Lock()                                                                                     
 2 l.httpListener = httpListener //把Listener存在NSQLookupd的struct里                                
 3 l.Unlock()                                                                                   
 4 //創建httpServer的實例,httpServer在nsqlookupd\http.go文件中定義                                         
 5 httpServer := newHTTPServer(ctx)                                                             
 6 //調用http_api.Serve方法(在http_api\http_server.go中定義)開始在指定的httpListener上接收http連接。                
 7 l.waitGroup.Wrap(func() {                                                                    
 8     //因為httpServer結構重寫了http.Handler接口類的ServeHTTP方法,因此可以當http.Handler使用                       
 9     http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger)                          
10 })                                                                                           

  代碼第9行調用中的第二個參數httpServer,是一個自定義的struct,而http_apit.Serve需要的參數類型為http.Handler,因為httpServer實現了http.Handler接口類中的接口,因此可以在這個地方使用,對每一個類型實現接口方法如下

1 func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
2     s.router.ServeHTTP(w, req)
3 }

七、NSQD心跳

  NSQD作為消息接收、轉發者,他也是nsqlookupd的客戶端,當nsqd啟動時,除過自己啟動tcp和http服務等待生產者和消費者連接外,還需要作為client連接到nsqlookupd。在nsqd.go文件的Main函數最后,通過waitGroup啟動了3個線程,如下代碼所示,第2行代碼啟動了一個lookupLoop循環,該循環是一個死循環,其中有一項功能就是發送心跳值,告訴所有的nsqlookupd,自己還活着。

1 n.waitGroup.Wrap(func() { n.queueScanLoop() }) //循環處理消息的分發                                             
2 n.waitGroup.Wrap(func() { n.lookupLoop() })    //同步nsqd狀態到nsqlookup比如:在線、Topic變化、Channel變化等            
3 if n.getOpts().StatsdAddress != "" {                                                                   
4     n.waitGroup.Wrap(func() { n.statsdLoop() })          
5 }                                              

  心跳發送代碼在lookup.go文件中,處理方式如下

 1 case <-ticker: //發送心跳  告訴nsqlookup自己在線                                         
 2     // send a heartbeat and read a response (read detects closed conns)        
 3     for _, lookupPeer := range lookupPeers {                                   
 4         n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer)                   
 5         cmd := nsq.Ping()                                                      
 6         _, err := lookupPeer.Command(cmd)                                      
 7         if err != nil {                                                        
 8             n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)         
 9         }                                                                      
10     }                                                                          

  nsqlookupd作為服務器端,啟動時就開啟了tcp監聽,每接受一個nsqd連接,就使用go開啟一個handle處理函數,最終和客戶端(nsqd)交互功能代碼在LookupProtocolV1文件中完成。

  此時我們在看圖2的NSQ拓撲圖,說過了nsqd作為client鏈接nsqlookupd服務器端后,我們在順道說下nsqd開啟tcp監控作為服務器時有哪些是客戶端,此時的client包括:生產者和消費者,即圖中的writer和reader。

八、Decorator裝飾器

  先看下NSQ源碼中對Decorator定義,其實就是一個函數的嵌套定義,源碼位置在internal/http_api/api_response.go文件中,代碼如下:

1 type Decorator func(APIHandler) APIHandler
2 
3 type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)

  下邊我們來看兩個關於Decorate的使用

 1 func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
 2     decorated := f
 3     for _, decorate := range ds {
 4         decorated = decorate(decorated)
 5     }
 6     return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 7         decorated(w, req, ps)
 8     }
 9 }
10 
11 func Log(l app.Logger) Decorator { //Logger是go對應的log4版本
12     return func(f APIHandler) APIHandler {
13         return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
14             start := time.Now()            //當前時間
15             response, err := f(w, req, ps) //執行http請求 err錯誤狀態  response處理結果
16             elapsed := time.Since(start)   //f(APIHandler)調用時長
17             status := 200
18             if e, ok := err.(Err); ok {
19                 status = e.Code //重置錯誤碼
20             }
21             l.Output(2, fmt.Sprintf("%d %s %s (%s) %s",
22                 status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)) //輸出http處理日志信息
23             return response, err //返回一個接口  和錯誤狀態
24         }
25     }
26 }

  首先先來看下代碼第一行定義的Decorate函數,這個函數其實就是一個裝飾函數,第一個參數為需要被裝飾的視圖函數,從第二參數開始,都是裝飾函數,最后返回裝飾好的視圖函數。

  第11行代碼定義了一個Log函數,返回值為Decorator類型,也就是代碼第12行return后邊的表達式,該表達式也是一個函數定義,其返回值為APIHandler,第13行代碼return后邊的表達式為其返回值。

 

相關資料

  NSQ如何在windows上安裝

  nsq源碼閱讀筆記之nsqd:hurray123 CSDN

  go語言nsq源碼解讀-基本介紹:熊窩窩

  Nsq源碼閱讀:胖梁的技術筆記

  nsq源碼分析之概述:羅道文的私房菜

 

如果您覺得文章不錯,不妨給個打賞,寫作不易,感謝各位的支持。您的支持是我最大的動力,謝謝!!! 

 

  


很重要--轉載聲明

  1. 本站文章無特別說明,皆為原創,版權所有,轉載時請用鏈接的方式,給出原文出處。同時寫上原作者:朝十晚八 or Twowords
  2. 如要轉載,請原文轉載,如在轉載時修改本文,請事先告知,謝絕在轉載時通過修改本文達到有利於轉載者的目的。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM