消費者消費消息源碼剖析
func (c *ConsumerHandler) HandlerMsg() {
conf := nsq.NewConfig()
consumer, err := nsq.NewConsumer(topic, "ch", conf)
if err != nil {
logs.Error("create consumer failed, err: %+v\n", err)
return
}
//添加消息處理函數
handler := &MsgHandler{}
consumer.AddHandler(handler)
err = consumer.ConnectToNSQLookupd(lookupdAddr)
if err != nil {
logs.Error("consumer connect nsq failed, err: %+v\n", err)
return
}
}
- 在聲明一個消費者的時候,直接調用 nsq的NewConsumer方法,第一個參數是 topic,第二個參數是channel,第三個參數是consumer的默認配置。創建好之后向consumer中添加我們自定義的一個handler,它是實現了Handler接口的HandleMessage。最后連接nsqlookupd。
func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) {
config.assertInitialized()
if err := config.Validate(); err != nil {
return nil, err
}
if !IsValidTopicName(topic) {
return nil, errors.New("invalid topic name")
}
if !IsValidChannelName(channel) {
return nil, errors.New("invalid channel name")
}
r := &Consumer{
id: atomic.AddInt64(&instCount, 1),
topic: topic,
channel: channel,
config: *config,
logger: log.New(os.Stderr, "", log.Flags()),
logLvl: LogLevelInfo,
maxInFlight: int32(config.MaxInFlight),
incomingMessages: make(chan *Message),
rdyRetryTimers: make(map[string]*time.Timer),
pendingConnections: make(map[string]*Conn),
connections: make(map[string]*Conn),
lookupdRecheckChan: make(chan int, 1),
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
StopChan: make(chan int),
exitChan: make(chan int),
}
r.wg.Add(1)
go r.rdyLoop()
return r, nil
}
- 在創建consumer的過程中,首先對傳入的參數進行驗證,然后初始化consumer結構體里面字段的值,最后啟動了一個 goroutine定時更新RDY的值,它是用來控制服務端向客戶端推送的消息的數量的。
//關於 nsqd 流控相關的內容,后面會專題進行剖析,此處不再進行分析
go r.rdyLoop()
- nsq采用push的方式進行消息推送,無論客戶端是否繁忙,服務端都會推送消息,如果沒有一個流控機制,很容易讓客戶端最終因為消費速度跟不上導致各種性能問題。nsq於是才有了一個RDY的狀態字段來表示流控。簡單來說,就是客戶端連接上 nsqd服務之后,會告訴nsqd它的可接受消息數量是多少,每當nsqd給客戶端推送一條消息,這個RDY就會減一,而客戶端消費完一個消息,發送完FIN之后,這個RDY又會加一(有點類似於TCP中用來控制流量的窗口機制),當然,在連接之后,會啟動一個單獨的 goroutine在后台不斷去調整這個 rdycount。
// AddHandler sets the Handler for messages received by this Consumer. This can be called
// multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
func (r *Consumer) AddHandler(handler Handler) {
r.AddConcurrentHandlers(handler, 1)
}
- 在向consumer中添加handler的時候,又調用了AddConcurrentHandlers方法,看名字應該是並發執行的handler的數量,這里默認傳入的是1。從注釋可以看到處理消息的goroutine 和接收消息的goroutine是一對一的。
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
if atomic.LoadInt32(&r.connectedFlag) == 1 {
panic("already connected")
}
atomic.AddInt32(&r.runningHandlers, int32(concurrency))
for i := 0; i < concurrency; i++ {
go r.handlerLoop(handler)
}
}
- concurrency 就是用來說明我們現在傳入的handler要並發執行多少個,首先對正在運行的handler進行計數,然后根據並發量啟動handler開始工作,啟動的每一個handler也都是一個goroutine。
func (r *Consumer) handlerLoop(handler Handler) {
r.log(LogLevelDebug, "starting Handler")
for {
message, ok := <-r.incomingMessages
if !ok {
goto exit
}
if r.shouldFailMessage(message, handler) {
message.Finish()
continue
}
err := handler.HandleMessage(message)
if err != nil {
r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
if !message.IsAutoResponseDisabled() {
message.Requeue(-1)
}
continue
}
if !message.IsAutoResponseDisabled() {
message.Finish()
}
}
exit:
r.log(LogLevelDebug, "stopping Handler")
if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
r.exit()
}
}
- 在執行handler的這個函數中是一個死循環,每次都會阻塞從consumer的incomingMessages中讀取消息,然后判斷消息是否失效,沒有失效才繼續用我們傳入的handler對消息進行處理。
func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
// message passed the max number of attempts
if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
message.ID, message.Attempts)
logger, ok := handler.(FailedMessageLogger)
if ok {
logger.LogFailedMessage(message)
}
return true
}
return false
}
- 當一個消息的重試次數達到最大重試次數,依舊沒有成功時,則認為該消息已經失效。
// ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
//
// If it is the first to be added, it initiates an HTTP request to discover nsqd
// producers for the configured topic.
//
// A goroutine is spawned to handle continual polling.
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
if atomic.LoadInt32(&r.stopFlag) == 1 {
return errors.New("consumer stopped")
}
if atomic.LoadInt32(&r.runningHandlers) == 0 {
return errors.New("no handlers")
}
if err := validatedLookupAddr(addr); err != nil {
return err
}
atomic.StoreInt32(&r.connectedFlag, 1)
r.mtx.Lock()
for _, x := range r.lookupdHTTPAddrs {
if x == addr {
r.mtx.Unlock()
return nil
}
}
r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
numLookupd := len(r.lookupdHTTPAddrs)
r.mtx.Unlock()
// if this is the first one, kick off the go loop
if numLookupd == 1 {
r.queryLookupd()
r.wg.Add(1)
go r.lookupdLoop()
}
return nil
}
- 首先判斷consumer是否停止,是否有handler在工作,將consumer標記為連接狀態,接着遍歷lookupdHTTPAddrs,如果當前連接的地址沒有,則添加,如果lookupdHTTPAddrs長度是1,說明這是第一次連接 lookupd,還沒有啟動過lookupdLoop,那么執行queryLookupd,最后啟動一個goroutine。
// make an HTTP req to one of the configured nsqlookupd instances to discover
// which nsqd's provide the topic we are consuming.
//
// initiate a connection to any new producers that are identified.
func (r *Consumer) queryLookupd() {
retries := 0
retry:
endpoint := r.nextLookupdEndpoint()
r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)
var data lookupResp
err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
if err != nil {
r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
retries++
if retries < 3 {
r.log(LogLevelInfo, "retrying with next nsqlookupd")
goto retry
}
return
}
var nsqdAddrs []string
for _, producer := range data.Producers {
broadcastAddress := producer.BroadcastAddress
port := producer.TCPPort
joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
nsqdAddrs = append(nsqdAddrs, joined)
}
// apply filter
if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
}
for _, addr := range nsqdAddrs {
err = r.ConnectToNSQD(addr)
if err != nil && err != ErrAlreadyConnected {
r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
continue
}
}
}
- 首先找到一個向nsqlookupd發送的http的鏈接,調用apiRequestNegotiateV1發送,nsqlookupd會向消費者返回存在用戶想消費的topic的所有nsqd的地址,接下來的工作就是遍歷nsqlookupd返回的消息組裝成nsqdAdder添加到nsqdAddrs中,並對它進行過濾,最后和過濾得到的所有nsqd建立連接。
// return the next lookupd endpoint to query
// keeping track of which one was last used
func (r *Consumer) nextLookupdEndpoint() string {
r.mtx.RLock()
if r.lookupdQueryIndex >= len(r.lookupdHTTPAddrs) {
r.lookupdQueryIndex = 0
}
addr := r.lookupdHTTPAddrs[r.lookupdQueryIndex]
num := len(r.lookupdHTTPAddrs)
r.mtx.RUnlock()
r.lookupdQueryIndex = (r.lookupdQueryIndex + 1) % num
urlString := addr
if !strings.Contains(urlString, "://") {
urlString = "http://" + addr
}
u, err := url.Parse(urlString)
if err != nil {
panic(err)
}
if u.Path == "/" || u.Path == "" {
u.Path = "/lookup"
}
v, err := url.ParseQuery(u.RawQuery)
v.Add("topic", r.topic)
u.RawQuery = v.Encode()
return u.String()
}
- 在獲取向nsqlookupd發送的http鏈接的時候,nsqlookupd可能有多個實例構成了集群,在消費者這邊會通過輪詢的方式選擇向哪一台nsqlookupd發送,具體是通過消費者中的lookupdQueryIndex參數。
// ConnectToNSQD takes a nsqd address to connect directly to.
//
// It is recommended to use ConnectToNSQLookupd so that topics are discovered
// automatically. This method is useful when you want to connect to a single, local,
// instance.
func (r *Consumer) ConnectToNSQD(addr string) error {
if atomic.LoadInt32(&r.stopFlag) == 1 {
return errors.New("consumer stopped")
}
if atomic.LoadInt32(&r.runningHandlers) == 0 {
return errors.New("no handlers")
}
atomic.StoreInt32(&r.connectedFlag, 1)
logger, logLvl := r.getLogger()
conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
conn.SetLogger(logger, logLvl,
fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel))
r.mtx.Lock()
_, pendingOk := r.pendingConnections[addr]
_, ok := r.connections[addr]
if ok || pendingOk {
r.mtx.Unlock()
return ErrAlreadyConnected
}
r.pendingConnections[addr] = conn
if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
}
r.mtx.Unlock()
r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)
cleanupConnection := func() {
r.mtx.Lock()
delete(r.pendingConnections, addr)
r.mtx.Unlock()
conn.Close()
}
resp, err := conn.Connect()
if err != nil {
cleanupConnection()
return err
}
if resp != nil {
if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
r.log(LogLevelWarning,
"(%s) max RDY count %d < consumer max in flight %d, truncation possible",
conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
}
}
cmd := Subscribe(r.topic, r.channel)
err = conn.WriteCommand(cmd)
if err != nil {
cleanupConnection()
return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
conn, r.topic, r.channel, err.Error())
}
r.mtx.Lock()
delete(r.pendingConnections, addr)
r.connections[addr] = conn
r.mtx.Unlock()
// pre-emptive signal to existing connections to lower their RDY count
for _, c := range r.conns() {
r.maybeUpdateRDY(c)
}
return nil
}
- 還是先對參數進行驗證,之后根據我們傳入的鏈接,配置和封裝了consumer的consumerConnDelegate創建了conn,這個代理類的作用是非常大的,在最后我們會仔細看一下。接下來並沒有馬上建立連接,先從pendingConnections和connections中嘗試獲取addr對應的conn,如果獲取到了,說明建立過連接了,直接返回,否則先添加到pendingConnections中,創建了一個匿名函數cleanupConnection,當連接建立失敗后進行清理工作,之后才正式建立連接。如果建立成功建立一個訂閱命令,通過conn向當前的nsqd發送過去,更新pendingConnections和connections,最后檢查當前consumer的所有conn是否有必要更新RDY的值。
func (r *Consumer) maybeUpdateRDY(conn *Conn) {
inBackoff := r.inBackoff()
inBackoffTimeout := r.inBackoffTimeout()
if inBackoff || inBackoffTimeout {
r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v",
conn, inBackoff, inBackoffTimeout)
return
}
count := r.perConnMaxInFlight()
r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count)
r.updateRDY(conn, count)
}
- 更新RDY的值,主要是根據當前consumer最大能接收的消息的數目發送給nsqd的。
// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
// add some jitter so that multiple consumers discovering the same topic,
// when restarted at the same time, dont all connect at once.
r.rngMtx.Lock()
jitter := time.Duration(int64(r.rng.Float64() *
r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval)))
r.rngMtx.Unlock()
var ticker *time.Ticker
select {
case <-time.After(jitter):
case <-r.exitChan:
goto exit
}
ticker = time.NewTicker(r.config.LookupdPollInterval)
for {
select {
case <-ticker.C:
r.queryLookupd()
case <-r.lookupdRecheckChan:
r.queryLookupd()
case <-r.exitChan:
goto exit
}
}
exit:
if ticker != nil {
ticker.Stop()
}
r.log(LogLevelInfo, "exiting lookupdLoop")
r.wg.Done()
}
- 在ConnectToNSQLookupd的最后一步就是啟動一個goroutine,在這里面會定時向nsqlookupd發送http請求更新和nsqd的連接,當有新的nsqd負責topic的存儲的時候可以馬上向這個nsqd獲取消息。
- consumer的啟動流程走完了,可是我們沒有看到consumer是如何獲取消息的呢,我開始再看的時候也沒有找到,但是,還記不記得我們剛剛在創建conn的時候傳入的是consumer的委托,沒錯,那個地方就是關鍵所在,我們先來看一下consumer的委托:
// keeps the exported Consumer struct clean of the exported methods
// required to implement the ConnDelegate interface
type consumerConnDelegate struct {
r *Consumer
}
func (d *consumerConnDelegate) OnResponse(c *Conn, data []byte) { d.r.onConnResponse(c, data) }
func (d *consumerConnDelegate) OnError(c *Conn, data []byte) { d.r.onConnError(c, data) }
func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message) { d.r.onConnMessage(c, m) }
func (d *consumerConnDelegate) OnMessageFinished(c *Conn, m *Message) { d.r.onConnMessageFinished(c, m) }
func (d *consumerConnDelegate) OnMessageRequeued(c *Conn, m *Message) { d.r.onConnMessageRequeued(c, m) }
func (d *consumerConnDelegate) OnBackoff(c *Conn) { d.r.onConnBackoff(c) }
func (d *consumerConnDelegate) OnContinue(c *Conn) { d.r.onConnContinue(c) }
func (d *consumerConnDelegate) OnResume(c *Conn) { d.r.onConnResume(c) }
func (d *consumerConnDelegate) OnIOError(c *Conn, err error) { d.r.onConnIOError(c, err) }
func (d *consumerConnDelegate) OnHeartbeat(c *Conn) { d.r.onConnHeartbeat(c) }
func (d *consumerConnDelegate) OnClose(c *Conn) { d.r.onConnClose(c) }
- consumerConnDelegate 中只有一個參數就是consumer,但是它的方法我們一看就能知道是什么意思,並且知道它們都是什么時候執行的,先放一下,看看conn的創建:
// NewConn returns a new Conn instance
func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn {
if !config.initialized {
panic("Config must be created with NewConfig()")
}
return &Conn{
addr: addr,
config: config,
delegate: delegate,
maxRdyCount: 2500,
lastMsgTimestamp: time.Now().UnixNano(),
cmdChan: make(chan *Command),
msgResponseChan: make(chan *msgResponse),
exitChan: make(chan int),
drainReady: make(chan int),
}
}
- 此處只是初始化了一個Conn結構體,將委托ConnDelegate傳入,於是繼續找:
// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
dialer := &net.Dialer{
LocalAddr: c.config.LocalAddr,
Timeout: c.config.DialTimeout,
}
conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
return nil, err
}
c.conn = conn.(*net.TCPConn)
c.r = conn
c.w = conn
_, err = c.Write(MagicV2)
if err != nil {
c.Close()
return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
}
resp, err := c.identify()
if err != nil {
return nil, err
}
if resp != nil && resp.AuthRequired {
if c.config.AuthSecret == "" {
c.log(LogLevelError, "Auth Required")
return nil, errors.New("Auth Required")
}
err := c.auth(c.config.AuthSecret)
if err != nil {
c.log(LogLevelError, "Auth Failed %s", err)
return nil, err
}
}
c.wg.Add(2)
atomic.StoreInt32(&c.readLoopRunning, 1)
go c.readLoop()
go c.writeLoop()
return resp, nil
}
- 可以看到,在建立連接之后,啟動了兩個goroutine,一個用來讀,一個用來寫。
func (c *Conn) readLoop() {
delegate := &connMessageDelegate{c}
for {
if atomic.LoadInt32(&c.closeFlag) == 1 {
goto exit
}
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
goto exit
}
if !strings.Contains(err.Error(), "use of closed network connection") {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
}
goto exit
}
if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
c.log(LogLevelDebug, "heartbeat received")
c.delegate.OnHeartbeat(c)
err := c.WriteCommand(Nop())
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
goto exit
}
continue
}
switch frameType {
case FrameTypeResponse:
c.delegate.OnResponse(c, data)
case FrameTypeMessage:
msg, err := DecodeMessage(data)
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
goto exit
}
msg.Delegate = delegate
msg.NSQDAddress = c.String()
atomic.AddInt64(&c.messagesInFlight, 1)
atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
c.delegate.OnMessage(c, msg)
case FrameTypeError:
c.log(LogLevelError, "protocol error - %s", data)
c.delegate.OnError(c, data)
default:
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
}
}
exit:
atomic.StoreInt32(&c.readLoopRunning, 0)
// start the connection close
messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
if messagesInFlight == 0 {
// if we exited readLoop with no messages in flight
// we need to explicitly trigger the close because
// writeLoop won't
c.close()
} else {
c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
}
c.wg.Done()
c.log(LogLevelInfo, "readLoop exiting")
}
- 首先也是獲取了conn的委托,和consumer的一樣為它添加了一些相關事件的處理方法,接下來在ReadUnpackedResponse方法中從conn中不斷讀取Response,根據Response的類型,將Response的內容傳給consumer的相關方法,我們就來看看當接收到訂閱的消息后的工作:
func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
atomic.AddUint64(&r.messagesReceived, 1)
r.incomingMessages <- msg
}
- 就是向handler阻塞的通道里面寫數據。看到這我們發現消費者消費的消息是nsqd主動推送過來的,那么服務端是怎么知道的呢,其實在和nsqd建立完連接的時候向它發送了一個訂閱的命令。
cmd := Subscribe(r.topic, r.channel)
err = conn.WriteCommand(cmd)
- nsqd就是從這個命令中得知當前消費者要訂閱的消息,之后根據消費者更新過來的RDY的值來確定推送的數量。