We run into this problem frequently; even after the previous fix, it still shows up during normal operation. Our scenario is file scanning, where processing each message is heavy and consumption takes a long time, so rebalancing is triggered frequently.
So we are back to the original question. From first principles, operations and architecture are about engineering best practice, not about whether a technology is new or old. Best practice means best fit.
Kafka's consumer rebalance mechanism makes it a poor fit for long-running, heavy business logic. (Its original background assumed no per-message processing logic, just transporting logs.)
-----------------------------------------------------------------
Reproduced on 2022-04-19:
Parameters
config.Consumer.Group.Session.Timeout = time.Second * 120
config.Consumer.Group.Heartbeat.Interval = time.Second * 20
changed to
config.Consumer.Group.Session.Timeout = time.Second * 30
config.Consumer.Group.Heartbeat.Interval = time.Second * 5
and the error reproduces:
{"Level":"panic","Time":"2022-04-19T20:01:14.874530508+08:00","LoggerName":"","Message":
"Error from consumer: read tcp :33178-\u003e:9093: i/o timeout",
"Caller":{"Defined":true,"PC":8910019,"File":"/data/share//golang/cloudscan/pubsub/groupconsumer.go","Line":147,
"Function":"cloudscan/pubsub.(*GroupConsumer).StartConsume.func1"},"Stack":"cloudscan/pubsub.(*GroupConsumer).StartConsume.func1\n\t
/data/share//golang/cloudscan/pubsub/groupconsumer.go:147"
----------------------------------------------------------------
To cope with this, we raised the Sarama parameter config.Consumer.Group.Session.Timeout to time.Second * 120, i.e. the heartbeat session timeout. But the network read timeout was still small: it defaults to 30 seconds, and in our file-scanning scenario, where consuming a message takes a long time, 30 seconds may not be enough to finish processing the data. Final configuration:
config.Consumer.Group.Session.Timeout = time.Second * 120
// c.Net.ReadTimeout (default 30 * time.Second) must be > config.Consumer.Group.Session.Timeout (120)
// https://github.com/Shopify/sarama/issues/1422
config.Net.ReadTimeout = config.Consumer.Group.Session.Timeout + 30*time.Second
config.Consumer.Group.Heartbeat.Interval = time.Second * 20
config.Consumer.MaxProcessingTime = time.Minute * 10
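For reference, a minimal sketch (in Go) of how these values fit together when building the Sarama config. The package name, the helper newScanConsumerConfig and the broker version are assumptions for illustration, not code from this project:

package pubsub // illustrative package name

import (
	"time"

	"github.com/Shopify/sarama"
)

// newScanConsumerConfig builds a config for long-running message processing.
// The values mirror the ones above; tune them to the real scan duration.
func newScanConsumerConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // placeholder broker version

	// Session timeout: how long the broker waits without a heartbeat
	// before kicking the consumer out of the group.
	config.Consumer.Group.Session.Timeout = 120 * time.Second
	config.Consumer.Group.Heartbeat.Interval = 20 * time.Second

	// Net.ReadTimeout must exceed Session.Timeout, otherwise blocking
	// group requests hit the network deadline first and surface as
	// "i/o timeout" (see sarama issue 1422 linked above).
	config.Net.ReadTimeout = config.Consumer.Group.Session.Timeout + 30*time.Second

	// Upper bound on how long a single message is expected to take.
	config.Consumer.MaxProcessingTime = 10 * time.Minute

	// Surface consumer errors on the Errors() channel instead of only logging them.
	config.Consumer.Return.Errors = true

	return config
}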
The logs reported:
Jan 29 13:41:29 csmeta[21871]: panic: Error from dir consumer: read tcp :50626->:9092: i/o timeout
Jan 29 13:40:05 csscand[27300]: panic: Error from file consumer: read tcp :49560->:9092: i/o timeout
At first glance it looks like an I/O error, so the first instinct is to suspect the disk, or the server side? That is a trap.
Looking more carefully, it is a TCP network error, so we suspect the network? Another trap.
In fact it is most likely a Go context-related error, tied to the Sarama consumer-group algorithm. The error is on the client side.
Source code: https://github.com/Shopify/sarama/blob/v1.19.0/consumer_group.go#L18
And this context-related error, reported as a tcp i/o timeout, shows up in other projects as well:
https://github.com/jackc/pgx/issues/831
I am on the same situation described by @adw1n and @atombender. I can't gracefully handle a good amount of requests with context cancelled because pgx returns an "i/o timeout" which is not really a network timeout. The helper function mentioned by @jackc doesn't help in this situation
https://github.com/jackc/pgconn/issues/80
Context expiration is implemented by SetDeadline on the underlying net.Conn. This means that the error returned from a query can be the underlying net error instead of the context error. This is confusing for callers. See jackc/pgx#831.
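The same masking can be demonstrated with plain net code, independent of pgx or sarama: when a library maps context cancellation onto SetDeadline of the underlying net.Conn, the caller sees a generic "i/o timeout" instead of context.Canceled. A self-contained sketch (the server, addresses and timings are arbitrary placeholders):

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

func main() {
	// A throwaway TCP server that accepts connections but never writes,
	// so any read on the client side blocks until a deadline fires.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	defer ln.Close()
	go func() {
		conn, _ := ln.Accept()
		if conn != nil {
			defer conn.Close()
			time.Sleep(5 * time.Second)
		}
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	defer conn.Close()

	// Simulate a library that maps context cancellation onto SetDeadline.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-ctx.Done()
		// Force any blocked read to fail immediately.
		conn.SetReadDeadline(time.Now())
	}()
	time.AfterFunc(100*time.Millisecond, cancel)

	buf := make([]byte, 1)
	_, err = conn.Read(buf)

	// The context was cancelled, but the error we get back is a net
	// timeout ("i/o timeout"), not context.Canceled.
	var nerr net.Error
	fmt.Println("is net timeout:", errors.As(err, &nerr) && nerr.Timeout())
	fmt.Println("is context.Canceled:", errors.Is(err, context.Canceled))
}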
Related discussion on the sarama GitHub:
https://github.com/Shopify/sarama/issues/1192
Versions
Sarama Version: v1.19.0
Kafka Version: v2.0
Go Version: v1.11
Problem Description
Getting i/o timeout error from broker when using consumer group.
Error: Received unexpected error read tcp <ip>:63133-><ip>:9092: i/o timeout
Error is returned from https://github.com/Shopify/sarama/blob/master/broker.go#L592
Note: same kafka cluster is working fine when not using consumer group
I experienced perhaps the same problem. One consumer instance using the ConsumerGroup API is fine, but when I tried to start another instance, the other instance got the same error and could not join the group.
Sarama v1.19.0
Kafka 1.1.0
Go v1.10.3
Solved the problem by watching ConsumerGroupSession.Context().Done() as well in ConsumeClaim.
The comment at https://github.com/Shopify/sarama/blob/v1.19.0/consumer_group.go#L18 is clear enough, but it seems that the example at https://godoc.org/github.com/Shopify/sarama#example-ConsumerGroup does not take rebalancing into consideration.
func (h MyHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	var msg SyncMessage
	for {
		select {
		case cMsg := <-claim.Messages():
			err := json.Unmarshal(cMsg.Value, &msg)
			if err != nil {
				return err
			}
			// do something
			sess.MarkMessage(cMsg, "")
		case <-sess.Context().Done():
			return nil
		}
	}
}
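For completeness, the outer loop that goes with such a handler: sarama's Consume returns whenever a rebalance ends the current session, so it has to be called again in a loop until the parent context is cancelled. A sketch, where the package name, the topic "scan-tasks" and the runGroup helper are placeholders:

package pubsub // illustrative package name

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// runGroup keeps a consumer group alive across rebalances. The handler is
// anything implementing sarama.ConsumerGroupHandler (e.g. MyHandler above,
// with Setup/Cleanup added).
func runGroup(ctx context.Context, group sarama.ConsumerGroup, handler sarama.ConsumerGroupHandler) {
	for {
		// Consume blocks for the lifetime of one session and returns when a
		// server-side rebalance ends it; call it again to join the next session.
		if err := group.Consume(ctx, []string{"scan-tasks"}, handler); err != nil {
			log.Printf("consume error: %v", err)
		}
		// Stop only when the parent context has been cancelled.
		if ctx.Err() != nil {
			return
		}
	}
}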
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and is assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note, that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
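Given step 4 above (a session only ends once every ConsumeClaim exits), the heavy file-scan work itself has to observe the session context, otherwise a single long scan can hold the session past Config.Consumer.Group.Rebalance.Timeout. A hedged sketch for our kind of workload, where ScanHandler and ScanFile are hypothetical names standing in for the real scanning code:

package pubsub // illustrative package name

import (
	"context"

	"github.com/Shopify/sarama"
)

// ScanHandler is an illustrative ConsumerGroupHandler whose per-message work
// is long-running, so the session context is passed into the work itself and
// the work is expected to abort when that context is cancelled (i.e. on rebalance).
type ScanHandler struct {
	ScanFile func(ctx context.Context, payload []byte) error // real scan logic goes here
}

func (h ScanHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h ScanHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h ScanHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			// The scan itself watches the session context, so a rebalance
			// interrupts the work instead of waiting minutes for it to finish.
			if err := h.ScanFile(sess.Context(), msg.Value); err != nil {
				if sess.Context().Err() != nil {
					return nil // rebalance or shutdown: exit quickly, do not mark the offset
				}
				return err
			}
			sess.MarkMessage(msg, "")
		case <-sess.Context().Done():
			return nil
		}
	}
}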
--------------------------------------------------------------------------------------------------------------------------------------------------------