基本的操作和管理
-- 創建 bin/kafka-topics.sh --create --zookeeper 192.168.179.11:2181 --replication-factor 1 --partitions 1 --topic hello-topic-12 -- 查看 bin/kafka-topics.sh --list --zookeeper 192.168.179.11:2181 bin/kafka-topics.sh --describe --zookeeper 192.168.179.11:2181 --topic hello-topic-12 -- 查看topic列表 bin/kafka-console-producer.sh --broker-list 192.168.179.11:9092 --topic hello-topic-12 -- 消費者 bin/kafka-console-consumer.sh --bootstrap-server 192.168.179.11:9092 --topic hello-topic-12 --from-beginning -- 刪除 bin/kafka-topics.sh --delete --zookeeper 192.168.179.11:2181 --topic hello-topic-12
1.Broker配置
配置文件放在Kafka目錄下的config目錄中,主要是server.properties文件
1.1常規配置
broker.id
在單機時無需修改,但在集群下部署時往往需要修改。它是個每一個broker在集群中的唯一表示,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息情況
listeners
監聽列表(以逗號分隔 不同的協議(如plaintext,trace,ssl、不同的IP和端口)),hostname如果設置為0.0.0.0則綁定所有的網卡地址;如果hostname為空則綁定默認的網卡。如果
沒有配置則默認為java.net.InetAddress.getCanonicalHostName()。
如:PLAINTEXT://myhost:9092,TRACE://:9091或 PLAINTEXT://0.0.0.0:9092,
zookeeper.connect
zookeeper集群的地址,可以是多個,多個之間用逗號分割
log.dirs
Kafka把所有的消息都保存在磁盤上,存放這些數據的目錄通過log.dirs指定。
num.recovery.threads.per.data.dir
每數據目錄用於日志恢復啟動和關閉時的線程數量。因為這些線程只是服務器啟動和關閉時會用到。所以完全可以設置大量的線程來達到並行操作的目的。注意,這個參數指的是每個日志目錄的線程數,比如本參數設置為8,而log.dirs設置為了三個路徑,則總共會啟動24個線程。
auto.create.topics.enable
是否允許自動創建主題。如果設為true,那么produce,consume或者fetch metadata一個不存在的主題時,就會自動創建。缺省為true。
1.2主題配置
新建主題的默認參數
num.partitions
每個新建主題的分區個數。這個參數一般要評估,比如,每秒鍾要寫入和讀取1GB數據,如果現在每個消費者每秒鍾可以處理50MB的數據,那么需要20個分區,這樣就可以讓20個消費者同時讀取這些分區,從而達到設計目標。
log.retention.hours
日志保存時間,默認為7天(168小時)。超過這個時間會清理數據。bytes和minutes無論哪個先達到都會觸發。與此類似還有log.retention.minutes和log.retention.ms,都設置的話,優先使用具有最小值的那個。
log.retention.bytes
topic每個分區的最大文件大小,一個topic的大小限制 = 分區數*log.retention.bytes。-1沒有大小限制。log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行刪除。
log.segment.bytes
分區的日志存放在某個目錄下諸多文件中,這些文件將分區的日志切分成一段一段的,我們稱為日志片段。這個屬性就是每個文件的最大尺寸;當尺寸達到這個數值時,就會關閉當前文件,並創建新文件。被關閉的文件就開始等待過期。默認為1G。
如果一個主題每天只接受100MB的消息,那么根據默認設置,需要10天才能填滿一個文件。而且因為日志片段在關閉之前,消息是不會過期的,所以如果log.retention.hours保持默認值的話,那么這個日志片段需要17天才過期。因為關閉日志片段需要10天,等待過期又需要7天。
log.segment.ms
作用和log.segment.bytes類似,只不過判斷依據是時間。同樣的,兩個參數,以先到的為准。這個參數默認是不開啟的。
message.max.bytes
表示一個服務器能夠接收處理的消息的最大字節數,注意這個值producer和consumer必須設置一致,且不要大於fetch.message.max.bytes屬性的值。該值默認是1000000字節,大概900KB~1MB。
2.生產者發送消息
2.1必選屬性
創建生產者對象時有三個屬性必須指定。
bootstrap.servers
該屬性指定broker的地址清單,地址的格式為host:port。清單里不需要包含所有的broker地址,生產者會從給定的broker里查詢其他broker的信息。不過最少提供2個broker的信息,一旦其中一個宕機,生產者仍能連接到集群上。
key.serializer
生產者接口允許使用參數化類型,可以把Java對象作為鍵和值傳broker,但是broker希望收到的消息的鍵和值都是字節數組,所以,必須提供將對象序列化成字節數組的序列化器。key.serializer必須設置為實現org.apache.kafka.common.serialization.Serializer的接口類,Kafka的客戶端默認提供了ByteArraySerializer,IntegerSerializer, StringSerializer,也可以實現自定義的序列化器。
value.serializer
2.2更多發送配置
生產者有很多屬性可以設置,大部分都有合理的默認值,無需調整。有些參數可能對內存使用,性能和可靠性方面有較大影響。可以參考org.apache.kafka.clients.producer包下的ProducerConfig類。
acks:
指定了必須要有多少個分區副本收到消息,生產者才會認為寫入消息是成功的,這個參數對消息丟失的可能性有重大影響。
acks=0:生產者在寫入消息之前不會等待任何來自服務器的響應,容易丟消息,但是吞吐量高。
acks=1:只要集群的首領節點收到消息,生產者會收到來自服務器的成功響應。如果消息無法到達首領節點(比如首領節點崩潰,新首領沒有選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。默認使用這個配置。
acks=all:只有當所有參與復制的節點都收到消息,生產者才會收到一個來自服務器的成功響應。延遲高。
buffer.memory
設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。如果數據產生速度大於向broker發送的速度,導致生產者空間不足,producer會阻塞或者拋出異常。缺省33554432 (32M)
max.block.ms
指定了在調用send()方法或者使用partitionsFor()方法獲取元數據時生產者的阻塞時間。當生產者的發送緩沖區已滿,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到max.block.ms時,生產者會拋出超時異常。缺省60000ms
retries
發送失敗時,指定生產者可以重發消息的次數。默認情況下,生產者在每次重試之間等待100ms,可以通過參數retry.backoff.ms參數來改變這個時間間隔。缺省0
receive.buffer.bytes和send.buffer.bytes
指定TCP socket接受和發送數據包的緩存區大小。如果它們被設置為-1,則使用操作系統的默認值。如果生產者或消費者處在不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。缺省102400
batch.size
當多個消息被發送同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次內存被填滿后,批次里的所有消息會被發送出去。但是生產者不一定都會等到批次被填滿才發送,半滿甚至只包含一個消息的批次也有可能被發送。缺省16384(16k)
linger.ms
指定了生產者在發送批次前等待更多消息加入批次的時間。它和batch.size以先到者為先。也就是說,一旦我們獲得消息的數量夠batch.size的數量了,他將會立即發送而不顧這項設置,然而如果我們獲得消息字節數比batch.size設置要小的多,我們需要“linger”特定的時間以獲取更多的消息。這個設置默認為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲,但也會提升消息的吞吐量。
compression.type
producer用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好。snappy占用cpu少,提供較好的性能和可觀的壓縮比,如果比較關注性能和網絡帶寬,用這個。如果帶寬緊張,用gzip,會占用較多的cpu,但提供更高的壓縮比。
client.id
當向server發出請求時,這個字符串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以發送信息。這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤。
max.in.flight.requests.per.connection
指定了生產者在接收到服務器響應之前可以發送多個消息,值越高,占用的內存越大,當然也可以提升吞吐量。發生錯誤時,可能會造成數據的發送順序改變,默認是5 (修改)。
如果需要保證消息在一個分區上的嚴格順序,這個值應該設為1。不過這樣會嚴重影響生產者的吞吐量。
request.timeout.ms
客戶端將等待請求的響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求;超過重試次數將拋異常
metadata.fetch.timeout.ms
是指我們所獲取的一些元數據的第一個時間數據。元數據包含:topic,host,partitions。此項配置是指當等待元數據fetch成功完成所需要的時間,否則會跑出異常給客戶端
timeout.ms
此配置選項控制broker等待副本確認的最大時間。如果確認的請求數目在此時間內沒有實現,則會返回一個錯誤。這個超時限制是以server端度量的,沒有包含請求的網絡延遲。這個參數和acks的配置相匹配。
max.request.size
控制生產者發送請求最大大小。假設這個值為1M,如果一個請求里只有一個消息,那這個消息不能大於1M,如果一次請求是一個批次,該批次包含了1000條消息,那么每個消息不能大於1KB。注意:broker具有自己對消息記錄尺寸的覆蓋,如果這個尺寸小於生產者的這個設置,會導致消息被拒絕。
2.3順序保證
Kafka 可以保證同一個分區里的消息是有序的。也就是說,如果生產者一定的順序發送消息, broker 就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。在某些情況下, 順序是非常重要的。例如,往一個賬戶存入100 元再取出來,這個與先取錢再存錢是截然不同的!不過,有些場景對順序不是很敏感。
如果把retires設為非零整數,同時把max.in.flight.request.per.connection設為比1 大的數,那么,如果第一個批次消息寫入失敗,而第二個批次寫入成功, broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功,那么兩個批次的順序就反過來了。
一般來說,如果某些場景要求消息是有序的,那么消息是否寫入成功也是很關鍵的,所以不建議把retires設為0 。可以把max.in.flight.request.per.connection 設為1,這樣在生產者嘗試發送第一批消息時,就不會有其他的消息發送給broker 。不過這樣會嚴重影響生產者的吞吐量,所以只有在對消息的順序有嚴格要求的情況下才能這么做。
3.消費者接受消息
3.1必選參數
bootstrap.servers、key.serializer、value.serializer含義同生產者
group.id
並非完全必需,它指定了消費者屬於哪一個群組,但是創建不屬於任何一個群組的消費者並沒有問題。
3.2消費者配置
消費者有很多屬性可以設置,大部分都有合理的默認值,無需調整。有些參數可能對內存使用,性能和可靠性方面有較大影響。可以參考org.apache.kafka.clients.consumer包下ConsumerConfig類。
fetch.min.bytes
每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。缺省為1個字節。多消費者下,可以設大這個值,以降低broker的工作負載
fetch.wait.max.ms
如果沒有足夠的數據能夠滿足fetch.min.bytes,則此項配置是指在應答fetch請求之前,server會阻塞的最大時間。缺省為500個毫秒。和上面的fetch.min.bytes結合起來,要么滿足數據的大小,要么滿足時間,就看哪個條件先滿足。
max.partition.fetch.bytes
指定了服務器從每個分區里返回給消費者的最大字節數,默認1MB。假設一個主題有20個分區和5個消費者,那么每個消費者至少要有4MB的可用內存來接收記錄,而且一旦有消費者崩潰,這個內存還需更大。注意,這個參數要比服務器的message.max.bytes更大,否則消費者可能無法讀取消息。
session.timeout.ms
如果consumer在這段時間內沒有發送心跳信息,則它會被認為掛掉了。默認3秒。
auto.offset.reset
消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下,如何處理。默認值是latest,從最新的記錄開始讀取,另一個值是earliest,表示消費者從起始位置讀取分區的記錄。
注意:默認值是latest,意思是說,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄),可以先啟動生產者,再啟動消費者,觀察到這種情況。觀察代碼,在模塊kafka-no-spring下包hellokafka中。
enable .auto.commit
默認值true,表明消費者是否自動提交偏移。為了盡量避免重復數據和數據丟失,可以改為false,自行控制何時提交。
partition.assignment.strategy
分區分配給消費者的策略。系統提供兩種策略。默認為Range。允許自定義策略。
Range
把主題的連續分區分配給消費者。例如,有主題T1和T2,各有3個分區,消費者C1和C2,則可能的分配形式為:
C1: T1(0,1),T2(0,,1)
C2: T1(2),T2(2)
RoundRobin
把主題的分區循環分配給消費者。例如,有主題T1和T2,各有3個分區,消費者C1和C2,則可能的分配形式為:
C1: T1(0,2),T2(1)
C2: T1(1),T2(0,2)
自定義策略
extends 類AbstractPartitionAssignor,然后在消費者端增加參數:
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 類.class.getName());即可。
client.id
當向server發出請求時,這個字符串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以發送信息。這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤。
max.poll.records
控制每次poll方法返回的的記錄數量。
receive.buffer.bytes和send.buffer.bytes
指定TCP socket接受和發送數據包的緩存區大小。如果它們被設置為-1,則使用操作系統的默認值。如果生產者或消費者處在不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
自動提交
最簡單的提交方式是讓消費者自動提交偏移量。 如果 enable.auto.comnit被設為 true,消費者會自動把從poll()方法接收到的最大偏移量提交上去。提交時間間隔由auto.commit.interval.ms控制,默認值是5s。自動提交是在輪詢里進行的,消費者每次在進行輪詢時會檢査是否該提交偏移量了,如果是,那么就會提交從上一次輪詢返回的偏移量。
不過,在使用這種簡便的方式之前,需要知道它將會帶來怎樣的結果。
假設我們仍然使用默認的5s提交時間間隔, 在最近一次提交之后的3s發生了再均衡,再均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落后了3s,所以在這3s內到達的消息會被重復處理。可以通過修改提交時間間隔來更頻繁地提交偏移量, 減小可能出現重復消息的時間窗, 不過這種情況是無法完全避免的 。
在使用自動提交時,每次調用輪詢方法都會把上一次調用返回的偏移量提交上去,它並不知道具體哪些消息已經被處理了,所以在再次調用之前最好確保所有當前調用返回的消息都已經處理完畢(enable.auto.comnit被設為 true時,在調用 close()方法之前也會進行自動提交)。一般情況下不會有什么問題,不過在處理異常或提前退出輪詢時要格外小心。
自動提交雖然方便,不過並沒有為我們留有余地來避免重復處理消息。
手動提交
我們通過控制偏移量提交時間來消除丟失消息的可能性,並在發生再均衡時減少重復消息的數量。消費者API提供了另一種提交偏移量的方式,開發者可以在必要的時候提交當前偏移量,而不是基於時間間隔。
把auto.commit. offset設為 false,自行決定何時提交偏移量。使用 commitsync()提交偏移量最簡單也最可靠。這個方法會提交由poll()方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
注意: commitsync()將會提交由poll()返回的最新偏移量,所以在處理完所有記錄后要確保調用了 commitsync(),否則還是會有丟失消息的風險。如果發生了再均衡,從最近批消息到發生再均衡之間的所有消息都將被重復處理。
異步提交
手動提交時,在broker對提交請求作出回應之前,應用程序會一直阻塞。這時我們可以使用異步提交API,我們只管發送提交請求,無需等待broker的響應。
具體使用,參見模塊kafka-no-spring下包commit包中代碼。
在成功提交或碰到無法恢復的錯誤之前, commitsync()會一直重試,但是 commitAsync不會。它之所以不進行重試,是因為在它收到服務器響應的時候,可能有一個更大的偏移量已經提交成功。
假設我們發出一個請求用於提交偏移量2000,,這個時候發生了短暫的通信問題,服務器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批消息,並成功提交了偏移量3000。如果commitAsync()重新嘗試提交偏移量2000,它有可能在偏移量3000之后提交成功。這個時候如果發生再均衡,就會出現重復消息。
commitAsync()也支持回調,在 broker作出響應時會執行回調。回調經常被用於記錄提交錯誤或生成度量指標。
同步和異步組合
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的。但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。
因此,在消費者關閉前一般會組合使用 commitAsync()和 commitsync()。具體使用,
特定提交
在我們前面的提交中,提交偏移量的頻率與處理消息批次的頻率是一樣的。但如果想要更頻繁地提交該怎么辦?
如果poll()方法返回一大批數據,為了避免因再均衡引起的重復處理整批消息,想要在批次中間提交偏移量該怎么辦?這種情況無法通過調用 commitSync()或 commitAsync()來實現,因為它們只會提交最后一個偏移量,而此時該批次里的消息還沒有處理完。
消費者API允許在調用 commitsync()和 commitAsync()方法時傳進去希望提交的分區和偏移量的map。假設我們處理了半個批次的消息,最后一個來自主題“customers”,分區3的消息的偏移量是5000,你可以調用 commitsync()方法來提交它。不過,因為消費者可能不只讀取一個分區,因為我們需要跟蹤所有分區的偏移量,所以在這個層面上控制偏移量的提交會讓代碼變復雜。
public class CommitSpecial { public static void main(String[] args) { /*消息消費者*/ Properties properties = KafkaConst.consumerConfig( "CommitSpecial", StringDeserializer.class, StringDeserializer.class); /*取消自動提交*/ properties.put("enable.auto.commit",false); KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties); Map<TopicPartition, OffsetAndMetadata> currOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(); int count = 0; try { consumer.subscribe(Collections.singletonList( BusiConst.CONSUMER_COMMIT_TOPIC)); while(true){ ConsumerRecords<String, String> records = consumer.poll(500); for(ConsumerRecord<String, String> record:records){ System.out.println(String.format( "主題:%s,分區:%d,偏移量:%d,key:%s,value:%s", record.topic(),record.partition(),record.offset(), record.key(),record.value())); //要提交的數據 currOffsets.put(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset()+1,"no meta")); if(count%11==0){ consumer.commitAsync(currOffsets,null); } count++; } } } finally { consumer.close(); } } }
再均衡監聽器
在提交偏移量一節中提到過,消費者在退出和進行分區再均衡之前,會做一些清理工作比如,提交偏移量、關閉文件句柄、數據庫連接等。
在為消費者分配新分區或移除舊分區時,可以通過消費者API執行一些應用程序代碼,在調用 subscribe()方法時傳進去一個 ConsumerRebalancelistener實例就可以了。
ConsumerRebalancelistener有兩個需要實現的方法。
1) public void onPartitionsRevoked( Collection< TopicPartition> partitions)方法會在
再均衡開始之前和消費者停止讀取消息之后被調用。如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里開始讀取了
2) public void onPartitionsAssigned( Collection< TopicPartition> partitions)方法會在重新分配分區之后和消費者開始讀取消息之前被調用。
public class HandlerRebalance implements ConsumerRebalanceListener { /*模擬一個保存分區偏移量的數據庫表*/ public final static ConcurrentHashMap<TopicPartition,Long> partitionOffsetMap = new ConcurrentHashMap<TopicPartition,Long>(); private final Map<TopicPartition, OffsetAndMetadata> currOffsets; private final KafkaConsumer<String,String> consumer; //private final Transaction tr事務類的實例 public HandlerRebalance(Map<TopicPartition, OffsetAndMetadata> currOffsets, KafkaConsumer<String, String> consumer) { this.currOffsets = currOffsets; this.consumer = consumer; } //分區再均衡之前 public void onPartitionsRevoked( Collection<TopicPartition> partitions) { final String id = Thread.currentThread().getId()+""; System.out.println(id+"-onPartitionsRevoked參數值為:"+partitions); System.out.println(id+"-服務器准備分區再均衡,提交偏移量。當前偏移量為:" +currOffsets); //我們可以不使用consumer.commitSync(currOffsets); //提交偏移量到kafka,由我們自己維護*/ //開始事務 //偏移量寫入數據庫
/**模擬提交到數據庫維護*/ System.out.println("分區偏移量表中:"+partitionOffsetMap); for(TopicPartition topicPartition:partitions){ partitionOffsetMap.put(topicPartition, currOffsets.get(topicPartition).offset()); } consumer.commitSync(currOffsets); //提交業務數和偏移量入庫 tr.commit } //分區再均衡完成以后 public void onPartitionsAssigned( Collection<TopicPartition> partitions) { final String id = Thread.currentThread().getId()+""; System.out.println(id+"-再均衡完成,onPartitionsAssigned參數值為:"+partitions); System.out.println("分區偏移量表中:"+partitionOffsetMap); for(TopicPartition topicPartition:partitions){ System.out.println(id+"-topicPartition"+topicPartition); //模擬從數據庫中取得上次的偏移量 Long offset = partitionOffsetMap.get(topicPartition); if(offset==null) continue;
//設置分區偏移量 consumer.seek(topicPartition,partitionOffsetMap.get(topicPartition)); } } }
獨立消費者
到目前為止,我們討論了消費者群組,分區被自動分配給群組里的消費者,在群組里新增或移除消費者時自動觸發再均衡。不過有時候可能只需要一個消費者從一個主題的所有分區或者某個特定的分區讀取數據。這個時候就不需要消費者群組和再均衡了,只需要把主題或者分區分配給消費者,然后開始讀取消息並提交偏移量。
如果是這樣的話,就不需要訂閱主題,取而代之的是為自己分配分區。一個消費者可以訂閱主題(並加入消費者群組),或者為自己分配分區,但不能同時做這兩件事情。
public class IndependConsumer { private static KafkaConsumer<String,String> consumer = null; public static final String SINGLE_CONSUMER_TOPIC = "single-consumer"; public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConst.LOCAL_BROKER); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); /*獨立消息消費者*/ consumer= new KafkaConsumer<String, String>(properties); List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>(); List<PartitionInfo> partitionInfos = consumer.partitionsFor(SINGLE_CONSUMER_TOPIC); if(null!=partitionInfos){ for(PartitionInfo partitionInfo:partitionInfos){ topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); } } consumer.assign(topicPartitionList); try { while(true){ ConsumerRecords<String, String> records = consumer.poll(1000); for(ConsumerRecord<String, String> record:records){ System.out.println(String.format( "主題:%s,分區:%d,偏移量:%d,key:%s,value:%s", record.topic(),record.partition(),record.offset(), record.key(),record.value())); //do our work } } } finally { consumer.close(); } } }
從特定偏移量處開始記錄
到目前為止,我們知道了如何使用poll()方法從各個分區的最新偏移量處開始處理消息。
不過,有時候我們也需要從特定的偏移量處開始讀取消息。
如果想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息,可以使 seekToBeginning(Collection<TopicPartition> tp)和 seekToEnd( Collection<TopicPartition>tp)這兩個方法。
不過,Kaka也為我們提供了用於查找特定偏移量的API。它有很多用途,比如向后回退幾個消息或者向前跳過幾個消息(對時間比較敏感的應用程序在處理滯后的情況下希望能夠向前跳過若干個消息)。在使用 Kafka以外的系統來存儲偏移量時,它將給我們帶來更大的驚喜--讓消息的業務處理和偏移量的提交變得一致。
試想一下這樣的場景:應用程序從Kaka讀取事件(可能是網站的用戶點擊事件流),對它們進行處理(可能是使用自動程序清理點擊操作並添加會話信息),然后把結果保存到數據庫。假設我們真的不想丟失任何數據,也不想在數據庫里多次保存相同的結果。
我們可能會,毎處理一條記錄就提交一次偏移量。盡管如此,在記錄被保存到數據庫之后以及偏移量被提交之前,應用程序仍然有可能發生崩潰,導致重復處理數據,數據庫里就會出現重復記錄。
如果保存記錄和偏移量可以在一個原子操作里完成,就可以避免出現上述情況。記錄和偏移量要么都被成功提交,要么都不提交。如果記錄是保存在數據庫里而偏移量是提交到Kafka上,那么就無法實現原子操作不過,如果在同一個事務里把記錄和偏移量都寫到數據庫里會怎樣呢?那么我們就會知道記錄和偏移量要么都成功提交,要么都沒有,然后重新處理記錄。
現在的問題是:如果偏移量是保存在數據庫里而不是 Kafka里,那么消費者在得到新分區時怎么知道該從哪里開始讀取?這個時候可以使用seek()方法。在消費者啟動或分配到新分區時,可以使用seck()方法查找保存在數據庫里的偏移量。我們可以使用使用 Consumer Rebalancelistener和seek()方法確保我們是從數據庫里保存的偏移量所指定的位置開始處理消息的。
優雅退出
如果確定要退出循環,需要通過另一個線程調用 consumer. wakeup()方法。如果循環運行在主線程里,可以在 ShutdownHook里調用該方法。要記住, consumer. wakeup()是消費者唯一一個可以從其他線程里安全調用的方法。調用 consumer. wakeup()可以退出poll(),並拋出 WakeupException異常。我們不需要處理 Wakeup Exception,因為它只是用於跳出循環的一種方式。不過,在退出線程之前調用 consumer.close()是很有必要的,它會提交任何還沒有提交的東西,並向群組協調器發送消息,告知自己要離開群組,接下來就會觸發再均衡,而不需要等待會話超時。
消息的重復
原因
第一類原因
消息發送端應用的消息重復發送,有以下幾種情況。
l 消息發送端發送消息給消息中間件,消息中間件收到消息並成功存儲,而這時消息中間件出現了問題,導致應用端沒有收到消息發送成功的返回因而進行重試產生了重復。
l 消息中間件因為負載高響應變慢,成功把消息存儲到消息存儲中后,返回“成功”這個結果時超時。
l 消息中間件將消息成功寫入消息存儲,在返回結果時網絡出現問題,導致應用發送端重試,而重試時網絡恢復,由此導致重復。
可以看到,通過消息發送端產生消息重復的主要原因是消息成功進入消息存儲后,因為各種原因使得消息發送端沒有收到“成功”的返回結果,並且又有重試機制,因而導致重復。
第二類原因
消息到達了消息存儲,由消息中間件進行向外的投遞時產生重復,有以下幾種情況。
l 消息被投遞到消息接收者應用進行處理,處理完畢后應用出問題了,消息中間件不知道消息處理結果,會再次投遞。
l 消息被投遞到消息接收者應用進行處理,處理完畢后網絡出現問題了,消息中間件沒有收到消息處理結果,會再次投遞。
l 消息被投遞到消息接收者應用進行處理,處理時間比較長,消息中間件因為消息超時會再次投遞。
l 消息被投遞到消息接收者應用進行處理,處理完畢后消息中間件出問題了,沒能收到消息結果並處理,會再次投遞
l 消息被投遞到消息接收者應用進行處理,處理完畢后消息中間件收到結果但是遇到消息存儲故障,沒能更新投遞狀態,會再次投遞。
可以看到,在投遞過程中產生的消息重復接收主要是因為消息接收者成功處理完消息后,消息中間件不能及時更新投遞狀態造成的。
如何解決重復消費
那么有什么辦法可以解決呢?主要是要求消息接收者來處理這種重復的情況,也就是要求消息接收者的消息處理是冪等操作。
什么是冪等性?
對於消息接收端的情況,冪等的含義是采用同樣的輸入多次調用處理函數,得到同樣的結果。例如,一個SQL操作
update stat_table set count= 10 where id =1
這個操作多次執行,id等於1的記錄中的 count字段的值都為10,這個操作就是冪等的,我們不用擔心這個操作被重復。
再來看另外一個SQL操作
update stat_table set count= count +1 where id= 1;
這樣的SQL操作就不是冪等的,一旦重復,結果就會產生變化。
常見辦法
因此應對消息重復的辦法是,使消息接收端的處理是一個冪等操作。這樣的做法降低了消息中間件的整體復雜性,不過也給使用消息中間件的消息接收端應用帶來了一定的限制和門檻。
1. MVCC:
多版本並發控制,樂觀鎖的一種實現,在生產者發送消息時進行數據更新時需要帶上數據的版本號,消費者去更新時需要去比較持有數據的版本號,版本號不一致的操作無法成功。例如博客點贊次數自動+1的接口:
public boolean addCount(Long id, Long version);
update blogTable set count= count+1,version=version+1 where id=321 and version=123
每一個version只有一次執行成功的機會,一旦失敗了生產者必須重新獲取數據的最新版本號再次發起更新。
2. 去重表:
利用數據庫表單的特性來實現冪等,常用的一個思路是在表上構建唯一性索引,保證某一類數據一旦執行完畢,后續同樣的請求不再重復處理了(利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那么就不再處理這條消息。)
以電商平台為例子,電商平台上的訂單id就是最適合的token。當用戶下單時,會經歷多個環節,比如生成訂單,減庫存,減優惠券等等。每一個環節執行時都先檢測一下該訂單id是否已經執行過這一步驟,對未執行的請求,執行操作並緩存結果,而對已經執行過的id,則直接返回之前的執行結果,不做任何操作。這樣可以在最大程度上避免操作的重復執行問題,緩存起來的執行結果也能用於事務的控制等。
(1)消費端弄丟了數據
唯一可能導致消費者弄丟數據的情況,就是說,你那個消費到了這個消息,然后消費者那邊自動提交了offset,讓kafka以為你已經消費好了這個消息,其實你剛准備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。
大家都知道kafka會自動提交offset,那么只要關閉自動提交offset,在處理完之后自己手動提交offset,就可以保證數據不會丟。但是此時確實還是會重復消費,比如你剛處理完,還沒提交offset,結果自己掛了,此時肯定會重復消費一次,自己保證冪等性就好了。
生產環境碰到的一個問題,就是說我們的kafka消費者消費到了數據之后是寫到一個內存的queue里先緩沖一下,結果有的時候,你剛把消息寫入內存queue,然后消費者會自動提交offset。
然后此時我們重啟了系統,就會導致內存queue里還沒來得及處理的數據就丟失了
(2)kafka弄丟了數據
這塊比較常見的一個場景,就是kafka某個broker宕機,然后重新選舉partiton的leader時。大家想想,要是此時其他的follower剛好還有些數據沒有同步,結果此時leader掛了,然后選舉某個follower成leader之后,他不就少了一些數據?這就丟了一些數據啊。
所以此時一般是要求起碼設置如下4個參數:
給這個topic設置replication.factor參數:這個值必須大於1,要求每個partition必須有至少2個副本。
在kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯系,沒掉隊,這樣才能確保leader掛了還有一個follower吧。
在producer端設置acks=all:這個是要求每條數據,必須是寫入所有replica之后,才能認為是寫成功了。
在producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。
(3)生產者會不會弄丟數據
如果按照上述的思路設置了ack=all,一定不會丟,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。
