go筆記 NSQ (5) ( nsqd如何監聽生產者的消息,select關鍵字使用)


 

  本節主要來探究nsq如何監聽生產者的消息。

  通過上節我們得知nsq接收消息發送主要是靠下面這個http處理器   當然了也可以通過原生tcp的方式進行消息發送,由於具體處理流程類似,所以文末會有提到。

router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))

  我們發送一個http請求例如如下就可以向指定topic生產一個消息

$ curl -d "<message>" http://127.0.0.1:4151/pub?topic=name

  所以本文的內容主要看  s.doPUB是如何處理請求的。

 

1.接收請求生成消息結構體

  

func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	//檢查消息是否過大
	if req.ContentLength > s.ctx.nsqd.getOpts().MaxMsgSize {
		return nil, http_api.Err{413, "MSG_TOO_BIG"}
	}

	//最大可閱讀孩值+1
	readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1
	//獲得請求體
	body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
	if err != nil {
		return nil, http_api.Err{500, "INTERNAL_ERROR"}
	}
	if int64(len(body)) == readMax {
		return nil, http_api.Err{413, "MSG_TOO_BIG"}
	}
	if len(body) == 0 {
		return nil, http_api.Err{400, "MSG_EMPTY"}
	}
	//獲得對應的topic  以及消息內容,如果topic沒有會直接創建
	reqParams, topic, err := s.getTopicFromQuery(req)
	if err != nil {
		return nil, err
	}
	//這兒判斷消息是否是延時隊列  如果是的話獲得延時時間
	var deferred time.Duration
	if ds, ok := reqParams["defer"]; ok {
		var di int64
		di, err = strconv.ParseInt(ds[0], 10, 64)
		if err != nil {
			return nil, http_api.Err{400, "INVALID_DEFER"}
		}
		deferred = time.Duration(di) * time.Millisecond
		if deferred < 0 || deferred > s.ctx.nsqd.getOpts().MaxReqTimeout {
			return nil, http_api.Err{400, "INVALID_DEFER"}
		}
	}
	//創建一個message結構體
	msg := NewMessage(topic.GenerateID(), body)
	msg.deferred = deferred
	//將消息發送給topic下的channel
	err = topic.PutMessage(msg)
	if err != nil {
		return nil, http_api.Err{503, "EXITING"}
	}

	return "OK", nil
}

   其實代碼主要分為幾個部分

   1.檢查消息是否滿足要求,例如長度等

   2.根據消息獲得對應topic(如果沒有會創建)

   3.生成消息結構體,將消息發送到指定topic下的所有channel 

 

    我們主要看下 2和3的具體操作

2.獲取topic

  思路也是比較常規的思路,獲取請求參數里指定的topic,然后去nsqd結構體下的topic map查看是否有對應的topic ,如果有則直接返回,如果沒有就創建一個新的topic存入這個topic map,然后返回。看具體getTopicFromQuery

代碼,

  

func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) {
	//如果請求參數
	reqParams, err := url.ParseQuery(req.URL.RawQuery)
	if err != nil {
		s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
		return nil, nil, http_api.Err{400, "INVALID_REQUEST"}
	}
	//查看是否有topic參數 如果沒有直接返回錯誤信息
	topicNames, ok := reqParams["topic"]
	if !ok {
		return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
	}
	//獲取到topicName
	topicName := topicNames[0]
	//如果topicName非法也會報錯  比如長度大於1 小於64
	if !protocol.IsValidTopicName(topicName) {
		return nil, nil, http_api.Err{400, "INVALID_TOPIC"}
	}
	//去nsqd中查詢topic
	return reqParams, s.ctx.nsqd.GetTopic(topicName), nil
}

   這兒主要獲取到topicName,然后去nsqd中查找,再看nsqd.GetTopic方法。

// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
	//讀寫鎖,防止重復創建某個topic
	//加讀鎖讀,如果未讀到
	n.RLock()
	t, ok := n.topicMap[topicName]
	n.RUnlock()
	if ok {
		return t
	}
	//加寫鎖並在此檢查是否存在
	n.Lock()

	t, ok = n.topicMap[topicName]
	if ok {
		n.Unlock()
		return t
	}
	//定義個刪除topic后的回調函數
	deleteCallback := func(t *Topic) {
		n.DeleteExistingTopic(t.name)
	}
	//新建一個topic  
	t = NewTopic(topicName, &context{n}, deleteCallback)
	//放入上下文即nsqd的topicMap中
	n.topicMap[topicName] = t
	//解鎖
	n.Unlock()

	n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
	// topic is created but messagePump not yet started

	// 如果topic已經是loading狀態就直接返回
	if atomic.LoadInt32(&n.isLoading) == 1 {
		return t
	}

	//如果我們配置了lookupd  查看  lookupd在該topicName下是否有channel  如果有則獲取到其下面的channel名字新建到當前topic下 
	//這兒比如說已經有一個nsqd 關聯nsqlookupd topic為xxx ,下面有channelA   channelB   這個時候如果我們又啟動了一個nsqd  也是關聯這個nsqlookupd
	//並且也有生產者往這個nsqd發送topic為xxx的信息,為了保證集群的一致性,需要其下面也要有channelA   channelB
	lookupdHTTPAddrs := n.lookupdHTTPAddrs()
	if len(lookupdHTTPAddrs) > 0 {
		channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
		if err != nil {
			n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
		}
		for _, channelName := range channelNames {
			if strings.HasSuffix(channelName, "#ephemeral") {
				continue // do not create ephemeral channel with no consumer client
			}
			t.GetChannel(channelName)
		}
	} else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
		n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
	}

	// 所有channel都添加  可以開始讓topic接收消息
	t.Start()
	return t
}

  這兒其實就是一個新建topic的過程,這兒需要注意的地方也就是兩個,一個是新建topic結構體,二是去nsqlookupd同步該topic已有的channel。這兒我們主要看下新建topic結構體的操作,即這個NewTopic方法。

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	t := &Topic{
		name:              topicName,//topicName
		channelMap:        make(map[string]*Channel), //該topic下包含的chanel,所有channel都將保存topic消息的副本
		memoryMsgChan:     make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),//消息將先到達->memoryMsgChan,然后會輪流推送到所有的channel
		startChan:         make(chan int, 1), //啟動信號
		exitChan:          make(chan int), //關閉信號
		channelUpdateChan: make(chan int),//包含的channel修改信號  例如添加或者刪除
		ctx:               ctx,//nsqd上下文
		paused:            0,
		pauseChan:         make(chan int),//暫停信號
		deleteCallback:    deleteCallback, //刪除回調函數
		idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),//id
	}
	//臨時topic
	if strings.HasSuffix(topicName, "#ephemeral") {
		t.ephemeral = true
		t.backend = newDummyBackendQueue()
	} else {
		dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
			opts := ctx.nsqd.getOpts()
			lg.Logf(opts.Logger, opts.logLevel, lg.LogLevel(level), f, args...)
		}
		//backend是消息如果已經達到了topic容納消息的最長時的備份策略
		//這兒是存在硬盤中
		t.backend = diskqueue.New(
			topicName,
			ctx.nsqd.getOpts().DataPath,
			ctx.nsqd.getOpts().MaxBytesPerFile,
			int32(minValidMsgLength),
			int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
			ctx.nsqd.getOpts().SyncEvery,
			ctx.nsqd.getOpts().SyncTimeout,
			dqLogf,
		)
	}
	//topic准備開始接收消息
	t.waitGroup.Wrap(t.messagePump)
	//通知nsqlookupd該nsqd新建了一個topic
	t.ctx.nsqd.Notify(t)

	return t
}

    這段創建topic的代碼比較核心,主要有幾個地方需要注意。1是新建topic結構體   2是設置topic的backend  這關系到消息達到最大時的存儲策略  3是topic開始進入准備接收消息狀態

    關於topic結構體的一些核心變量有必要做一個說明

  • channelMap  用來存放該topic下的所有channel,當有消息推送到topic時,下面的所有channel都會收到信息
  • memoryMsgChan 消息推送chan,當有消息到來時會先到該chan,然后接收chan信息遍歷推送到每個channel
  • startChan 開始信號,有信息進入該chan說明topic可以開始接收消息推送
  • exitChan 接收信號,當該topic不需要在接收消息推送(例如被刪除),可以將信息設置到該chan
  • channelUpdateChan channel修改信號,例如該topic下有新創建channel,則可以將信息推入該chan,可以用來同步nsqlookupd等
  • ctx  即nsqd上下文
  • deleteCallback 即刪除回調函數

  新建topic后,會設置其backend即信息消息超出chan最大值時的備份方法,這兒一般存在硬盤上,這個后面會說到。  

  t.waitGroup.Wrap(t.messagePump) 則是接收處理消息的方法。t.ctx.nsqd.Notify(t) 內部最后主要是會通知nsqlookupd做一些同步信息。

  我們主要看下接收處理消息的方法。

 

3. topic消息處理

  

func (t *Topic) messagePump() {
	//消息體
	var msg *Message
	//消息buf
	var buf []byte
	//錯誤
	var err error
	//該topic下所有channel
	var chans []*Channel
	//該topic的memoryMsgChan  消息入口
	var memoryMsgChan chan *Message
	//超出最大消息隊列的備份chan
	var backendChan chan []byte

	// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
	for {
		select {
		case <-t.channelUpdateChan:
			continue
		case <-t.pauseChan:
			continue
		case <-t.exitChan:
			goto exit
		//如果startChan准備好了才開始接收消息推送	
		case <-t.startChan:
		}
		break
	}
	//讀取該topic中所有的channel
	t.RLock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.RUnlock()
	if len(chans) > 0 && !t.IsPaused() {
		//賦值memoryMsgChan backendChan
		memoryMsgChan = t.memoryMsgChan
		backendChan = t.backend.ReadChan()
	}

	//核心消息輪詢處理
	for {
		select {
		//接收到消息推送	
		case msg = <-memoryMsgChan:
		//接收到備份消息推送  例如磁盤
		case buf = <-backendChan:
			msg, err = decodeMessage(buf)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
		//如果channel有改動	  則重新獲取該topic下的channel
		case <-t.channelUpdateChan:
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			//如果暫停了就直接為空
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				//重新賦值memoryMsgChan 和backendChan
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		//如果是暫停信號 則和上面暫停時一個操作	
		case <-t.pauseChan:
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		//如果是exit信號說明該topic不在消費消息,直接goto到exit代碼塊	
		case <-t.exitChan:
			goto exit
		}
		//遍歷該topic下的channel   並且發送消息
		//注意如果為延時消息則會扔到延時隊列里邊去
		for i, channel := range chans {
			chanMsg := msg
			// copy the message because each channel
			// needs a unique instance but...
			// fastpath to avoid copy if its the first channel
			// (the topic already created the first copy)
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}

exit:
	t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}

    這里面邏輯還是很簡單的,就先輪詢等待,直到接收到start信號,則開始處理消息信號。

  • 如果接收到正常的msg即消息信號或者backend信號即還未被消費的備份消息,則直接遍歷該topic下的所有channel並發送消息 (延時消息會放置到延時隊列)
  • 如果接收到channel改動信號比如新增或者刪除,則重新賦值該topic下的所有channel
  • 如果接收到暫停信號,且是暫停命令,則將實時消息和備份消息chan都置為null,如果不是則將上下文中的chan重新賦值到該方法中
  • 如果接收到exit信號,則退出消息接收輪詢,執行exit后的代碼塊

  其實這兒比較關注的點應該是兩個,何時會接收到memoryMsgChan的值,每個channel具體是怎么推送消息到其下面的所有client也就是consumer的。 我們先看第一個問題,怎么接收memoryMsgChan,要明白這個問題,我們要回到菜單1中,創建好topic后的操作,代碼中創建topic后會執行一個putMessage操作、

	msg := NewMessage(topic.GenerateID(), body)
	msg.deferred = deferred
	err = topic.PutMessage(msg)

 

4. 推送消息到topic的memoryMsgChan

  

func (t *Topic) PutMessage(m *Message) error {
	t.RLock()
	defer t.RUnlock()
	//檢查topic是否已經被停了
	if atomic.LoadInt32(&t.exitFlag) == 1 {
		return errors.New("exiting")
	}
	//發送消息
	err := t.put(m)
	if err != nil {
		return err
	}
	//已接收消息+1
	atomic.AddUint64(&t.messageCount, 1)
	//備份消息內容
	atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
	return nil
}

  這兒就是檢查了一下,核心還是這個put方法。

func (t *Topic) put(m *Message) error {
	select {
	//****************關鍵操作  消息放入到該topic的	memoryMsgChan中
	case t.memoryMsgChan <- m:
	//如果放入失敗說明chan已經滿了 此時需要放入磁盤
	default:
		b := bufferPoolGet()
		//信息寫入備份
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
		//該nsqd處於不健康狀態
		t.ctx.nsqd.SetHealth(err)
		if err != nil {
			t.ctx.nsqd.logf(LOG_ERROR,
				"TOPIC(%s) ERROR: failed to write message to backend - %s",
				t.name, err)
			return err
		}
	}
	return nil
}

  可以看到 這兒就很明顯的看到會將消息寫入memoryMsgChan ,而另一邊topic中就會接收到這個消息並推送到其包含的所有channel。當然如果chan已經滿了,就會執行default操作,即執行備份操作,一般是寫入磁盤,關於備份磁盤有關的操作后面會專門講到,這兒就不先說明了。

 

5.topic如何將消息推送到其包含的所有channel

  通過三我們知道topic會遍歷其包含的所有channel,然后將消息推送到channel。我們可以看下具體的推送細節即channel.PutMessage方法

err := channel.PutMessage(chanMsg)

  

func (c *Channel) PutMessage(m *Message) error {
	c.RLock()
	defer c.RUnlock()
	if c.Exiting() {
		return errors.New("exiting")
	}
	err := c.put(m)
	if err != nil {
		return err
	}
	atomic.AddUint64(&c.messageCount, 1)
	return nil
}

  和topic的那兒的操作很像啊,都是做了一個狀態判斷。然后執行其put方法。

func (c *Channel) put(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
	default:
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, c.backend)
		bufferPoolPut(b)
		c.ctx.nsqd.SetHealth(err)
		if err != nil {
			c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
				c.name, err)
			return err
		}
	}
	return nil
}

  這兒也和topic很像,會將消息推送到Channel中的memoryMsgChan   如果長度過大,那么就會放入到備份隊列中。我們發現channel的操作和topic非常像。那么也很容易可以得知,channel中所包含的消費者肯定也會輪詢接收這個memoryMsgChan 的信號。當然具體的channe有關的內容屬於消費者的范疇,本文主要講述生產者。

  此時我們已經成功接收到生產者推送的消息,並將消息分發到topic下所有的channel。用圖來表示的話大致可以如下

  

 

  到此生產者已經完成工作,在后面一章將會講到消費者是如何接收channel中的消息。

  官網中信息傳遞圖 nsqd的部分已經完成

 

6. select使用

  本文很多地方用到select,該關鍵字一般用來操作接收多個chan事件時分別做出對應的處理,比如我們可以用如下demo來了解

func main() {
	read :=bufio.NewReader(os.Stdin)
	ch1 := make(chan int ,1)
	ch2 := make(chan int ,1)
	ch3 := make(chan int ,1)

	go func() {

		for  {
			select {
			case <-ch1:
				fmt.Println("接收到指令1")
			case <-ch2:
				fmt.Println("接收到指令2")
			case <-ch3:
				fmt.Println("接收到指令3")
			default:

			}
		}
	}()

	for  {
		s,_ :=read.ReadString('\n')
		str :=strings.ReplaceAll(s,"\r\n","")
		switch str {
		case "1":
			ch1 <- 1
		case "2":
			ch2 <- 1
		case "3":
			ch3 <- 1
		default:
			fmt.Println("未知指令")
		}
	}
}

  

 

 

   其實就是可以監聽多個chan信號,監聽到其中某個可以執行對應的操作,一般和for結合使用。

 

后記

  nsq不光能通過http的方式發送消息,也支持原生tcp協議監聽端口監聽生產者消息。具體可以看到nsqd.main方法中創建的tcpServer。其接收到新的套接字后會執行protocolV2.IOLoop(clientConn)方法。在該方法中最終可以走到如下代碼

response, err = p.Exec(client, params)

  在該方法中

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	if bytes.Equal(params[0], []byte("IDENTIFY")) {
		return p.IDENTIFY(client, params)
	}
	err := enforceTLSPolicy(client, p, params[0])
	if err != nil {
		return nil, err
	}
	switch {
	case bytes.Equal(params[0], []byte("FIN")):
		return p.FIN(client, params)
	case bytes.Equal(params[0], []byte("RDY")):
		return p.RDY(client, params)
	case bytes.Equal(params[0], []byte("REQ")):
		return p.REQ(client, params)
	//接收生產者消息	
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
	case bytes.Equal(params[0], []byte("MPUB")):
		return p.MPUB(client, params)
	case bytes.Equal(params[0], []byte("DPUB")):
		return p.DPUB(client, params)
	case bytes.Equal(params[0], []byte("NOP")):
		return p.NOP(client, params)
	case bytes.Equal(params[0], []byte("TOUCH")):
		return p.TOUCH(client, params)
	case bytes.Equal(params[0], []byte("SUB")):
		return p.SUB(client, params)
	case bytes.Equal(params[0], []byte("CLS")):
		return p.CLS(client, params)
	case bytes.Equal(params[0], []byte("AUTH")):
		return p.AUTH(client, params)
	}
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

  我們可以看這個p.PUB方法就是接收生產者消息的方法。內部的處理操作和http方式的一致,這兒就不在說明。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM