啟動nsq
相信看了nsq這個系列的童鞋應該都知道nsq的啟動腳本在哪里了吧,沒錯。就是在apps/nsqd/main.go 文件。我們可以切到當前目錄,不過在這之前我們要先啟動位於 apps/nsqlookupd/目錄下的 nsqlookupd
#啟動 nsqlookupd
輸出--------------------------------------------------------
➜ nsqlookupd git:(master) ✗ go run main.go
[nsqlookupd] 2021/10/13 15:27:57.828505 INFO: nsqlookupd v1.2.1-alpha (built w/go1.15.15)
[nsqlookupd] 2021/10/13 15:27:57.828996 INFO: TCP: listening on [::]:4160
[nsqlookupd] 2021/10/13 15:27:57.828996 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2021/10/13 15:31:20.121567 INFO: TCP: new client(127.0.0.1:54011)
[nsqlookupd] 2021/10/13 15:31:20.121852 INFO: CLIENT(127.0.0.1:54011): desired protocol magic ' V1'
[nsqlookupd] 2021/10/13 15:31:20.122590 INFO: CLIENT(127.0.0.1:54011): IDENTIFY Address:liangtiandeMacBook-Pro.local TCP:4150 HTTP:4151 Version:1.2.1-alpha
[nsqlookupd] 2021/10/13 15:31:20.122661 INFO: DB: client(127.0.0.1:54011) REGISTER category:client key: subkey:
[nsqlookupd] 2021/10/13 15:31:35.121527 INFO: CLIENT(127.0.0.1:54011): pinged (last ping 14.998981s)
[nsqlookupd] 2021/10/13 15:31:50.120787 INFO: CLIENT(127.0.0.1:54011): pinged (last ping 14.99928s)
#接着我們啟動nsq
go run main.go options.go --lookupd-tcp-address=127.0.0.1:4160
輸出------------------------------------------------------------------------
[nsqd] 2021/10/13 15:31:20.095882 INFO: nsqd v1.2.1-alpha (built w/go1.15.15)
[nsqd] 2021/10/13 15:31:20.096040 INFO: ID: 933
[nsqd] 2021/10/13 15:31:20.096421 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2021/10/13 15:31:20.120544 INFO: TCP: listening on [::]:4150
[nsqd] 2021/10/13 15:31:20.120655 INFO: LOOKUP(127.0.0.1:4160): adding peer
[nsqd] 2021/10/13 15:31:20.120685 INFO: LOOKUP connecting to 127.0.0.1:4160
[nsqd] 2021/10/13 15:31:20.120686 INFO: HTTP: listening on [::]:4151
[nsqd] 2021/10/13 15:31:20.123026 INFO: LOOKUPD(127.0.0.1:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.1-alpha BroadcastAddress:liangtiandeMacBook-Pro.local}
可以看到,會輸出默認的TCP和HTTP監聽的端口,並且會把數據文件寫入到當前目錄的 nsqd.dat 內。並且連上了nsqlookupd 這個時候我們就算啟動了nsqd。
nsq和nsqlookupd 的鏈接
雖然啟動成功了,但是我們還不知道nsq是怎么和nsqlookup鏈接上的,並且定期心跳的。其實nsqd主函數Main中啟動與nsqlookupd服務通訊的工作線程lookupLoop
ticker := time.Tick(15 * time.Second)
for {
if connect {
//循環所有的 nsqlookupd 地址 我們這邊就一個 127.0.0.1:4160
for _, host := range n.getOpts().NSQLookupdTCPAddresses {
if in(host, lookupAddrs) {
continue
}
//LOOKUP(127.0.0.1:4160): adding peer
n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
//實例化newLookupPeer數據結構,並且拿到鏈接callback方法
lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
connectCallback(n, hostname))
//嘗試鏈接
lookupPeer.Command(nil) // start the connection
//把nsqlookupd 寫入lookupPeers
lookupPeers = append(lookupPeers, lookupPeer)
lookupAddrs = append(lookupAddrs, host)
}
n.lookupPeers.Store(lookupPeers)
connect = false
}
第一次循環的時候,變量connect = true,nsq會嘗試鏈接nsqlookupd. 其實就是連接上nsqlookupd的tcp端口,並且發送 V1信息。
//標記鏈接成功
lp.state = stateConnected
//發送 V1
_, err = lp.Write(nsq.MagicV1)
發送后 V1后,如果沒有失敗,緊接着nsq會執行connectCallback,在connectCallback里面我們可以看下代碼:
func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
return func(lp *lookupPeer) {
//鑒權
ci := make(map[string]interface{})
ci["version"] = version.Binary
ci["tcp_port"] = n.RealTCPAddr().Port
ci["http_port"] = n.RealHTTPAddr().Port
ci["hostname"] = hostname
ci["broadcast_address"] = n.getOpts().BroadcastAddress
cmd, err := nsq.Identify(ci)
if err != nil {
lp.Close()
return
}
//發送鑒權
resp, err := lp.Command(cmd)
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
return
} else if bytes.Equal(resp, []byte("E_INVALID")) {
n.logf(LOG_INFO, "LOOKUPD(%s): lookupd returned %s", lp, resp)
lp.Close()
return
} else {
err = json.Unmarshal(resp, &lp.Info)
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): parsing response - %s", lp, resp)
lp.Close()
return
} else {
//鑒權成功
// LOOKUPD(127.0.0.1:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.1-alpha BroadcastAddress:liangtiandeMacBook-Pro.local}
n.logf(LOG_INFO, "LOOKUPD(%s): peer info %+v", lp, lp.Info)
if lp.Info.BroadcastAddress == "" {
n.logf(LOG_ERROR, "LOOKUPD(%s): no broadcast address", lp)
}
}
}
// build all the commands first so we exit the lock(s) as fast as possible
var commands []*nsq.Command
n.RLock()
for _, topic := range n.topicMap {
topic.RLock()
if len(topic.channelMap) == 0 {
commands = append(commands, nsq.Register(topic.name, ""))
} else {
for _, channel := range topic.channelMap {
commands = append(commands, nsq.Register(channel.topicName, channel.name))
}
}
topic.RUnlock()
}
n.RUnlock()
for _, cmd := range commands {
n.logf(LOG_INFO, "LOOKUPD(%s): %s", lp, cmd)
_, err := lp.Command(cmd)
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
return
}
}
}
}
connectCallback函數沒有任何邏輯,直接return了一個匿名函數,該匿名函數里首先會組裝一個map,把自己的信息寫入到內,包括版本,tcp端口,http端口,hostname, 主機名等包裝到一個結構體內發送。當nsqlookupd接收到信息后,其實不會做任何校驗,只是單純的拿到數據后放入到nsqlookupd的全局DB-map中的client內。
// body is a json structure with producer information
peerInfo := PeerInfo{id: client.RemoteAddr().String()}
//unmarshal 成功就表示注冊成功
err = json.Unmarshal(body, &peerInfo)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
}
peerInfo.RemoteAddress = client.RemoteAddr().String()
// require all fields
if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields")
}
atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())
p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s",
client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version)
//寫入到 client DB里
client.peerInfo = &peerInfo
if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
}
// build a response 返回response
data := make(map[string]interface{})
data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port
data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port
data["version"] = version.Binary
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err)
}
data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress
data["hostname"] = hostname
response, err := json.Marshal(data)
if err != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data)
return []byte("OK"), nil
}
return response, nil
鑒權成功后,會觸發當前nsq所有的topic和channel注冊到nsqlookupd內,由於我們是新啟動的服務所以這一步直接跳過。緊接着connect變量就會設置成false。表示鏈接成功了。后面的每次for循環基本上都是觸發了15秒的ticker做了一次ping操作。nsq 發送ping 到 nsqlookup后,nsqlookupd做的唯一一步就是把lastUpdate更新掉。其他沒做任何操作。
//每次ping后,修改lastUpdate 時間
atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano())
客戶端創建topic
nsq客戶端可以使用官方封裝的go-nsq包。
go get -u github.com/nsqio/go-nsq
我們簡單實現一個發送者程序:
addr := "127.0.0.1:4150"
topic := "first_topic"
channel := "first_channel"
defaultConfig := nsq.NewConfig()
//新建生產者
p, err := nsq.NewProducer(addr, defaultConfig)
if err != nil {
panic(err)
}
//創建一個topic
p.Publish(topic, []byte("Hello Pibigstar"))
接下來調用NewProducer方法 實例化一個 Producer對象:
func NewProducer(addr string, config *Config) (*Producer, error) {
//檢查配置文件,是否初始化,驗證是否成功
config.assertInitialized()
err := config.Validate()
if err != nil {
return nil, err
}
//實例化Producer
p := &Producer{
//id 自增1
id: atomic.AddInt64(&instCount, 1),
addr: addr,//nsqlookupd address
config: *config, //配置文件
logger: log.New(os.Stderr, "", log.Flags()),
logLvl: LogLevelInfo,
transactionChan: make(chan *ProducerTransaction),
exitChan: make(chan int),
responseChan: make(chan []byte),
errorChan: make(chan []byte),
}
return p, nil
}
該函數比較簡單,首先檢查了下配置是否初始化(initialized)。接着對配置項進行 min/max 范圍校驗。成功后就直接實例化Producer對象,Producer對象會保存config的引用和 的地址信息。
最后我們看下最核心的函數 Publish 基本上所有的邏輯都是在Publish里面實現的。, Publish函數本身沒有內容,它直接調用了 w.sendCommand(Publish(topic, body))
我們轉到 sendCommand 函數看下:
func (w *Producer) sendCommand(cmd *Command) error {
//提前設置了一個接受返回參數的Chan, 這里有伏筆,埋伏它一手
doneChan := make(chan *ProducerTransaction)
//調用了sendCommandAsync 並且把doneChan 傳進去了
err := w.sendCommandAsync(cmd, doneChan, nil)
if err != nil {
close(doneChan)
return err
}
//上面函數結束后,在這里苦苦的等待 doneChan的返回值,所以我們可以大膽的推測 sendCommandAsync 方法並不返回真實的值
t := <-doneChan
return t.Error
}
這個方法里面大家不要漏了 doneChan , nsq通過這個channel實現了一個高效的ioLoop模型。雖然說sendCommandAsync函數名里有個async,但是它並不是同步返回的。而是等待 doneChan這個channel 的返回。並且最后返回內部的Error屬性。我們繼續看下去。
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
args []interface{}) error {
// keep track of how many outstanding producers we're dealing with
// in order to later ensure that we clean them all up...
atomic.AddInt32(&w.concurrentProducers, 1)
defer atomic.AddInt32(&w.concurrentProducers, -1)
if atomic.LoadInt32(&w.state) != StateConnected {
err := w.connect()
if err != nil {
return err
}
}
t := &ProducerTransaction{
cmd: cmd,
doneChan: doneChan,
Args: args,
}
select {
case w.transactionChan <- t:
case <-w.exitChan:
return ErrStopped
}
return nil
}
該函數比較簡單,判斷是否連接,如果沒有連接調用 connect() 方法,接着包一個 ProducerTransaction結構體。記錄要發送的信息和剛才傳過來的doneChan,發送到 w.transactionChan內。到這里發送者代碼就全部完了。但是這就全部看完了嗎。其實我們只是看了冰山一角。接下里我們要看下 connect() 方法。
func (w *Producer) connect() error {
...
w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
_, err := w.conn.Connect()
atomic.StoreInt32(&w.state, StateConnected)
w.closeChan = make(chan int)
w.wg.Add(1)
go w.router()
return nil
}
NewConn傳入配置文件,初始化Conn結構體。調用w.conn.Connect() 連接。我們先簡單看下 w.conn.Connect()方法:
func (c *Conn) Connect() (*IdentifyResponse, error) {
dialer := &net.Dialer{
LocalAddr: c.config.LocalAddr,
Timeout: c.config.DialTimeout,
}
//打開tcp端口
conn, err := dialer.Dial("tcp", c.addr)
c.conn = conn.(*net.TCPConn)
c.r = conn
c.w = conn
//發送[]byte(" V2")
_, err = c.Write(MagicV2)
//身份校驗
resp, err := c.identify()
if err != nil {
return nil, err
}
if resp != nil && resp.AuthRequired {
if c.config.AuthSecret == "" {
c.log(LogLevelError, "Auth Required")
return nil, errors.New("Auth Required")
}
err := c.auth(c.config.AuthSecret)
if err != nil {
c.log(LogLevelError, "Auth Failed %s", err)
return nil, err
}
}
c.wg.Add(2)
atomic.StoreInt32(&c.readLoopRunning, 1)
go c.readLoop()
go c.writeLoop()
return resp, nil
}
該方法連接到 ,並進行身份校驗。核心是最后幾行代碼。開了2個協程。readLoop() 和 writeLoop() 用來接收消息和寫入消息。
這里先停頓下,我們先跳回去繼續看producer,當它連接上 之后,會開個 go w.router() 協程,我們看下內部實現:
func (w *Producer) router() {
for {
select {
case t := <-w.transactionChan:
w.transactions = append(w.transactions, t)
err := w.conn.WriteCommand(t.cmd)
if err != nil {
w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
w.close()
}
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)
case <-w.closeChan:
goto exit
case <-w.exitChan:
goto exit
}
}
exit:
w.transactionCleanup()
w.wg.Done()
w.log(LogLevelInfo, "exiting router")
}
在這個方法里就是監聽多個chan,分別是:是否有需要發送的消息,是否有收到的響應,是否有錯誤,是否有退出消息。剛才我們包裝的 transactionChan 就是通過這里發出去了。到這里整個發送流程就已經全部完成了。那么我們怎么知道到底是發送成功還是失敗了呢。這個時候就回到了剛才我們看到的在connect的時候開的2個協程readLoop() 和 writeLoop() 了。在這之前我們先了解下nsq的三種消息類型:
frame typesconst (
FrameTypeResponse int32 = 0 //響應
FrameTypeError int32 = 1 //錯誤
FrameTypeMessage int32 = 2 //消息
)
然后我們看下readLoop
func (c *Conn) readLoop() {
delegate := &connMessageDelegate{c}
for {
...
frameType, data, err := ReadUnpackedResponse(c)
...
switch frameType {
case FrameTypeResponse:
c.delegate.OnResponse(c, data)
case FrameTypeMessage:
msg, err := DecodeMessage(data)
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
goto exit
}
msg.Delegate = delegate
msg.NSQDAddress = c.String()
atomic.AddInt64(&c.messagesInFlight, 1)
atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
c.delegate.OnMessage(c, msg)
case FrameTypeError:
c.log(LogLevelError, "protocol error - %s", data)
c.delegate.OnError(c, data)
default:
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
}
}
exit:
...
}
readLoop 核心的代碼接收到消息后判斷消息類型,如果是響應(FrameTypeResponse)。調用 c.delegate.OnResponse(c, data) ,如果是消息(FrameTypeMessage),那么就增加messageInFlight 數,並且更新lastMsgTimestamp 時間,最后調用 c.delegate.OnMessage(c, msg)。 但是 c.delegate 這個接口是哪里實現的呢。我們可以看到 producer 在connect的時候傳入過:
w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
所以最終就是調用producerConnDelegate結構體的 OnResponse 和OnMessage。 轉到producerConnDelegate結構體的方法非常簡單:
func (d *producerConnDelegate) OnResponse(c *Conn, data []byte) { d.w.onConnResponse(c, data) }
func (d *producerConnDelegate) OnError(c *Conn, data []byte) { d.w.onConnError(c, data) }
func (d *producerConnDelegate) OnMessage(c *Conn, m *Message) {}
它只處理了OnResponse 和 onError, 針對OnMessage不做任何處理。OnResponse也很簡單。d.w.onConnResponse(c, data) 我們轉到 onConnResponse 發現也是只有一句代碼:
func (w *Producer) onConnResponse(c *Conn, data []byte) { w.responseChan <- data }
這里就和之前的 producer的router方法對應上了。router接受 responseChan或者 errorChan 執行 popTransaction 方法。
func (w *Producer) popTransaction(frameType int32, data []byte) {
t := w.transactions[0]
w.transactions = w.transactions[1:]
if frameType == FrameTypeError {
t.Error = ErrProtocol{string(data)}
}
t.finish()
}
首先獲取第一個transactions中的元素,如果是錯誤的響應,那么給他的Error上設置錯誤信息,最后調用finish方法
func (t *ProducerTransaction) finish() {
if t.doneChan != nil {
t.doneChan <- t
}
}
這時候發送的就我們剛才創建的doneChan中傳入發送結果,那么用戶就可以通過doneChan知道消息是否發送成功了。最后我畫了一幅簡單的圖大家可以參考下: