Kafka入門(3):Sarama生產者是如何工作的


摘要

在這一篇的文章中,我將從Sarama的同步生產者和異步生產者怎么創建開始講起,然后我將向你介紹生產者中的各個參數是什么,怎么使用。

然后我將從創建生產者的代碼開始,按照代碼的調用流程慢慢深入,直到發送消息並接收到響應。

這個過程跟上面的文章說到的kafka各個層次其實是有對應關系的。

1.如何使用

1.1 介紹

在學習如何使用Sarama生產消息之前,我先稍微介紹一下。

Sarama有兩種類型的生產者,同步生產者和異步生產者。

To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of Producer.RequiredAcks. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

官方文檔的大致意思是異步生產者使用channel接收(生產成功或失敗)的消息,並且也通過channel來發送消息,這樣做通常是性能最高的。而同步生產者需要阻塞,直到收到了acks。但是這也帶來了兩個問題,一是性能變得更差了,而是可靠性是依靠參數acks來保證的。

1.2 異步發送

然后我們直接來看看Sarama是怎么發送異步消息的。

我們先來創建一個最簡陋的異步生產者,省略所有的不必要的配置。

注意,為了更容易閱讀,我刪去了錯誤處理,並且用省略號替代。

func main() {
	config := sarama.NewConfig()
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	...

	producer, err := sarama.NewAsyncProducerFromClient(client)
	...
	defer producer.Close()

	topic := "topic-test"

	for i := 0; i <= 100; i++ {
		text := fmt.Sprintf("message %08d", i)
		producer.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Key:   nil,
			Value: sarama.StringEncoder(text)}
	}
}

可以看出,Sarama發送消息的套路就是先創建一個config,這里更多的config內容我們會在后文提到。

隨后根據這個config,和broker地址,創建出生產者客戶端。

再然后根據客戶端來創建生產者對象(其實在這里用對象不夠嚴謹,但是我認為這么理解是沒有問題的)。

最后就可以使用這個生產者對象來發送信息了。

消息的構造過程中我也省略了其他的參數,只保留了最重要也是最必須的兩個參數:主題和消息內容。

到了這里,一個簡單的異步生產者發送消息的過程就結束了。

1.3 同步發送

在看完了異步發送之后,你可能會有很多的諸如“為什么要這么做”的疑問。

我們先來看看同步發送,再來對比一下:

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	...

	producer, err := sarama.NewSyncProducerFromClient(client)
	...
	defer producer.Close()

	topic := "topic-test"

	for i := 0; i <= 10; i++ {
		text := fmt.Sprintf("message %08d", i)
		partition, offset, err := producer.SendMessage(
			&sarama.ProducerMessage{
				Topic: topic,
				Key:   nil,
				Value: sarama.StringEncoder(text)})
		...
		log.Println("send message success, partition = ", partition, " offset = ", offset)
	}
}

可以看出同步發送跟異步發送的過程是很相似的。

不同的地方在於,同步生產者發送消息,使用的不是channel,並且SendMessage方法有三個返回的值,分別為這條消息的被發送到了哪個partition,處於哪個offset,是否有error

也就是說,只有在消息成功的發送並寫入了broker,才會有返回值。

2. 配置

2.1 默認配置

我們順着源碼看一下這一行:

config := sarama.NewConfig()

可以看到Sarama已經返回了一個默認的config了:

// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
	c := &Config{}

  c.Producer.MaxMessageBytes = 1000000
	c.Producer.RequiredAcks = WaitForLocal
	c.Producer.Timeout = 10 * time.Second
	...
}

2.2 可選配置

我們來看看Config這個結構體,里面有哪些配置項是允許用戶自定義的。

因為實在是太長了,限於篇幅以及作者的學識,在這篇文章中不能一一講解,所以在這篇文章只會選取部分生產者相關的配置進行講解。

但是無論是Golang客戶端,還是Java客戶端,都不重要,你只需要知道哪些參數對於你的生產者的生產速度、消息的可靠性等有關系就可以了。

// Config is used to pass multiple configuration options to Sarama's constructors.
type Config struct {
	Admin struct {
		...
	}
	
	Net struct {
		...
	}
	
	Metadata struct {
		...
	}
	
	Producer struct {
		...
	}
	
	Consumer struct {
		...
	}
	
	ClientID string
	
	...
}

我們可以看出,關於Sarama的配置,分成了很多個部分,我們來具體看一看Producer的這部分。

2.3 重要的生產者參數

在這里我打算介紹一部分我個人認為比較重要的生產者參數。

  • MaxMessageBytes int
    

這個參數影響了一條消息的最大字節數,默認是1000000。但是注意,這個參數必須要小於broker中的 message.max.bytes

  • RequiredAcks RequiredAcks
    

這個參數影響了消息需要被多少broker寫入之后才返回。取值可以是0、1、-1,分別代表了不需要等待broker確認才返回、需要分區的leader確認后才返回、以及需要分區的所有副本確認后返回。

  • Partitioner PartitionerConstructor
    

這個是分區器。Sarama默認提供了幾種分區器,如果不指定默認使用Hash分區器。

  • Retry
    

這個參數代表了重試的次數,以及重試的時間,主要發生在一些可重試的錯誤中。

  • Flush
    

用於設置將消息打包發送,簡單來講就是每次發送消息到broker的時候,不是生產一條消息就發送一條消息,而是等消息累積到一定的程度了,再打包發送。所以里面含有兩個參數。一個是多少條消息觸發打包發送,一個是累計的消息大小到了多少,然后發送。

2.4 冪等生產者

在聊冪等生產者之前,我們先來看看生產者中另外一個很重要的參數:

  • MaxOpenRequests int
    

這個參數代表了允許沒有收到acks而可以同時發送的最大batch數。

  • Idempotent bool
    

用於冪等生產者,當這一項設置為true的時候,生產者將保證生產的消息一定是有序且精確一次的。

為什么會需要這個選項呢?

當MaxOpenRequests這個參數配置大於1的時候,代表了允許有多個請求發送了還沒有收到回應。假設此時的重試次數也設置為了大於1,當同時發送了2個請求,如果第一個請求發送到broker中,broker寫入失敗了,但是第二個請求寫入成功了,那么客戶端將重新發送第一個消息的請求,這個時候會造成亂序。

又比如當第一個請求返回acks的時候,因為網絡原因,客戶端沒有收到,所以客戶端進行了重發,這個時候就會造成消息的重復。

所以,冪等生產者就是為了保證消息發送到broker中是有序且不重復的。

消息的有序可以通過MaxOpenRequests設置為1來保證,這個時候每個消息必須收到了acks才能發送下一條,所以一定是有序的,但是不能夠保證不重復。

而且當MaxOpenRequests設置為1的時候,吞吐量不高。

注意,當啟動冪等生產者的時候,Retry次數必須要大於0,ack必須為all。

在Java客戶端中,允許MaxOpenRequests小於等於5。

但是在Sarama中有一個很奇怪的地方我也沒有研究明白,我們直接看一看這部分的代碼:

if c.Producer.Idempotent {
		if !c.Version.IsAtLeast(V0_11_0_0) {
			return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
		}
		if c.Producer.Retry.Max == 0 {
			return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
		}
		if c.Producer.RequiredAcks != WaitForAll {
			return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
		}
		if c.Net.MaxOpenRequests > 1 {
			return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
		}
	}

這一部分第一項是版本號,沒問題,第二第三項是RetryAcks,也沒有問題。問題在於第四項,這里的MaxOpenRequests參數,我想應該等同於Java客戶端中的MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,按照Java客戶端中的配置,應該是這個參數小於等於5,即可保證冪等,但是這里必須得設置為1。

檢查了Sarama的Issue,有開發者提出了這個問題,但是目前作者還沒有打算解決。

3 broker

在這一節的內容中,我將會從代碼的層面介紹 Sarama 生產者發送消息的全過程。但是因為代碼很多,我將會省略一些內容,包括一些錯誤處理、重試等。

這些都很重要,也不應該被省略。但是因為篇幅有限,我只能介紹最核心的發送消息這一部分的內容。

我會在貼代碼之前,大概的說一下這段代碼的思路。隨后,我會在代碼中加入一些注釋,來更詳細的進行解釋。

然后我們開始吧!

producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)

一切都從這么一行開始講起。

我們進去看看。

在這里其實就只有兩個部分,先是通過地址配置,構建一個 client

func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
  
  // 構建client
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
  
  // 構建AsyncProducer
	return newAsyncProducer(client)
}

3.1 Client的創建

在創建 Client 的過程中,先構建一個 client 結構體。

里面的參數我們先不管,等用到了再進行解釋。

然后創建完之后,刷新元數據,並且啟動一個協程,在后台進行刷新。

func NewClient(addrs []string, conf *Config) (Client, error) {
	...
  // 構建一個client
  client := &client{
		conf:                    conf,
		closer:                  make(chan none),
		closed:                  make(chan none),
		brokers:                 make(map[int32]*Broker),
		metadata:                make(map[string]map[int32]*PartitionMetadata),
		metadataTopics:          make(map[string]none),
		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
		coordinators:            make(map[string]int32),
	}
  // 把用戶輸入的broker地址作為“種子broker”增加到seedBrokers中
  // 隨后客戶端會根據已有的broker地址,自動刷新元數據,以獲取更多的broker地址
  // 所以稱之為種子
  random := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, index := range random.Perm(len(addrs)) {
		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
	}
	...
  // 啟動協程在后台刷新元數據
  go withRecover(client.backgroundMetadataUpdater)
  return client, nil
}
  

3.2 元數據的更新

后台更新元數據的設計其實很簡單,利用一個 ticker ,按時對元數據進行更新,直到 client 關閉。

這里先提一下我們說的元數據,有哪些內容。

你可以簡單的理解為包含了所有 broker 的地址(因為 broker 可能新增,也可能減少),以及包含了哪些 topic ,這些 topic 有哪些 partition 等。

func (client *client) backgroundMetadataUpdater() {
  
  // 按照配置的時間更新元數據
  ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
	defer ticker.Stop()
  
  // 循環獲取channel,判斷是執行更新操作還是終止
  for {
		select {
		case <-ticker.C:
			if err := client.refreshMetadata(); err != nil {
				Logger.Println("Client background metadata update:", err)
			}
		case <-client.closer:
			return
		}
	}
}

然后我們繼續來看看 client.refreshMetadata() 這個方法,這個方法是判斷了一下需要刷新哪些主題的元數據,還是說全部主題的元數據。

然后我們繼續。

在這里也還沒有涉及到具體的更新操作。我們看 tryRefreshMetadata 這個方法的參數可以得知,在這里我們設置了需要刷新元數據的主題,重試的次數,超時的時間。

func (client *client) RefreshMetadata(topics ...string) error {
  deadline := time.Time{}
	if client.conf.Metadata.Timeout > 0 {
		deadline = time.Now().Add(client.conf.Metadata.Timeout)
	}
  // 設置參數
	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}

然后終於來到了tryRefreshMetadata這個方法。

在這個方法中,會選取已經存在的broker,構造獲取元數據的請求。

在收到回應后,如果不存在任何的錯誤,就將這些元數據用於更新客戶端。

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
  ...
  
  broker := client.any()
	for ; broker != nil && !pastDeadline(0); broker = client.any() {
    ...
    		req := &MetadataRequest{
          Topics: topics, 
          // 是否允許創建不存在的主題
          AllowAutoTopicCreation: allowAutoTopicCreation
        }
    response, err := broker.GetMetadata(req)
    switch err.(type) {
		case nil:
			allKnownMetaData := len(topics) == 0
      // 對元數據進行更新
			shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
			if shouldRetry {
				Logger.Println("client/metadata found some partitions to be leaderless")
				return retry(err)
			}
			return err
		case ...
      ...
    }
  }

然后我們繼續往下看看當客戶端拿到了 response 之后,是如何更新的。

首先,先對本地保存 broker 進行更新。

然后,對 topic 進行更新,以及這個 topic 下面的那些 partition

func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
  ...
  // 假設返回了新的broker id,那么保存這些新的broker,這意味着增加了broker、或者下線的broker重新上線了
  // 如果返回的id我們已經保存了,但是地址變化了,那么更新地址
  // 如果本地保存的一些id沒有返回,說明這些broker下線了,那么刪除他們
  client.updateBroker(data.Brokers)
  
  // 然后對topic也進行元數據的更新
  // 主要是更新topic以及topic對應的partition
  for _, topic := range data.Topics {
    ...
    // 更新每個topic以及對應的partition
    client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			client.metadata[topic.Name][partition.ID] = partition
			...
		}
  }

至此,我們元數據的更新就說完了。

下面我們來說一說在更新元數據之前,broker是如何建立連接的,以及請求是如何發送出去,又是如何被broker接收的。

3.3 與Broker建立連接

讓我們回到 tryRefreshMetadata 這個方法中。

這個方法里面有這么一行代碼:

broker := client.any()

我們進去看看。

在這個方法里, 如果 seedBrokers 存在,那么就打開它,否則的話打開其他的broker。

注意,這里提到的其他的broker,可能是在刷新元數據的時候,獲取到的。這就跟上面的內容聯系在一起了。

func (client *client) any() *Broker {
	...
	if len(client.seedBrokers) > 0 {
		_ = client.seedBrokers[0].Open(client.conf)
		return client.seedBrokers[0]
	}

	// 不保證一定是按順序的
	for _, broker := range client.brokers {
		_ = broker.Open(client.conf)
		return broker
	}

	return nil
}

然后再讓我們看看 Open方法做了什么。

Open方法異步的建立了一個tcp連接,然后創建了一個緩沖大小為MaxOpenRequestschannel

這個名為 responseschannel ,用於接收從 broker發送回來的消息。

其實在 broker 中,用於發送消息跟接收消息的 channel 都設置成了這個大小。

MaxOpenRequests 這個參數你可以理解為是Java客戶端中的max.in.flight.requests.per.connection

然后,又啟動了一個協程,用於接收消息。

func (b *Broker) Open(conf *Config) error {
  if conf == nil {
		conf = NewConfig()
	}
  ...
  go withRecover(func() {
    ...
    dialer := conf.getDialer()
		b.conn, b.connErr = dialer.Dial("tcp", b.addr)
    
    ...
    b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
    ...
    go withRecover(b.responseReceiver)
  })

3.4 從Broker接收響應

我們來看看 responseReceiver 是怎么工作的。

其實很容易理解,當 broker 收到一個 response 的時候,先解析消息的頭部,然后再解析消息的內容。並把這些內容寫進 responsepackets 中。

func (b *Broker) responseReceiver() {
  for response := range b.responses {
    
    ...
    // 先根據Header的版本讀取對應長度的Header
    var headerLength = getHeaderLength(response.headerVersion)
		header := make([]byte, headerLength)
		bytesReadHeader, err := b.readFull(header)
    decodedHeader := responseHeader{}
		err = versionedDecode(header, &decodedHeader, response.headerVersion)
    
    ...
    // 解析具體的內容
    buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
		bytesReadBody, err := b.readFull(buf)
    
    // 省略了一些錯誤處理,總之,如果發生了錯誤,就把錯誤信息寫進 response.errors 中
    response.packets <- buf
  }
}

其實接收響應這部分的代碼邏輯很容易理解,就是當 response 這個 channel 有了消息,就讀取,然后將讀取到的內容寫進 response 中。

那么你可能會有一個問題,什么時候才會往response 這個 channel 發送消息呢?

很容易可以猜到,當我們發送了消息給 broker ,就應該要通知 receiver ,准備接受消息了。

既然如此,我們繼續剛剛刷新元數據的部分,看看 sarama 是如何把消息發送出去的。

3.5 發送與接受消息

我們回到這一行代碼:

response, err := broker.GetMetadata(req)

我們直接進去,發現在這里構造了一個接受返回信息的結構體,然后調用了sendAndReceive方法。

func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
	response := new(MetadataResponse)

	err := b.sendAndReceive(request, response)

	if err != nil {
		return nil, err
	}

	return response, nil
}

我們繼續往下。

在這里我們可以看到,先是調用了send方法,然后返回了一個promise。並且當有消息寫入這個promise的時候,就得到了結果。

而且回想一下我們在receiver中,是不是把獲取到的 response 寫進了 packets ,把錯誤結果寫進了 errors 呢,跟這里是一致的對吧?

func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
	responseHeaderVersion := int16(-1)
	if res != nil {
		responseHeaderVersion = res.headerVersion()
	}

	promise, err := b.send(req, res != nil, responseHeaderVersion)
	if err != nil {
		return err
	}

	if promise == nil {
		return nil
	}

  // 這里的promise,是上面send方法返回的
	select {
	case buf := <-promise.packets:
		return versionedDecode(buf, res, req.version())
	case err = <-promise.errors:
		return err
	}
}

帶着這個想法,我們看看 send 方法做了什么事。

這個地方很重要,也是我認為 Sarama 設計的特別巧妙的一個地方。

在send方法中,把需要發送的消息通過與broker的tcp連接,同步發送到broker中。

然后構建了一個responsePromise類型的channel,然后直接將這個結構體丟進這個channel中。然后回想一下,我們在responseReceiver這個方法中,不斷消費接收到的response。

此時在responseReceiver中,收到了send方法傳遞的responsePromise,他就會通過conn來讀取數據,然后將數據寫入這個responsePromise的packets中,或者將錯誤信息寫入errors中。

而此時,再看看send方法,他返回了這個responsePromise的指針。所以,sendAndReceive方法就在等待這個responsePromise內的packets或者errors的channel被寫入數據。當responseReceiver接收到了響應並且寫入數據的時候,packets或者errors就會被寫入消息。

func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
  
  ...
  // 將請求的內容封裝進 request ,然后發送到Broker中
  // 注意一下這里的 b.write(buf) 
  // 里面做了 b.conn.Write(buf) 這件事情
  req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
	buf, err := encode(req, b.conf.MetricRegistry)
  bytes, err := b.write(buf)
  
  ...
  // 如果我們的response為nil,也就是說當不需要response的時候,是不會放進inflight發送隊列的
  if !promiseResponse {
		// Record request latency without the response
		b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
		return nil, nil
	}
  
  // 構建一個接收響應的 channel ,返回這個channel的指針
  // 這個 channel 內部包含了兩個 channel,一個用來接收響應,一個用來接收錯誤 
  promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
	b.responses <- promise

  // 這里返回指針特別的關鍵,是把消息的發送跟消息的接收聯系在一起了
	return &promise, nil
}

讓我們來用一張圖說明一下上面這個發送跟接收的過程:

這一段比較繞,但這也是Sarama發送與接受消息的核心內容,希望我的解釋能夠讓你理解:)

4 AsyncProcuder

在上一節中,我們已經分析了client的構造全過程,並且在構造client刷新元數據的時候,也解釋了sarama是如何發送消息以及接受消息的。

在這一節中,我打算解釋一下AsyncProcuder是如何發送消息的。

因為有了上一節的鋪墊,這一節的內容應該會比較容易理解。

我們從newAsyncProducer(client)這一行開始講起。

我們先說說input:make(chan *ProducerMessage),這個事關我們的消息發送。注意到這個channel是沒有緩沖的。

也就是說當我們發送一條消息到input中的時候,此時發送方會阻塞,這說明了之后的操作必須不能夠被阻塞,否則會影響消息的發送效率。

然后其他字段我們先不管,后面用到了我們再提。

func newAsyncProducer(client Client) (AsyncProducer, error) {
  ...
  p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}
  
  go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)
}

4.1 dispatcher

我們往下看看下面協程啟動的go withRecover(p.dispatcher)

在這個方法中,首先創建了一個以Topic為key的map,這個map的value是無緩沖的channel。

到這里我們很容易可以推測得出,當通過input發送一條消息的時候,消息會到dispatcher這里,被分配到各個Topic中。

注意,在這個時候,channel還是無緩沖的,所以我們可以推測下一步的操作,依舊是無阻塞的。

func (p *asyncProducer) dispatcher() {
  handlers := make(map[string]chan<- *ProducerMessage)
  ...
  for msg := range p.input {
    ...
    // 攔截器
    for _, interceptor := range p.conf.Producer.Interceptors {
			msg.safelyApplyInterceptor(interceptor)
		}
    
    ...
    // 找到這個Topic對應的Handler
    handler := handlers[msg.Topic]
		if handler == nil {
      // 如果此時還不存在這個Topic對應的Handler,那么創建一個
      // 雖然說他叫Handler,但他其實是一個無緩沖的
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}
		// 然后把這條消息寫進這個Handler中
		handler <- msg
  }
}

然后讓我們來handler = p.newTopicProducer(msg.Topic)這一行的代碼。

在這里創建了一個緩沖大小為ChannelBufferSize的channel,用於存放發送到這個主題的消息。

然后創建了一個topicProducer,在這個時候你可以認為消息已經交付給各個topic的topicProducer了。

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

4.2 topicDispatch

然后我們來看看go withRecover(tp.dispatch)這一行代碼。

同樣是啟動了一個協程,來處理消息。

也就是說,到了這一步,對於每一個Topic,都有一個協程來處理消息。

在這個dispatch()方法中,也同樣的接收到一條消息,就會去找這條消息所在的分區的channel,然后把消息寫進去。

func (tp *topicProducer) dispatch() {
  for msg := range tp.input {
    ...
    
    // 同樣是找到這條消息所在的分區對應的channel,然后把消息丟進去
    handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
  }
}

4.3 PartitionDispatch

我們進tp.parent.newPartitionProducer(msg.Topic, msg.Partition)這里看看。

你可以發現partitionProducer跟topicProducer是很像的。

其實他們就是代表了一條消息的分發,從producer到topic到partition。

注意,這里面的channel緩沖大小,也是ChannelBufferSize。

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

4.4 partitionProducer

到了這一步,我們再來看看消息到了每個partition所在的channel,是如何處理的。

其實在這一步中,主要是做一些錯誤處理之類的,然后把消息丟進brokerProducer。

可以理解為這一步是業務邏輯層到網絡IO層的轉變,在這之前我們只關心消息去到了哪個分區,而在這之后,我們需要找到這個分區所在的broker的地址,並使用之前已經建立好的TCP連接,發送這條消息。

func (pp *partitionProducer) dispatch() {
  
  // 找到這個主題和分區的leader所在的broker
  pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
  // 如果此時找到了這個leader
  if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) 
    // 發送一條消息來表示同步
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}
  ...// 各種異常情況
  
  // 然后把消息丟進brokerProducer中
  pp.brokerProducer.input <- msg
}

4.5 brokerProducer

到了這里,大概算是整個發送流程最后的一個步驟了。

我們來看看pp.parent.getBrokerProducer(pp.leader)這行代碼里面的內容。

其實就是找到asyncProducer中的brokerProducer,如果不存在,則創建一個。

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

那我們就來看看brokerProducer是怎么創建出來的。

看這個方法中啟動的第二個協程,我們可以推測bridge這個channel收到消息后,會把收到的消息打包成一個request,然后調用Produce方法。

並且,將返回的結果的指針地址,寫進response中。

然后構造好brokerProducerResponse,並且寫入responses中。

func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		stopchan:       make(chan struct{}),
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}

讓我們再來看看broker.Produce(request)這一行代碼。

是不是很熟悉呢,我們在client部分講到的sendAndReceive方法。

而且我們可以發現,如果我們設置了需要Acks,就會返回一個response;如果沒設置,那么消息發出去之后,就不管了。

此時在獲取了response,並且填入了response的內容后,返回這個response的內容。

func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
	var (
		response *ProduceResponse
		err      error
	)

	if request.RequiredAcks == NoResponse {
		err = b.sendAndReceive(request, nil)
	} else {
		response = new(ProduceResponse)
		err = b.sendAndReceive(request, response)
	}

	if err != nil {
		return nil, err
	}

	return response, nil
}

至此,Sarama生產者相關的內容就介紹完畢了。

寫在最后

這一篇寫的實在是有些久了。

主要是作者這段時間實在是太忙了,還沒有完全平衡好目前的學習工作和生活,導致每天花在學習上的時間不多,效率也不高。

另外就是網上我也沒有查到有Sarama相關的解析,都是一些API的調用。因為作者恰好開始學習Kafka,為了更好地了解生產者的每一個參數,我選擇去研究生產者客戶端。

但是,因為作者源碼閱讀能力實在是有限,在這個過程中很有可能會有一些錯誤的理解。所以當你發現了一些違和的地方,也請不吝指教,謝謝你!

再次感謝你能看到這里!

PS:如果有其他的問題,也可以在公眾號找到我,歡迎來找我玩~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM