nsq - 一條消息的生命周期(一)


經過前面幾篇的學習,相信大家對nsq已經有了一個大概的了解,我在寫這篇文章的時候也看了很多其他人寫的教程,發現大家對於分析系統每個點寫的很不錯,但是都很少有整體串起來一起走一遍,所以,我打算分成2-3章來帶着大家從nsq啟動到創建一個topic,然后發一條消息,最后再開個消費者接收消息,中間的所有流程都帶大家一起走一遍,從而讓大家能夠深入地理解nsq的整體運行機制。今天,這篇文章是整個 《一條消息的生命周期》第一章,我會從nsq的啟動,nsqlookupd連接等方面開始講起。

啟動nsq

相信看了nsq這個系列的童鞋應該都知道nsq的啟動腳本在哪里了吧,沒錯。就是在apps/nsqd/main.go 文件。我們可以切到當前目錄,不過在這之前我們要先啟動位於 apps/nsqlookupd/目錄下的 nsqlookupd

#啟動 nsqlookupd
輸出--------------------------------------------------------
➜  nsqlookupd git:(master) ✗ go run main.go 
[nsqlookupd] 2021/10/13 15:27:57.828505 INFO: nsqlookupd v1.2.1-alpha (built w/go1.15.15)
[nsqlookupd] 2021/10/13 15:27:57.828996 INFO: TCP: listening on [::]:4160
[nsqlookupd] 2021/10/13 15:27:57.828996 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2021/10/13 15:31:20.121567 INFO: TCP: new client(127.0.0.1:54011)
[nsqlookupd] 2021/10/13 15:31:20.121852 INFO: CLIENT(127.0.0.1:54011): desired protocol magic '  V1'
[nsqlookupd] 2021/10/13 15:31:20.122590 INFO: CLIENT(127.0.0.1:54011): IDENTIFY Address:liangtiandeMacBook-Pro.local TCP:4150 HTTP:4151 Version:1.2.1-alpha
[nsqlookupd] 2021/10/13 15:31:20.122661 INFO: DB: client(127.0.0.1:54011) REGISTER category:client key: subkey:
[nsqlookupd] 2021/10/13 15:31:35.121527 INFO: CLIENT(127.0.0.1:54011): pinged (last ping 14.998981s)
[nsqlookupd] 2021/10/13 15:31:50.120787 INFO: CLIENT(127.0.0.1:54011): pinged (last ping 14.99928s)
​
​
#接着我們啟動nsq 
go run main.go options.go  --lookupd-tcp-address=127.0.0.1:4160
輸出------------------------------------------------------------------------
[nsqd] 2021/10/13 15:31:20.095882 INFO: nsqd v1.2.1-alpha (built w/go1.15.15)
[nsqd] 2021/10/13 15:31:20.096040 INFO: ID: 933
[nsqd] 2021/10/13 15:31:20.096421 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2021/10/13 15:31:20.120544 INFO: TCP: listening on [::]:4150
[nsqd] 2021/10/13 15:31:20.120655 INFO: LOOKUP(127.0.0.1:4160): adding peer
[nsqd] 2021/10/13 15:31:20.120685 INFO: LOOKUP connecting to 127.0.0.1:4160
[nsqd] 2021/10/13 15:31:20.120686 INFO: HTTP: listening on [::]:4151
[nsqd] 2021/10/13 15:31:20.123026 INFO: LOOKUPD(127.0.0.1:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.1-alpha BroadcastAddress:liangtiandeMacBook-Pro.local}

可以看到,會輸出默認的TCP和HTTP監聽的端口,並且會把數據文件寫入到當前目錄的 nsqd.dat 內。並且連上了nsqlookupd 這個時候我們就算啟動了nsqd。

nsq和nsqlookupd 的鏈接

雖然啟動成功了,但是我們還不知道nsq是怎么和nsqlookup鏈接上的,並且定期心跳的。其實nsqd主函數Main中啟動與nsqlookupd服務通訊的工作線程lookupLoop

ticker := time.Tick(15 * time.Second)
  for {
    if connect {
      //循環所有的 nsqlookupd 地址 我們這邊就一個 127.0.0.1:4160
      for _, host := range n.getOpts().NSQLookupdTCPAddresses {
        if in(host, lookupAddrs) {
          continue
        }
        //LOOKUP(127.0.0.1:4160): adding peer
        n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
        //實例化newLookupPeer數據結構,並且拿到鏈接callback方法
        lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
          connectCallback(n, hostname))
        //嘗試鏈接
        lookupPeer.Command(nil) // start the connection
        //把nsqlookupd 寫入lookupPeers
        lookupPeers = append(lookupPeers, lookupPeer)
        lookupAddrs = append(lookupAddrs, host)
      }
      n.lookupPeers.Store(lookupPeers)
      connect = false
    }
​

第一次循環的時候,變量connect = true,nsq會嘗試鏈接nsqlookupd. 其實就是連接上nsqlookupd的tcp端口,並且發送 V1信息。

//標記鏈接成功
lp.state = stateConnected
//發送  V1
_, err = lp.Write(nsq.MagicV1)

發送后 V1后,如果沒有失敗,緊接着nsq會執行connectCallback,在connectCallback里面我們可以看下代碼:

func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
  return func(lp *lookupPeer) {
    //鑒權
    ci := make(map[string]interface{})
    ci["version"] = version.Binary
    ci["tcp_port"] = n.RealTCPAddr().Port
    ci["http_port"] = n.RealHTTPAddr().Port
    ci["hostname"] = hostname
    ci["broadcast_address"] = n.getOpts().BroadcastAddress
​
    cmd, err := nsq.Identify(ci)
    if err != nil {
      lp.Close()
      return
    }
    //發送鑒權
    resp, err := lp.Command(cmd)
    if err != nil {
      n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
      return
    } else if bytes.Equal(resp, []byte("E_INVALID")) {
      n.logf(LOG_INFO, "LOOKUPD(%s): lookupd returned %s", lp, resp)
      lp.Close()
      return
    } else {
      err = json.Unmarshal(resp, &lp.Info)
      if err != nil {
        n.logf(LOG_ERROR, "LOOKUPD(%s): parsing response - %s", lp, resp)
        lp.Close()
        return
      } else {
        //鑒權成功
        // LOOKUPD(127.0.0.1:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.1-alpha BroadcastAddress:liangtiandeMacBook-Pro.local}
        n.logf(LOG_INFO, "LOOKUPD(%s): peer info %+v", lp, lp.Info)
        if lp.Info.BroadcastAddress == "" {
          n.logf(LOG_ERROR, "LOOKUPD(%s): no broadcast address", lp)
        }
      }
    }
​
    // build all the commands first so we exit the lock(s) as fast as possible
    var commands []*nsq.Command
    n.RLock()
    for _, topic := range n.topicMap {
      topic.RLock()
      if len(topic.channelMap) == 0 {
        commands = append(commands, nsq.Register(topic.name, ""))
      } else {
        for _, channel := range topic.channelMap {
          commands = append(commands, nsq.Register(channel.topicName, channel.name))
        }
      }
      topic.RUnlock()
    }
    n.RUnlock()
    for _, cmd := range commands {
      n.logf(LOG_INFO, "LOOKUPD(%s): %s", lp, cmd)
      _, err := lp.Command(cmd)
      if err != nil {
        n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
        return
      }
    }
  }
}

connectCallback函數沒有任何邏輯,直接return了一個匿名函數,該匿名函數里首先會組裝一個map,把自己的信息寫入到內,包括版本,tcp端口,http端口,hostname, 主機名等包裝到一個結構體內發送。當nsqlookupd接收到信息后,其實不會做任何校驗,只是單純的拿到數據后放入到nsqlookupd的全局DB-map中的client內。

// body is a json structure with producer information
  peerInfo := PeerInfo{id: client.RemoteAddr().String()}
  //unmarshal 成功就表示注冊成功
  err = json.Unmarshal(body, &peerInfo)
  if err != nil {
    return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
  }
​
  peerInfo.RemoteAddress = client.RemoteAddr().String()
​
  // require all fields
  if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" {
    return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields")
  }
​
  atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())
​
  p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s",
    client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version)
​
  //寫入到 client DB里
  client.peerInfo = &peerInfo
  if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
    p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
  }
​
  // build a response  返回response
  data := make(map[string]interface{})
  data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port
  data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port
  data["version"] = version.Binary
  hostname, err := os.Hostname()
  if err != nil {
    log.Fatalf("ERROR: unable to get hostname %s", err)
  }
  data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress
  data["hostname"] = hostname
​
  response, err := json.Marshal(data)
  if err != nil {
    p.ctx.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data)
    return []byte("OK"), nil
  }
  return response, nil

鑒權成功后,會觸發當前nsq所有的topic和channel注冊到nsqlookupd內,由於我們是新啟動的服務所以這一步直接跳過。緊接着connect變量就會設置成false。表示鏈接成功了。后面的每次for循環基本上都是觸發了15秒的ticker做了一次ping操作。nsq 發送ping 到 nsqlookup后,nsqlookupd做的唯一一步就是把lastUpdate更新掉。其他沒做任何操作。

//每次ping后,修改lastUpdate 時間
atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano())

客戶端創建topic

nsq客戶端可以使用官方封裝的go-nsq包。

go get -u github.com/nsqio/go-nsq

我們簡單實現一個發送者程序:

addr := "127.0.0.1:4150"
topic := "first_topic"
channel := "first_channel"
defaultConfig := nsq.NewConfig()
//新建生產者
p, err := nsq.NewProducer(addr, defaultConfig)
if err != nil {
panic(err)
}
//創建一個topic
p.Publish(topic, []byte("Hello Pibigstar"))

我們可以看到,先實例化了 NewConfig. 它會獲得nsqclient的Config對象,並且通過Config結構體的默認配置注入配置。設置Config對象里的initialized屬性為true. 表示初始化成功。這里有個注意點,nsq的golang客戶端中,consumer實現了從nsqlookupd中動態拉取服務列表,並進行消費,但是producer中沒有實現這個。所以發送消息需要填寫nsq的地址。

接下來調用NewProducer方法 實例化一個 Producer對象:

func NewProducer(addr string, config *Config) (*Producer, error) {
  //檢查配置文件,是否初始化,驗證是否成功
  config.assertInitialized()
  err := config.Validate()
  if err != nil {
    return nil, err
  }
  //實例化Producer
  p := &Producer{
    //id 自增1
    id: atomic.AddInt64(&instCount, 1),
​
    addr:   addr,//nsqlookupd address
    config: *config,  //配置文件
​
    logger: log.New(os.Stderr, "", log.Flags()),
    logLvl: LogLevelInfo,
​
    transactionChan: make(chan *ProducerTransaction),
    exitChan:        make(chan int),
    responseChan:    make(chan []byte),
    errorChan:       make(chan []byte),
  }
  return p, nil
}

該函數比較簡單,首先檢查了下配置是否初始化(initialized)。接着對配置項進行 min/max 范圍校驗。成功后就直接實例化Producer對象,Producer對象會保存config的引用和nsq 的地址信息。

最后我們看下最核心的函數 Publish 基本上所有的邏輯都是在Publish里面實現的。, Publish函數本身沒有內容,它直接調用了 w.sendCommand(Publish(topic, body)) 我們轉到 sendCommand 函數看下:

func (w *Producer) sendCommand(cmd *Command) error {
  //提前設置了一個接受返回參數的Chan, 這里有伏筆,埋伏它一手
  doneChan := make(chan *ProducerTransaction)
  //調用了sendCommandAsync 並且把doneChan 傳進去了
  err := w.sendCommandAsync(cmd, doneChan, nil)
  if err != nil {
    close(doneChan)
    return err
  }
  //上面函數結束后,在這里苦苦的等待 doneChan的返回值,所以我們可以大膽的推測 sendCommandAsync 方法並不返回真實的值
  t := <-doneChan
  return t.Error
}

這個方法里面大家不要漏了 doneChan , nsq通過這個channel實現了一個高效的ioLoop模型。雖然說sendCommandAsync函數名里有個async,但是它並不是同步返回的。而是等待 doneChan這個channel 的返回。並且最后返回內部的Error屬性。我們繼續看下去。

func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
  args []interface{}) error {
  // keep track of how many outstanding producers we're dealing with
  // in order to later ensure that we clean them all up...
  atomic.AddInt32(&w.concurrentProducers, 1)
  defer atomic.AddInt32(&w.concurrentProducers, -1)
  if atomic.LoadInt32(&w.state) != StateConnected {
    err := w.connect()
    if err != nil {
      return err
    }
  }
  t := &ProducerTransaction{
    cmd:      cmd,
    doneChan: doneChan,
    Args:     args,
  }
  select {
  case w.transactionChan <- t:
  case <-w.exitChan:
    return ErrStopped
  }
  return nil
}

該函數比較簡單,判斷是否連接,如果沒有連接調用 connect() 方法,接着包一個 ProducerTransaction結構體。記錄要發送的信息和剛才傳過來的doneChan,發送到 w.transactionChan內。到這里發送者代碼就全部完了。但是這就全部看完了嗎。其實我們只是看了冰山一角。接下里我們要看下 connect() 方法。

func (w *Producer) connect() error {
  ...
  w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
  w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
​
  _, err := w.conn.Connect()
  atomic.StoreInt32(&w.state, StateConnected)
  w.closeChan = make(chan int)
  w.wg.Add(1)
  go w.router()
  return nil
}

NewConn傳入配置文件,初始化Conn結構體。調用w.conn.Connect() 連接。我們先簡單看下 w.conn.Connect()方法:

func (c *Conn) Connect() (*IdentifyResponse, error) {
  dialer := &net.Dialer{
    LocalAddr: c.config.LocalAddr,
    Timeout:   c.config.DialTimeout,
  }
  //打開tcp端口
  conn, err := dialer.Dial("tcp", c.addr)
  c.conn = conn.(*net.TCPConn)
  c.r = conn
  c.w = conn
  //發送[]byte("  V2")
  _, err = c.Write(MagicV2)
  //身份校驗
  resp, err := c.identify()
  if err != nil {
    return nil, err
  }
​
  if resp != nil && resp.AuthRequired {
    if c.config.AuthSecret == "" {
      c.log(LogLevelError, "Auth Required")
      return nil, errors.New("Auth Required")
    }
    err := c.auth(c.config.AuthSecret)
    if err != nil {
      c.log(LogLevelError, "Auth Failed %s", err)
      return nil, err
    }
  }
​
  c.wg.Add(2)
  atomic.StoreInt32(&c.readLoopRunning, 1)
  go c.readLoop()
  go c.writeLoop()
  return resp, nil
}

該方法連接到nsq,並進行身份校驗。核心是最后幾行代碼。開了2個協程。readLoop() 和 writeLoop() 用來接收消息和寫入消息。

這里先停頓下,我們先跳回去繼續看producer,當它連接上nsq之后,會開個 go w.router() 協程,我們看下內部實現:

func (w *Producer) router() {
  for {
    select {
    case t := <-w.transactionChan:
      w.transactions = append(w.transactions, t)
      err := w.conn.WriteCommand(t.cmd)
      if err != nil {
        w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
        w.close()
      }
    case data := <-w.responseChan:
      w.popTransaction(FrameTypeResponse, data)
    case data := <-w.errorChan:
      w.popTransaction(FrameTypeError, data)
    case <-w.closeChan:
      goto exit
    case <-w.exitChan:
      goto exit
    }
  }
​
exit:
  w.transactionCleanup()
  w.wg.Done()
  w.log(LogLevelInfo, "exiting router")
}

在這個方法里就是監聽多個chan,分別是:是否有需要發送的消息,是否有收到的響應,是否有錯誤,是否有退出消息。剛才我們包裝的 transactionChan 就是通過這里發出去了。到這里整個發送流程就已經全部完成了。那么我們怎么知道到底是發送成功還是失敗了呢。這個時候就回到了剛才我們看到的在connect的時候開的2個協程readLoop() 和 writeLoop() 了。在這之前我們先了解下nsq的三種消息類型:

frame typesconst (  
  FrameTypeResponse int32 = 0   //響應  
  FrameTypeError    int32 = 1   //錯誤   
  FrameTypeMessage  int32 = 2   //消息
)

然后我們看下readLoop

func (c *Conn) readLoop() {
  delegate := &connMessageDelegate{c}
  for { 
  ...
    frameType, data, err := ReadUnpackedResponse(c)
  ...
    switch frameType {
    case FrameTypeResponse:
      c.delegate.OnResponse(c, data)
    case FrameTypeMessage:
      msg, err := DecodeMessage(data)
      if err != nil {
        c.log(LogLevelError, "IO error - %s", err)
        c.delegate.OnIOError(c, err)
        goto exit
      }
      msg.Delegate = delegate
      msg.NSQDAddress = c.String()
​
      atomic.AddInt64(&c.messagesInFlight, 1)
      atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
​
      c.delegate.OnMessage(c, msg)
    case FrameTypeError:
      c.log(LogLevelError, "protocol error - %s", data)
      c.delegate.OnError(c, data)
    default:
      c.log(LogLevelError, "IO error - %s", err)
      c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
    }
  }
​
exit:
...
}

readLoop 核心的代碼接收到消息后判斷消息類型,如果是響應(FrameTypeResponse)。調用 c.delegate.OnResponse(c, data) ,如果是消息(FrameTypeMessage),那么就增加messageInFlight 數,並且更新lastMsgTimestamp 時間,最后調用 c.delegate.OnMessage(c, msg)。 但是 c.delegate 這個接口是哪里實現的呢。我們可以看到 producer 在connect的時候傳入過:

w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})

所以最終就是調用producerConnDelegate結構體的 OnResponse 和OnMessage。 轉到producerConnDelegate結構體的方法非常簡單:

func (d *producerConnDelegate) OnResponse(c *Conn, data []byte)       { d.w.onConnResponse(c, data) }
func (d *producerConnDelegate) OnError(c *Conn, data []byte)          { d.w.onConnError(c, data) }
func (d *producerConnDelegate) OnMessage(c *Conn, m *Message)         {}

它只處理了OnResponse 和 onError, 針對OnMessage不做任何處理。OnResponse也很簡單。d.w.onConnResponse(c, data) 我們轉到 onConnResponse 發現也是只有一句代碼:

func (w *Producer) onConnResponse(c *Conn, data []byte) { w.responseChan <- data }

這里就和之前的 producer的router方法對應上了。router接受 responseChan或者 errorChan 執行 popTransaction 方法。

func (w *Producer) popTransaction(frameType int32, data []byte) {
  t := w.transactions[0]
  w.transactions = w.transactions[1:]
  if frameType == FrameTypeError {
    t.Error = ErrProtocol{string(data)}
  }
  t.finish()
}

首先獲取第一個transactions中的元素,如果是錯誤的響應,那么給他的Error上設置錯誤信息,最后調用finish方法

func (t *ProducerTransaction) finish() {
  if t.doneChan != nil {
    t.doneChan <- t
  }
}

這時候發送的就我們剛才創建的doneChan中傳入發送結果,那么用戶就可以通過doneChan知道消息是否發送成功了。最后我畫了一幅簡單的圖大家可以參考下:

好了今天到這里我們整體過了一遍消息創建邏輯的第一篇,下一篇是接着講消息到nsq之后的后續消息分發邏輯。我們下次見!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM