回顧:
以前一直是C++開發(客戶端),最近聽同事講go語言不錯,隨后便決定先從go語法開始投向go的懷抱。由於歷史原因學習go語法時,用了半天的時間看完了菜鳥教程上相關資料,后來又看了易百教程上的一些實例代碼,感覺都比較簡單,畢竟還是有C++基礎存在的。。。但是找工作大多都是需要工作經驗的,那么怎么辦才好呢!后來在知乎上看到有一位大神推薦看NSQ和skynet開源框架,權衡之下我決定從NSQ開始學習,進入我的go學習之路。
要學習NSQ,首先就是上www查找相關NSQ的資料,沒想到百度一下相關資料還是挺多,有好幾個網友的博客都寫的不錯,思路比較清晰,但是基本都沒有完全的把NSQ分析下來,只能后邊繼續等待了。文章末尾我會把自己覺着可以幫助理解的文章鏈接貼上。
本篇文章我不打算直接開始分析NSQ框架代碼,而是想從一些零散的地方着手,針對性的講NSQ,這樣有利於后期我們逐模塊分析NSQ源碼。
下邊就開始我們文本的中心內容,文章結構是按點划分,會比較零散但都是個人在看NSQ源碼時的心得,也可以說是個人覺得比較難理解的地方吧,看NSQ之前,只學習了go語法2天時間,因此文中可能會涉及到一些基礎性錯誤,歡迎大家指正。
一、NSQ部署
我個人電腦是win10 64位,因此在這兒我就給出一個網友寫好的Windows下NSQ部署文章NSQ如何在windows上安裝 ,安裝網友的文章說明我的測試結果圖1所示,當啟動nsqd連接nsqlookupd時,會有相應的提示,啟動nsq_to_file進程時,會往nsqd寫入消息都有相應提示,如圖1中用紅色矩形框選中的是對於的關鍵提示信息。
圖1 NSQ部署測試
打開瀏覽器直接輸入http://127.0.0.1:4171/
就可以查看NSQ運行情況
二、NSQ拓撲圖
通過第一小節,我們簡單的把NSQ部署起來,並看到了NSQ的運行情況,還記得我們啟動各個進程的步驟嗎,不記得沒關系,看圖2所示,該圖是出自nsq源碼分析之概述,個人覺着這幅圖對NSQ總結的非常好,從圖中我們可以了解到下面幾個點
- nsqlookupd進程同時開啟tcp和http兩個監聽服務,TCP監聽是用於NSQD進程連接,http監聽是用於提供給nsqadmin獲取集群信息
- nsqadmin進程只開啟http服務,其實就是一個web服務,提供給客戶端查詢集群信息
- nsqd進程同時開啟tcp和http服務,TPC監聽和http監聽都提供給生產者和消費者連接,http服務還提供給nsqadmin獲取該nsqd本地信息
- nsqd連接到nsqlookupd的tcp監聽上,通過心跳告訴nsqlookupd自己在線
- writer是生產者,直接連接nsqd
- reader是消費者,直接連接nsqd
圖2 NSQ拓撲圖
了解了nsq的整體結構后,我們就可以開始按模塊分析nsq的源碼
三、NSQ啟動與退出
nsq優雅的啟動與退出使用了SVC包,推薦閱讀Nsq源碼閱讀(1) 啟動和優雅退出,這篇文章講解的非常詳細。
四、全局唯一messageid
五、NSQ同步
NSQ中簡單包裝了sync.WaitGroup,包裝后的待執行函數都會在輕量級的線程中執行,代碼如下所示。主線程中啟動了多個子線程后,只有等啟動的多個子線程結束后主線程才能結束,方法Wrap中的Add和Done調用分別會維護一個引用計數,只有當該引用計數為0時,主線程才會結束等待
1 //如果結構體S,包含一個匿名字段T,那么這個結構體S 就有了T的方法。 2 type WaitGroupWrapper struct { 3 sync.WaitGroup 4 } 5 6 func (w *WaitGroupWrapper) Wrap(cb func()) { 7 w.Add(1) //sync.WaitGroup結構中方法 8 go func() { 9 cb() 10 w.Done() //sync.WaitGroup結構中方法 11 }() 12 }
六、Go接口
Go語言式的接口,就是不用顯示聲明類型T實現了接口I,只要類型T的公開方法完全滿足接口I的要求,就可以把類型T的對象用在需要接口I的地方。比如nsqlookupd.go文件中Main函數最后啟動http監聽服務時代碼
1 l.Lock() 2 l.httpListener = httpListener //把Listener存在NSQLookupd的struct里 3 l.Unlock() 4 //創建httpServer的實例,httpServer在nsqlookupd\http.go文件中定義 5 httpServer := newHTTPServer(ctx) 6 //調用http_api.Serve方法(在http_api\http_server.go中定義)開始在指定的httpListener上接收http連接。 7 l.waitGroup.Wrap(func() { 8 //因為httpServer結構重寫了http.Handler接口類的ServeHTTP方法,因此可以當http.Handler使用 9 http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger) 10 })
代碼第9行調用中的第二個參數httpServer,是一個自定義的struct,而http_apit.Serve需要的參數類型為http.Handler,因為httpServer實現了http.Handler接口類中的接口,因此可以在這個地方使用,對每一個類型實現接口方法如下
1 func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { 2 s.router.ServeHTTP(w, req) 3 }
七、NSQD心跳
NSQD作為消息接收、轉發者,他也是nsqlookupd的客戶端,當nsqd啟動時,除過自己啟動tcp和http服務等待生產者和消費者連接外,還需要作為client連接到nsqlookupd。在nsqd.go文件的Main函數最后,通過waitGroup啟動了3個線程,如下代碼所示,第2行代碼啟動了一個lookupLoop循環,該循環是一個死循環,其中有一項功能就是發送心跳值,告訴所有的nsqlookupd,自己還活着。
1 n.waitGroup.Wrap(func() { n.queueScanLoop() }) //循環處理消息的分發 2 n.waitGroup.Wrap(func() { n.lookupLoop() }) //同步nsqd狀態到nsqlookup比如:在線、Topic變化、Channel變化等 3 if n.getOpts().StatsdAddress != "" { 4 n.waitGroup.Wrap(func() { n.statsdLoop() }) 5 }
心跳發送代碼在lookup.go文件中,處理方式如下
1 case <-ticker: //發送心跳 告訴nsqlookup自己在線 2 // send a heartbeat and read a response (read detects closed conns) 3 for _, lookupPeer := range lookupPeers { 4 n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer) 5 cmd := nsq.Ping() 6 _, err := lookupPeer.Command(cmd) 7 if err != nil { 8 n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err) 9 } 10 }
nsqlookupd作為服務器端,啟動時就開啟了tcp監聽,每接受一個nsqd連接,就使用go開啟一個handle處理函數,最終和客戶端(nsqd)交互功能代碼在LookupProtocolV1文件中完成。
此時我們在看圖2的NSQ拓撲圖,說過了nsqd作為client鏈接nsqlookupd服務器端后,我們在順道說下nsqd開啟tcp監控作為服務器時有哪些是客戶端,此時的client包括:生產者和消費者,即圖中的writer和reader。
八、Decorator裝飾器
先看下NSQ源碼中對Decorator定義,其實就是一個函數的嵌套定義,源碼位置在internal/http_api/api_response.go文件中,代碼如下:
1 type Decorator func(APIHandler) APIHandler 2 3 type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)
下邊我們來看兩個關於Decorate的使用
1 func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle { 2 decorated := f 3 for _, decorate := range ds { 4 decorated = decorate(decorated) 5 } 6 return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { 7 decorated(w, req, ps) 8 } 9 } 10 11 func Log(l app.Logger) Decorator { //Logger是go對應的log4版本 12 return func(f APIHandler) APIHandler { 13 return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { 14 start := time.Now() //當前時間 15 response, err := f(w, req, ps) //執行http請求 err錯誤狀態 response處理結果 16 elapsed := time.Since(start) //f(APIHandler)調用時長 17 status := 200 18 if e, ok := err.(Err); ok { 19 status = e.Code //重置錯誤碼 20 } 21 l.Output(2, fmt.Sprintf("%d %s %s (%s) %s", 22 status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)) //輸出http處理日志信息 23 return response, err //返回一個接口 和錯誤狀態 24 } 25 } 26 }
首先先來看下代碼第一行定義的Decorate函數,這個函數其實就是一個裝飾函數,第一個參數為需要被裝飾的視圖函數,從第二參數開始,都是裝飾函數,最后返回裝飾好的視圖函數。
第11行代碼定義了一個Log函數,返回值為Decorator類型,也就是代碼第12行return后邊的表達式,該表達式也是一個函數定義,其返回值為APIHandler,第13行代碼return后邊的表達式為其返回值。
相關資料