摘要
在這一篇的文章中,我將從Sarama
的同步生產者和異步生產者怎么創建開始講起,然后我將向你介紹生產者中的各個參數是什么,怎么使用。
然后我將從創建生產者的代碼開始,按照代碼的調用流程慢慢深入,直到發送消息並接收到響應。
這個過程跟上面的文章說到的kafka各個層次其實是有對應關系的。
1.如何使用
1.1 介紹
在學習如何使用Sarama
生產消息之前,我先稍微介紹一下。
Sarama
有兩種類型的生產者,同步生產者和異步生產者。
To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of
Producer.RequiredAcks
. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
官方文檔的大致意思是異步生產者使用channel
接收(生產成功或失敗)的消息,並且也通過channel
來發送消息,這樣做通常是性能最高的。而同步生產者需要阻塞,直到收到了acks
。但是這也帶來了兩個問題,一是性能變得更差了,而是可靠性是依靠參數acks
來保證的。
1.2 異步發送
然后我們直接來看看Sarama
是怎么發送異步消息的。
我們先來創建一個最簡陋的異步生產者,省略所有的不必要的配置。
注意,為了更容易閱讀,我刪去了錯誤處理,並且用省略號替代。
func main() {
config := sarama.NewConfig()
client, err := sarama.NewClient([]string{"localhost:9092"}, config)
...
producer, err := sarama.NewAsyncProducerFromClient(client)
...
defer producer.Close()
topic := "topic-test"
for i := 0; i <= 100; i++ {
text := fmt.Sprintf("message %08d", i)
producer.Input() <- &sarama.ProducerMessage{
Topic: topic,
Key: nil,
Value: sarama.StringEncoder(text)}
}
}
可以看出,Sarama
發送消息的套路就是先創建一個config
,這里更多的config
內容我們會在后文提到。
隨后根據這個config
,和broker
地址,創建出生產者客戶端。
再然后根據客戶端來創建生產者對象(其實在這里用對象不夠嚴謹,但是我認為這么理解是沒有問題的)。
最后就可以使用這個生產者對象來發送信息了。
消息的構造過程中我也省略了其他的參數,只保留了最重要也是最必須的兩個參數:主題和消息內容。
到了這里,一個簡單的異步生產者發送消息的過程就結束了。
1.3 同步發送
在看完了異步發送之后,你可能會有很多的諸如“為什么要這么做”的疑問。
我們先來看看同步發送,再來對比一下:
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
client, err := sarama.NewClient([]string{"localhost:9092"}, config)
...
producer, err := sarama.NewSyncProducerFromClient(client)
...
defer producer.Close()
topic := "topic-test"
for i := 0; i <= 10; i++ {
text := fmt.Sprintf("message %08d", i)
partition, offset, err := producer.SendMessage(
&sarama.ProducerMessage{
Topic: topic,
Key: nil,
Value: sarama.StringEncoder(text)})
...
log.Println("send message success, partition = ", partition, " offset = ", offset)
}
}
可以看出同步發送跟異步發送的過程是很相似的。
不同的地方在於,同步生產者發送消息,使用的不是channel
,並且SendMessage
方法有三個返回的值,分別為這條消息的被發送到了哪個partition
,處於哪個offset
,是否有error
。
也就是說,只有在消息成功的發送並寫入了broker
,才會有返回值。
2. 配置
2.1 默認配置
我們順着源碼看一下這一行:
config := sarama.NewConfig()
可以看到Sarama
已經返回了一個默認的config了:
// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
c := &Config{}
c.Producer.MaxMessageBytes = 1000000
c.Producer.RequiredAcks = WaitForLocal
c.Producer.Timeout = 10 * time.Second
...
}
2.2 可選配置
我們來看看Config
這個結構體,里面有哪些配置項是允許用戶自定義的。
因為實在是太長了,限於篇幅以及作者的學識,在這篇文章中不能一一講解,所以在這篇文章只會選取部分生產者相關的配置進行講解。
但是無論是Golang客戶端,還是Java客戶端,都不重要,你只需要知道哪些參數對於你的生產者的生產速度、消息的可靠性等有關系就可以了。
// Config is used to pass multiple configuration options to Sarama's constructors.
type Config struct {
Admin struct {
...
}
Net struct {
...
}
Metadata struct {
...
}
Producer struct {
...
}
Consumer struct {
...
}
ClientID string
...
}
我們可以看出,關於Sarama
的配置,分成了很多個部分,我們來具體看一看Producer
的這部分。
2.3 重要的生產者參數
在這里我打算介紹一部分我個人認為比較重要的生產者參數。
-
MaxMessageBytes int
這個參數影響了一條消息的最大字節數,默認是1000000。但是注意,這個參數必須要小於broker中的 message.max.bytes
。
-
RequiredAcks RequiredAcks
這個參數影響了消息需要被多少broker寫入之后才返回。取值可以是0、1、-1,分別代表了不需要等待broker確認才返回、需要分區的leader確認后才返回、以及需要分區的所有副本確認后返回。
-
Partitioner PartitionerConstructor
這個是分區器。Sarama
默認提供了幾種分區器,如果不指定默認使用Hash分區器。
-
Retry
這個參數代表了重試的次數,以及重試的時間,主要發生在一些可重試的錯誤中。
-
Flush
用於設置將消息打包發送,簡單來講就是每次發送消息到broker的時候,不是生產一條消息就發送一條消息,而是等消息累積到一定的程度了,再打包發送。所以里面含有兩個參數。一個是多少條消息觸發打包發送,一個是累計的消息大小到了多少,然后發送。
2.4 冪等生產者
在聊冪等生產者之前,我們先來看看生產者中另外一個很重要的參數:
-
MaxOpenRequests int
這個參數代表了允許沒有收到acks而可以同時發送的最大batch
數。
-
Idempotent bool
用於冪等生產者,當這一項設置為true
的時候,生產者將保證生產的消息一定是有序且精確一次的。
為什么會需要這個選項呢?
當MaxOpenRequests這個參數配置大於1的時候,代表了允許有多個請求發送了還沒有收到回應。假設此時的重試次數也設置為了大於1,當同時發送了2個請求,如果第一個請求發送到broker中,broker寫入失敗了,但是第二個請求寫入成功了,那么客戶端將重新發送第一個消息的請求,這個時候會造成亂序。
又比如當第一個請求返回acks的時候,因為網絡原因,客戶端沒有收到,所以客戶端進行了重發,這個時候就會造成消息的重復。
所以,冪等生產者就是為了保證消息發送到broker中是有序且不重復的。
消息的有序可以通過MaxOpenRequests設置為1來保證,這個時候每個消息必須收到了acks才能發送下一條,所以一定是有序的,但是不能夠保證不重復。
而且當MaxOpenRequests設置為1的時候,吞吐量不高。
注意,當啟動冪等生產者的時候,Retry次數必須要大於0,ack必須為all。
在Java客戶端中,允許MaxOpenRequests小於等於5。
但是在Sarama
中有一個很奇怪的地方我也沒有研究明白,我們直接看一看這部分的代碼:
if c.Producer.Idempotent {
if !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
}
if c.Producer.Retry.Max == 0 {
return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
}
if c.Producer.RequiredAcks != WaitForAll {
return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
}
if c.Net.MaxOpenRequests > 1 {
return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
}
}
這一部分第一項是版本號,沒問題,第二第三項是Retry
和Acks
,也沒有問題。問題在於第四項,這里的MaxOpenRequests
參數,我想應該等同於Java客戶端中的MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
,按照Java客戶端中的配置,應該是這個參數小於等於5,即可保證冪等,但是這里必須得設置為1。
檢查了Sarama的Issue,有開發者提出了這個問題,但是目前作者還沒有打算解決。
3 broker
在這一節的內容中,我將會從代碼的層面介紹 Sarama
生產者發送消息的全過程。但是因為代碼很多,我將會省略一些內容,包括一些錯誤處理、重試等。
這些都很重要,也不應該被省略。但是因為篇幅有限,我只能介紹最核心的發送消息這一部分的內容。
我會在貼代碼之前,大概的說一下這段代碼的思路。隨后,我會在代碼中加入一些注釋,來更詳細的進行解釋。
然后我們開始吧!
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
一切都從這么一行開始講起。
我們進去看看。
在這里其實就只有兩個部分,先是通過地址和配置,構建一個 client
。
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
// 構建client
client, err := NewClient(addrs, conf)
if err != nil {
return nil, err
}
// 構建AsyncProducer
return newAsyncProducer(client)
}
3.1 Client的創建
在創建 Client
的過程中,先構建一個 client
結構體。
里面的參數我們先不管,等用到了再進行解釋。
然后創建完之后,刷新元數據,並且啟動一個協程,在后台進行刷新。
func NewClient(addrs []string, conf *Config) (Client, error) {
...
// 構建一個client
client := &client{
conf: conf,
closer: make(chan none),
closed: make(chan none),
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
metadataTopics: make(map[string]none),
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
coordinators: make(map[string]int32),
}
// 把用戶輸入的broker地址作為“種子broker”增加到seedBrokers中
// 隨后客戶端會根據已有的broker地址,自動刷新元數據,以獲取更多的broker地址
// 所以稱之為種子
random := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, index := range random.Perm(len(addrs)) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}
...
// 啟動協程在后台刷新元數據
go withRecover(client.backgroundMetadataUpdater)
return client, nil
}
3.2 元數據的更新
后台更新元數據的設計其實很簡單,利用一個 ticker
,按時對元數據進行更新,直到 client
關閉。
這里先提一下我們說的元數據,有哪些內容。
你可以簡單的理解為包含了所有 broker
的地址(因為 broker
可能新增,也可能減少),以及包含了哪些 topic
,這些 topic
有哪些 partition
等。
func (client *client) backgroundMetadataUpdater() {
// 按照配置的時間更新元數據
ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
defer ticker.Stop()
// 循環獲取channel,判斷是執行更新操作還是終止
for {
select {
case <-ticker.C:
if err := client.refreshMetadata(); err != nil {
Logger.Println("Client background metadata update:", err)
}
case <-client.closer:
return
}
}
}
然后我們繼續來看看 client.refreshMetadata()
這個方法,這個方法是判斷了一下需要刷新哪些主題的元數據,還是說全部主題的元數據。
然后我們繼續。
在這里也還沒有涉及到具體的更新操作。我們看 tryRefreshMetadata
這個方法的參數可以得知,在這里我們設置了需要刷新元數據的主題,重試的次數,超時的時間。
func (client *client) RefreshMetadata(topics ...string) error {
deadline := time.Time{}
if client.conf.Metadata.Timeout > 0 {
deadline = time.Now().Add(client.conf.Metadata.Timeout)
}
// 設置參數
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
然后終於來到了tryRefreshMetadata
這個方法。
在這個方法中,會選取已經存在的broker,構造獲取元數據的請求。
在收到回應后,如果不存在任何的錯誤,就將這些元數據用於更新客戶端。
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
...
broker := client.any()
for ; broker != nil && !pastDeadline(0); broker = client.any() {
...
req := &MetadataRequest{
Topics: topics,
// 是否允許創建不存在的主題
AllowAutoTopicCreation: allowAutoTopicCreation
}
response, err := broker.GetMetadata(req)
switch err.(type) {
case nil:
allKnownMetaData := len(topics) == 0
// 對元數據進行更新
shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
if shouldRetry {
Logger.Println("client/metadata found some partitions to be leaderless")
return retry(err)
}
return err
case ...
...
}
}
然后我們繼續往下看看當客戶端拿到了 response
之后,是如何更新的。
首先,先對本地保存 broker
進行更新。
然后,對 topic
進行更新,以及這個 topic
下面的那些 partition
。
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
...
// 假設返回了新的broker id,那么保存這些新的broker,這意味着增加了broker、或者下線的broker重新上線了
// 如果返回的id我們已經保存了,但是地址變化了,那么更新地址
// 如果本地保存的一些id沒有返回,說明這些broker下線了,那么刪除他們
client.updateBroker(data.Brokers)
// 然后對topic也進行元數據的更新
// 主要是更新topic以及topic對應的partition
for _, topic := range data.Topics {
...
// 更新每個topic以及對應的partition
client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
for _, partition := range topic.Partitions {
client.metadata[topic.Name][partition.ID] = partition
...
}
}
至此,我們元數據的更新就說完了。
下面我們來說一說在更新元數據之前,broker是如何建立連接的,以及請求是如何發送出去,又是如何被broker接收的。
3.3 與Broker建立連接
讓我們回到 tryRefreshMetadata
這個方法中。
這個方法里面有這么一行代碼:
broker := client.any()
我們進去看看。
在這個方法里, 如果 seedBrokers
存在,那么就打開它,否則的話打開其他的broker。
注意,這里提到的其他的broker,可能是在刷新元數據的時候,獲取到的。這就跟上面的內容聯系在一起了。
func (client *client) any() *Broker {
...
if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}
// 不保證一定是按順序的
for _, broker := range client.brokers {
_ = broker.Open(client.conf)
return broker
}
return nil
}
然后再讓我們看看 Open
方法做了什么。
Open
方法異步的建立了一個tcp
連接,然后創建了一個緩沖大小為MaxOpenRequests
的channel
。
這個名為 responses
的 channel
,用於接收從 broker
發送回來的消息。
其實在 broker
中,用於發送消息跟接收消息的 channel
都設置成了這個大小。
MaxOpenRequests
這個參數你可以理解為是Java客戶端中的max.in.flight.requests.per.connection
。
然后,又啟動了一個協程,用於接收消息。
func (b *Broker) Open(conf *Config) error {
if conf == nil {
conf = NewConfig()
}
...
go withRecover(func() {
...
dialer := conf.getDialer()
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
...
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
...
go withRecover(b.responseReceiver)
})
3.4 從Broker接收響應
我們來看看 responseReceiver
是怎么工作的。
其實很容易理解,當 broker
收到一個 response
的時候,先解析消息的頭部,然后再解析消息的內容。並把這些內容寫進 response
的 packets
中。
func (b *Broker) responseReceiver() {
for response := range b.responses {
...
// 先根據Header的版本讀取對應長度的Header
var headerLength = getHeaderLength(response.headerVersion)
header := make([]byte, headerLength)
bytesReadHeader, err := b.readFull(header)
decodedHeader := responseHeader{}
err = versionedDecode(header, &decodedHeader, response.headerVersion)
...
// 解析具體的內容
buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
bytesReadBody, err := b.readFull(buf)
// 省略了一些錯誤處理,總之,如果發生了錯誤,就把錯誤信息寫進 response.errors 中
response.packets <- buf
}
}
其實接收響應這部分的代碼邏輯很容易理解,就是當 response
這個 channel
有了消息,就讀取,然后將讀取到的內容寫進 response
中。
那么你可能會有一個問題,什么時候才會往response
這個 channel
發送消息呢?
很容易可以猜到,當我們發送了消息給 broker
,就應該要通知 receiver
,准備接受消息了。
既然如此,我們繼續剛剛刷新元數據的部分,看看 sarama
是如何把消息發送出去的。
3.5 發送與接受消息
我們回到這一行代碼:
response, err := broker.GetMetadata(req)
我們直接進去,發現在這里構造了一個接受返回信息的結構體,然后調用了sendAndReceive
方法。
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
response := new(MetadataResponse)
err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}
return response, nil
}
我們繼續往下。
在這里我們可以看到,先是調用了send
方法,然后返回了一個promise
。並且當有消息寫入這個promise
的時候,就得到了結果。
而且回想一下我們在receiver
中,是不是把獲取到的 response
寫進了 packets
,把錯誤結果寫進了 errors
呢,跟這里是一致的對吧?
func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
responseHeaderVersion := int16(-1)
if res != nil {
responseHeaderVersion = res.headerVersion()
}
promise, err := b.send(req, res != nil, responseHeaderVersion)
if err != nil {
return err
}
if promise == nil {
return nil
}
// 這里的promise,是上面send方法返回的
select {
case buf := <-promise.packets:
return versionedDecode(buf, res, req.version())
case err = <-promise.errors:
return err
}
}
帶着這個想法,我們看看 send
方法做了什么事。
這個地方很重要,也是我認為 Sarama
設計的特別巧妙的一個地方。
在send方法中,把需要發送的消息通過與broker的tcp連接,同步發送到broker中。
然后構建了一個responsePromise類型的channel,然后直接將這個結構體丟進這個channel中。然后回想一下,我們在responseReceiver這個方法中,不斷消費接收到的response。
此時在responseReceiver中,收到了send方法傳遞的responsePromise,他就會通過conn來讀取數據,然后將數據寫入這個responsePromise的packets中,或者將錯誤信息寫入errors中。
而此時,再看看send方法,他返回了這個responsePromise的指針。所以,sendAndReceive方法就在等待這個responsePromise內的packets或者errors的channel被寫入數據。當responseReceiver接收到了響應並且寫入數據的時候,packets或者errors就會被寫入消息。
func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
...
// 將請求的內容封裝進 request ,然后發送到Broker中
// 注意一下這里的 b.write(buf)
// 里面做了 b.conn.Write(buf) 這件事情
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
bytes, err := b.write(buf)
...
// 如果我們的response為nil,也就是說當不需要response的時候,是不會放進inflight發送隊列的
if !promiseResponse {
// Record request latency without the response
b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
return nil, nil
}
// 構建一個接收響應的 channel ,返回這個channel的指針
// 這個 channel 內部包含了兩個 channel,一個用來接收響應,一個用來接收錯誤
promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
b.responses <- promise
// 這里返回指針特別的關鍵,是把消息的發送跟消息的接收聯系在一起了
return &promise, nil
}
讓我們來用一張圖說明一下上面這個發送跟接收的過程:
這一段比較繞,但這也是Sarama
發送與接受消息的核心內容,希望我的解釋能夠讓你理解:)
4 AsyncProcuder
在上一節中,我們已經分析了client的構造全過程,並且在構造client刷新元數據的時候,也解釋了sarama是如何發送消息以及接受消息的。
在這一節中,我打算解釋一下AsyncProcuder是如何發送消息的。
因為有了上一節的鋪墊,這一節的內容應該會比較容易理解。
我們從newAsyncProducer(client)
這一行開始講起。
我們先說說input:make(chan *ProducerMessage)
,這個事關我們的消息發送。注意到這個channel
是沒有緩沖的。
也就是說當我們發送一條消息到input
中的時候,此時發送方會阻塞,這說明了之后的操作必須不能夠被阻塞,否則會影響消息的發送效率。
然后其他字段我們先不管,后面用到了我們再提。
func newAsyncProducer(client Client) (AsyncProducer, error) {
...
p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
}
go withRecover(p.dispatcher)
go withRecover(p.retryHandler)
}
4.1 dispatcher
我們往下看看下面協程啟動的go withRecover(p.dispatcher)
。
在這個方法中,首先創建了一個以Topic為key的map,這個map的value是無緩沖的channel。
到這里我們很容易可以推測得出,當通過input發送一條消息的時候,消息會到dispatcher這里,被分配到各個Topic中。
注意,在這個時候,channel還是無緩沖的,所以我們可以推測下一步的操作,依舊是無阻塞的。
func (p *asyncProducer) dispatcher() {
handlers := make(map[string]chan<- *ProducerMessage)
...
for msg := range p.input {
...
// 攔截器
for _, interceptor := range p.conf.Producer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
...
// 找到這個Topic對應的Handler
handler := handlers[msg.Topic]
if handler == nil {
// 如果此時還不存在這個Topic對應的Handler,那么創建一個
// 雖然說他叫Handler,但他其實是一個無緩沖的
handler = p.newTopicProducer(msg.Topic)
handlers[msg.Topic] = handler
}
// 然后把這條消息寫進這個Handler中
handler <- msg
}
}
然后讓我們來handler = p.newTopicProducer(msg.Topic)
這一行的代碼。
在這里創建了一個緩沖大小為ChannelBufferSize
的channel,用於存放發送到這個主題的消息。
然后創建了一個topicProducer,在這個時候你可以認為消息已經交付給各個topic的topicProducer了。
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
tp := &topicProducer{
parent: p,
topic: topic,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
handlers: make(map[int32]chan<- *ProducerMessage),
partitioner: p.conf.Producer.Partitioner(topic),
}
go withRecover(tp.dispatch)
return input
}
4.2 topicDispatch
然后我們來看看go withRecover(tp.dispatch)
這一行代碼。
同樣是啟動了一個協程,來處理消息。
也就是說,到了這一步,對於每一個Topic,都有一個協程來處理消息。
在這個dispatch()方法中,也同樣的接收到一條消息,就會去找這條消息所在的分區的channel,然后把消息寫進去。
func (tp *topicProducer) dispatch() {
for msg := range tp.input {
...
// 同樣是找到這條消息所在的分區對應的channel,然后把消息丟進去
handler := tp.handlers[msg.Partition]
if handler == nil {
handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
tp.handlers[msg.Partition] = handler
}
handler <- msg
}
}
4.3 PartitionDispatch
我們進tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
這里看看。
你可以發現partitionProducer跟topicProducer是很像的。
其實他們就是代表了一條消息的分發,從producer到topic到partition。
注意,這里面的channel緩沖大小,也是ChannelBufferSize。
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
pp := &partitionProducer{
parent: p,
topic: topic,
partition: partition,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
}
go withRecover(pp.dispatch)
return input
}
4.4 partitionProducer
到了這一步,我們再來看看消息到了每個partition所在的channel,是如何處理的。
其實在這一步中,主要是做一些錯誤處理之類的,然后把消息丟進brokerProducer。
可以理解為這一步是業務邏輯層到網絡IO層的轉變,在這之前我們只關心消息去到了哪個分區,而在這之后,我們需要找到這個分區所在的broker的地址,並使用之前已經建立好的TCP連接,發送這條消息。
func (pp *partitionProducer) dispatch() {
// 找到這個主題和分區的leader所在的broker
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
// 如果此時找到了這個leader
if pp.leader != nil {
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1)
// 發送一條消息來表示同步
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
}
...// 各種異常情況
// 然后把消息丟進brokerProducer中
pp.brokerProducer.input <- msg
}
4.5 brokerProducer
到了這里,大概算是整個發送流程最后的一個步驟了。
我們來看看pp.parent.getBrokerProducer(pp.leader)
這行代碼里面的內容。
其實就是找到asyncProducer
中的brokerProducer
,如果不存在,則創建一個。
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()
bp := p.brokers[broker]
if bp == nil {
bp = p.newBrokerProducer(broker)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
}
p.brokerRefs[bp]++
return bp
}
那我們就來看看brokerProducer
是怎么創建出來的。
看這個方法中啟動的第二個協程,我們可以推測bridge
這個channel
收到消息后,會把收到的消息打包成一個request,然后調用Produce
方法。
並且,將返回的結果的指針地址,寫進response中。
然后構造好brokerProducerResponse,並且寫入responses中。
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
responses = make(chan *brokerProducerResponse)
)
bp := &brokerProducer{
parent: p,
broker: broker,
input: input,
output: bridge,
responses: responses,
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
go withRecover(bp.run)
// minimal bridge to make the network response `select`able
go withRecover(func() {
for set := range bridge {
request := set.buildRequest()
response, err := broker.Produce(request)
responses <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
}
close(responses)
})
if p.conf.Producer.Retry.Max <= 0 {
bp.abandoned = make(chan struct{})
}
return bp
}
讓我們再來看看broker.Produce(request)
這一行代碼。
是不是很熟悉呢,我們在client部分講到的sendAndReceive
方法。
而且我們可以發現,如果我們設置了需要Acks,就會返回一個response;如果沒設置,那么消息發出去之后,就不管了。
此時在獲取了response,並且填入了response的內容后,返回這個response的內容。
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
var (
response *ProduceResponse
err error
)
if request.RequiredAcks == NoResponse {
err = b.sendAndReceive(request, nil)
} else {
response = new(ProduceResponse)
err = b.sendAndReceive(request, response)
}
if err != nil {
return nil, err
}
return response, nil
}
至此,Sarama
生產者相關的內容就介紹完畢了。
寫在最后
這一篇寫的實在是有些久了。
主要是作者這段時間實在是太忙了,還沒有完全平衡好目前的學習工作和生活,導致每天花在學習上的時間不多,效率也不高。
另外就是網上我也沒有查到有Sarama
相關的解析,都是一些API的調用。因為作者恰好開始學習Kafka,為了更好地了解生產者的每一個參數,我選擇去研究生產者客戶端。
但是,因為作者源碼閱讀能力實在是有限,在這個過程中很有可能會有一些錯誤的理解。所以當你發現了一些違和的地方,也請不吝指教,謝謝你!
再次感謝你能看到這里!
PS:如果有其他的問題,也可以在公眾號找到我,歡迎來找我玩~