Kafka-生產者
生產者發送消息流程
1.新建ProducerRecord對象,包含目標主題和要發送的內容。也可以指定鍵或分區
2.發送ProducerRecord對象時,生產者要把鍵和值對象序列化成字節數組,這樣它們才能在網絡上傳輸
3.數據被傳給分區器。
如果ProducerRecord對象中指定了分區,那么分區器就不會再做任何事情,直接把指定的分區返回。
如果沒有指定分區,那么分區器會根據ProducerRecord對象的鍵來選擇一個分區。
選擇好分區后,生產者就知道該往哪個主題和分區發送這條記錄了。
4.這條記錄被添加到一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。
有一個獨立的線程負責把這些記錄批次發送到相應的broker上。
5.服務器在收到這些消息時會返回一個相應。
如果消息成功寫入kafka,就返回一個RecordMetaData對象,它包含了主題和分區信息,以及記錄在分區里的偏移量。
如果寫入失敗,則會返回一個錯誤。生產者在收到錯誤之后會嘗試重新發送消息,幾次之后如果還是失敗,就返回錯誤信息。
創建生產者
kafka生產者有3個必選的屬性
bootstrap.servers
改屬性指定broker的地址清單,地址的格式為host:port。清單里不需要包含所有的broker地址,生產者會從給定的broker里查找到其它broker的信息。不過建議至少要提供兩個broker的信息,一旦其中一個宕機,生產者仍然能夠連接到集群上。
key.serializer
broker希望接收到的消息的鍵和值都是字節數組。key.serializer必須被設置為一個實現了org.apache.kafka.common.serialization.Serializer接口的類,生產者會使用這個類把鍵對象序列化成字節數組。
kafka客戶端默認提供了ByteArraySerializer、StringSerializer、IntegerSerializer,因此,如果只使用常見的Java對象類型,就沒有必要實現自己的序列化器。
value.serializer
與key.serializer一樣,value.serializer指定的類會將值序列化。如果鍵和值都是字符串,可以使用與key.serializer一樣的序列化器。
發送消息主要有三種方式
1.發送並忘記(fire-and-forget)
把消息發送給服務器,但並不關心它是否正常到達。
2.同步發送
使用send()方法發送消息,它會返回一個Future對象,調用get()方法進行等待,就可以知道消息是否發送成功
3.異步發送
調用send()方法,並指定一個回調函數,服務器在返回響應時調用該函數
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; /** * @Author FengZhen * @Date 2020-03-29 12:21 * @Description kafka生產者使用 */ public class KafkaProducerTest { private static Properties kafkaProps = new Properties(); static { kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); } public static void main(String[] args) { KafkaProducer<String, String> producer = new KafkaProducer(kafkaProps); ProducerRecord<String, String> record = new ProducerRecord<>("test","message_key","message_value"); // simpleSend(producer, record); // sync(producer, record); aync(producer, record); } /** * 最簡單的方式發送,不管消息是否正常到達 * @param producer */ public static void simpleSend(KafkaProducer producer, ProducerRecord record){ try { producer.send(record); } catch(Exception e){ e.printStackTrace(); } } /** * 同步發送 * @param producer * @param record */ public static void sync(KafkaProducer producer, ProducerRecord record){ try { RecordMetadata recordMetadata = (RecordMetadata) producer.send(record).get(); System.out.println("topic:" + recordMetadata.topic()); System.out.println("partition:" + recordMetadata.partition()); System.out.println("offset:" + recordMetadata.offset()); System.out.println("metaData:" + recordMetadata.toString()); } catch(Exception e){ e.printStackTrace(); } } /** * 異步發送 * @param producer * @param record */ public static void aync(KafkaProducer producer, ProducerRecord record){ try { producer.send(record, new DemonProducerCallback()); while (true){ Thread.sleep(10 * 1000); } } catch(Exception e){ e.printStackTrace(); } } private static class DemonProducerCallback implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (null != e){ e.printStackTrace(); }else{ System.out.println("topic:" + recordMetadata.topic()); System.out.println("partition:" + recordMetadata.partition()); System.out.println("offset:" + recordMetadata.offset()); System.out.println("metaData:" + recordMetadata.toString()); } } } }
輸出如下
topic:test partition:0 offset:2 metaData:test-0@2
生產者的配置
1.acks
acks參數指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的。這個參數消息丟失的可能性有重要影響。
如果acks=0,生產者在成功寫入消息之前不會等待任何來自服務器的響應。如果當中出現了問題,導致服務器沒有收到消息,那么生產者就無從得知,消息也就丟失了。不過,因為生產者不需要等待服務器的響應,所以它可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。
如果acks=1,只要集群的首領節點收到消息,生產者就會收到一個來自服務器的成功響應。如果消息無法到達首領節點(比如首領節點崩潰,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。這個時候的吞吐量取決於使用的是同步發送還是異步發送。如果發送客戶端等待服務器的相應,顯然會增加延遲。如果客戶端是使用異步回調,延遲問題就可以得到緩解,不過吞吐量還是會受發送中消息數量的限制(比如,生產者在收到服務器響應之前可以發送多少個消息)
如果acks=all,只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發生崩潰,整個集群仍然可以運行。不過,它的延遲會更高,因為需要等待不止一個服務器節點接收消息。
2.buffer.memory
該參數用來設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候,send()方法調用要么被阻塞,要么拋出異常,取決於如何設置max.block.ms參數,此參數設置拋出異常之前可以阻塞的一段時間
3.compression.type
默認情況下,消息發送時不會被壓縮。該參數可以設置為snappy、gzip或lz4,它指定了消息被發送給broker之前使用哪一種壓縮算法進行壓縮。snappy壓縮算法由Google發明,它占用較少的CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網絡帶寬,可以使用這種算法。gzip壓縮算法一般會占用較多的CPU,但會提供更高的壓縮比,如果網絡帶寬有限,可以使用這種算法。
使用壓縮可以降低網絡傳輸開銷和存儲開銷,這也是kafka的瓶頸所在。
4.retries
生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領)。這種情況下,retries參數的值決定了生產者可以重發消息的次數,如果達到這個次數,生產者會放棄重試並返回錯誤。默認情況下,生產者會在每次重試之間等待100ms,不過可以通過retry.backoff.ms參數來改變這個時間間隔。
5.batch.size
當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次被填滿,批次里的所有消息會被發送出去。不過生產者並不一定都會等到批次被填滿才發送,半滿的批次,甚至只包含一個消息的批次也有可能被發送。所以就算把批次大小設置的很大,也不會造成延遲,只是會占用更多的內存而已。但如果設置的很小,生產者會更頻繁的發送消息,會增加一些額外的開銷。
6.linger.ms
該參數指定了生產者在發送批次之前等待更多消息加入批次的時間。KafkaProducer會在批次填滿或linger.ms達到上限時把批次發送出去。默認情況下,只要有可用的線程,就算批次里只有一個消息,生產者也會把消息發送出去。把此值設置成比0大的數,讓生產者在發送批次之前等待一會,使更多的消息加入這個批次。雖然這樣會增加延遲,但也會提升吞吐量。
7.client.id
可以是任意的字符串,服務器會用它來識別消息的來源,還可以用在日志和配額指標里。
8.max.in.flight.requests.per.connection
該參數指定了生產者在收到服務器響應之前可以發送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。把它設為1可以保證消息是按照發送的順序寫入服務器的,即使發生了重試。
9.request.timeout.ms
指定了生產者在發送數據時等待服務器返回響應的時間
10.metadata.fetch.timeout.ms
指定了生產者在獲取元數據時等待服務器返回響應的時間。如果等待響應超時,那么生產者要么重試發送數據,要么返回一個錯誤。
11.timeout.ms
指定了broker等待同步副本返回消息確認的時間,與asks的配置相匹配--如果在指定時間內沒有收到同步副本的確認,那么broker就會返回一個錯誤。
12.max.block.ms
指定了在調用send()方法或使用partitionsFor()方法獲取元數據時生產者的阻塞時間。當生產者的發送緩沖區已滿,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到此值時,生產者會拋出超時異常。
13.max.request.size
用於設置生產者發送的請求大小。可以指能發送的單個消息的最大值,也可以指單個請求里所有消息總的大小。
broker對可接收的消息最大值也有自己的限制,message.max.bytes,兩邊的配置最好可以匹配,避免生產者發送的消息被broker拒絕
14.receive.buffer.bytes和send.buffer.bytes
分別指定了TCP socket接收和發送數據包的緩沖區大小。如果被設置為-1,就是用操作系統默認值。如果生產者或消費者與broker處於不同的數據中細膩,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
順序保證
kafka可以保證同一個分區里的消息是有序的。生產者按照一定的順序發送消息,broker就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。
如果應用場景要求消息是有序的,可以把max.in.flight.requests.per.connection設為1,這樣在生產者嘗試發送第一批消息時,就不會有其它的消息發送給broker。不過這樣會嚴重影響生產者的吞吐量,所以只有在對消息的順序有嚴格要求的情況下才能這么做。