一、kafka參數解析
一個消費者可以消費同一個topic的多個分區,但是一個分區不能被同一個組下的多個消費者消費。同一個組下有多個消費者並發消費同一個topic時,要注意設置的消費者並發個數一定要小於等於topic的分區數,不然會有空置的線程沒有分區可以消費,並發的時候根據分區數和消費者的個數來分配每個消費者消費幾個分區,消費者可以消費一個或多個分區。
#消費端自動提交
kafka.enable.auto.commit=true
#消費端與kafaka服務端的超時時間,消費端超過這個時間沒有拉取數據的話,會被服務端剔除,服務端會把partions重新在消費中分配。
kafka.session.timeout=6000
#提交間隙
kafka.auto.commit.interval=100
#auto.offset.reset有常用的兩種取值
#earliest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
#latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
當新加了分區時,此時如果消費端沒有起來,當產生一定量的數據后,起了消費,這時如果用的latest,latest會從拉起的那一個開始消費后面的數據,前面的就丟失了,如果用的earliest會從頭消費。
如果是新加了topic,但消費沒起來,等產生了一定數據后,latest會從消費拉起的那刻起消費新產生的數據,earliest會重頭消費。
如果每個topic都被消費過並且分區都提交了offert,則不管用latest還是earliest都會從上次提交的offert開始消費,所以會消費積壓的數據。
kafka.auto.offset.reset=latest
#消費者並發個數,設置為幾同一個消費組下就有幾個消費者並發消費同一個topic,一個消費者就是一個線程。並發數要小於topic的分區數,避免有空線程無分區可消費。
每個消費者會拉起三個線程,ThreadPooltaskSchedule 控制消費者的定時任務,
kafka-coordinator-heartbeat-thread 消費者和服務端的心跳線程,
org.springframework.kafka.KafkaListenerEndpointContainer 消費者poll數據的線程。
kafka.concurrency=2
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
二、設置kafka的數據過期時間
修改kafka服務端的server.properties配置
log.retention.hours=72
log.cleanup.policy=delete
組與組間的消費者是沒有關系的。
topic中已有分組消費數據,新建其他分組ID的消費者時,之前分組提交的offset對新建的分組消費不起作用。
四、Flink Kafka consumer的消費策略配置(可以指定消費起始位置)
FlinkKafkaConsumer011[String] consumer = new FlinkKafkaConsumer011[String]("hello", valueDeserializer, kafkaProps)
// 指定消費策略
consumer.setStartFromEarliest() // - 從最早的記錄開始;
consumer.setStartFromLatest() //- 從最新記錄開始;
consumer.setStartFromTimestamp(null); // 從指定的epoch時間戳(毫秒)開始;
consumer.setStartFromGroupOffsets(); // 默認行為,從上次消費的偏移量進行繼續消費。