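The error is the one given in the title:
Scenario: inside a k8s container, a program written in Go uses sarama to create an AsyncProducer
Finding the cause of the error
1. Turn on sarama's logging (implement the logging interface yourself and reassign Logger)
1.1 sarama source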
package sarama

import (
	"io/ioutil"
	"log"
)

// Logger is the instance of a StdLogger interface that Sarama writes connection
// management events to. By default it is set to discard all log messages via ioutil.Discard,
// but you can set it to redirect wherever you want.
var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)

// StdLogger is used to log error messages.
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
1.2 A concrete implementation from the source
type Feedback struct {
	out *log.Logger
	log *log.Logger
}

func (fb *Feedback) Println(v ...interface{}) {
	fb.output(fmt.Sprintln(v...))
}

func (fb *Feedback) Printf(format string, v ...interface{}) {
	fb.output(fmt.Sprintf(format, v...))
}

func (fb *Feedback) Print(v ...interface{}) {
	fb.output(fmt.Sprint(v...))
}

func (fb *Feedback) output(s string) {
	if fb.out != nil {
		fb.out.Output(2, s)
	}
	if fb.log != nil {
		fb.log.Output(2, s)
	}
}
1.3 A custom logging type
// log here is the application's own logging package, which provides Debug;
// the standard library log package has no Debug function.
type ourLog struct{}

func (fb *ourLog) Println(v ...interface{}) {
	log.Debug(fmt.Sprintln(v...))
}

func (fb *ourLog) Printf(format string, v ...interface{}) {
	log.Debug(fmt.Sprintf(format, v...))
}

func (fb *ourLog) Print(v ...interface{}) {
	log.Debug(fmt.Sprint(v...))
}
1.4 Reassign sarama.Logger
// Add this in main
sarama.Logger = &ourLog{}
2. After restarting the docker service in k8s, look at the program's execution log
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 state change to [retrying-1]
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 abandoning broker 1003
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 state change to [closed] on bi-data-cti-prod-31/4
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 shut down
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) client/metadata fetching metadata for [bi-data-cti-prod-31] from broker sany-onprem-repm-node03:9092
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 starting up
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 state change to [open] on bi-data-cti-prod-31/4
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 selected broker 1003
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 state change to [flushing-1]
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 state change to [normal]
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 state change to [retrying] on bi-data-cti-prod-31/4 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.

[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 state change to [retrying-2]
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 abandoning broker 1003
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 state change to [closed] on bi-data-cti-prod-31/4
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 shut down
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) client/metadata fetching metadata for [bi-data-cti-prod-31] from broker sany-onprem-repm-node03:9092
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 starting up
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 state change to [open] on bi-data-cti-prod-31/4
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 selected broker 1003
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 state change to [flushing-2]
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 state change to [normal]
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 state change to [retrying] on bi-data-cti-prod-31/4 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.

[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 state change to [retrying-3]
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 abandoning broker 1003
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 state change to [closed] on bi-data-cti-prod-31/4
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 shut down
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) client/metadata fetching metadata for [bi-data-cti-prod-31] from broker sany-onprem-repm-node03:9092
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 starting up
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 state change to [open] on bi-data-cti-prod-31/4
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 selected broker 1003
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 state change to [flushing-3]
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/leader/bi-data-cti-prod-31/4 state change to [normal]
[11:25:58 CST 2020/08/06] [DEBG] (main.(*ourLog).Printf:47) producer/broker/1003 state change to [retrying] on bi-data-cti-prod-31/4 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.

[11:25:58 CST 2020/08/06] [DEBG] (app/kafka.(*AsyncProducer).run:92) p-,&{addrs:[sany-onprem-repm-node02:9092 sany-onprem-repm-node03:9092 sany-onprem-repm-node01:9092] username: password: certFile: channelBufferSize:102400 producer:0xc4202ec8c0 done:0xc4202142a0}
[11:25:58 CST 2020/08/06] [DEBG] (app/kafka.(*AsyncProducer).run:93) p.producer-, &{client:0xc420218300 conf:0xc420092300 ownClient:true errors:0xc4202c8300 input:0xc4202c8360 successes:0xc4202c83c0 retries:0xc4202c8420 inFlight:{noCopy:{} state1:[0 0 0 0 0 0 0 0 0 0 0 0] sema:0} brokers:map[0xc4200ea160:0xc4200a4240 0xc4200ea6e0:0xc4204990e0 0xc4200eb080:0xc4200a48a0] brokerRefs:map[0xc4200a4240:2 0xc4204990e0:1 0xc4200a48a0:1] brokerLock:{state:0 sema:0}}
[11:25:58 CST 2020/08/06] [DEBG] (app/kafka.(*AsyncProducer).run:94) p.producer.Input-,0xc4202c8360
[11:25:58 CST 2020/08/06] [DEBG] (app/kafka.(*AsyncProducer).run:95) p.producer.Successes-, 0xc4202c8360
[11:25:58 CST 2020/08/06] [EROR] (app/kafka.(*AsyncProducer).run:96) kafka: Failed to produce message to topic bi-data-cti-prod-31: kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date., &{Topic:bi-data-cti-prod-31 Key:119 Value:{"MainType":2,"ExtType":9,"Mode":2,"ModeParm":"ag-19","MSGID":"753","TELID":"def","MSG":{"vcc_id":"1","ag_id":"19","que_id":["1"],"grp_id":"0","ag_sta":"3","ag_sta_reason":"1","ag_sta_id":"7","ag_sta_bef":"2","ag_sta_time":"1596684344"}} Metadata:<nil> Offset:0 Partition:4 Timestamp:0001-01-01 00:00:00 +0000 UTC retries:0 flags:0}
3. Log analysis
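The client periodically pulls the latest metadata from the Kafka brokers every 10 minutes (sarama's default Metadata.RefreshFrequency). When we create the Producer, the default number of retries (Producer.Retry.Max) is 3. If the metadata still cannot be fetched after all 3 retries, the current message cannot be written to Kafka and the error above is returned:
kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
We tried deploying this program on the servers where Kafka itself is installed and did not see the metadata fetch fail there. When the program runs in a k8s container, the problem shows up intermittently.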
4. Resolution
No fix has been found so far. One thing to try is increasing the retries value (Producer.Retry.Max in sarama's Config).
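A minimal sketch of what raising the retry settings could look like, offered as something to experiment with and not as a confirmed fix. The numeric values are illustrative assumptions, newProducer is a hypothetical helper, and the broker addresses are the ones that appear in the log. The import path shown is the one sarama used in 2020; newer releases are published under github.com/IBM/sarama.

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

// newProducer is a hypothetical helper that builds an AsyncProducer with
// more tolerant retry settings than sarama's defaults.
func newProducer(addrs []string) (sarama.AsyncProducer, error) {
	cfg := sarama.NewConfig()

	// Illustrative values only, not tuned recommendations.
	cfg.Producer.Retry.Max = 10                         // default is 3
	cfg.Producer.Retry.Backoff = 500 * time.Millisecond // default is 100ms

	// Metadata fetches have their own retry settings and refresh interval.
	cfg.Metadata.Retry.Max = 5
	cfg.Metadata.Retry.Backoff = 500 * time.Millisecond
	cfg.Metadata.RefreshFrequency = time.Minute // default is 10 minutes

	return sarama.NewAsyncProducer(addrs, cfg)
}

func main() {
	p, err := newProducer([]string{
		"sany-onprem-repm-node01:9092",
		"sany-onprem-repm-node02:9092",
		"sany-onprem-repm-node03:9092",
	})
	if err != nil {
		log.Fatalf("create producer: %v", err)
	}
	defer p.Close()

	// Messages would be sent through p.Input() here, with p.Errors()
	// read in a separate goroutine.
}
```

Raising both the retry count and the backoff widens the window in which a retry can succeed, at the cost of messages staying buffered longer when a broker is really unavailable.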