轉載請注明出處:
2.1Kafka生產者客戶端架構
2.2 Kafka 進行消息生產發送代碼示例及ProducerRecord對象
kafka進行消息生產發送代碼示例:
public class KafkaProducerAnalysis { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static Properties initConfig() ( Properties props = new Properties(); props.put("bootstrap.servers", brokerList); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties. put ("client. id", "producer. client. id. demo"); return props; } public static void main(String[] args) { Properties props = initConfig(); KafkaProducer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String,String> record = new ProducerRecord<> (topic, "hello, Kafka1 "); try { producer.send(record); } catch (Exception e) { e.printStackTrace(); } } }
構建的消息對象ProducerRecord, 它並不是單純意義上的消息,它包含了多個屬性,原本需要發送的與業務相關的消息體只是其中的一個value屬性,比如"Hello, Kafka!"只是ProducerRecord對象中的一個屬性。 ProducerRecord類的定義如下:
public class ProducerRecord<K, V> { private final String topic; //主題 private final Integer partition; //分區號 private final Headers headers; //消息頭部 private final K key; //鍵 private final V value; //值 private final Long timestamp; //消息的時間戳 //省略其他成員方法和構造方法 }
其中topic和 partition字段分別代表消息要發往的主題和分區號。headers字段是消息的頭部,它大多用來設定 一些與應用相關的信息,如無需要也可以不用設置。key是用來指定消息的鍵, 它不僅是消息的附加信息,還可以用來計算分區號進而可以讓消息發往特定的分區。
key可以讓消息再進行二次歸類, 同 一個key的消息會被划分到同 一個分區中, 有key的消息還可以支持日志壓縮的功能,value是指消息體,一般不為空,如果為空則表示特定的消息 一墓碑消息;timestamp是指消息的時間戳,它有 CreateTime 和 LogAppendTime 兩種類型,前者表示消息創建的時間,后者表示消息追加到日志文件的時間.
KafkaProducer是線程安全的, 可以在多個線程中共享單個KafkaProducer實例,也 可以將KafkaProducer實例進行池化來供其他線程調用。
2.3 發送消息的三種模式及實現區別
發送消息主要有三種模式: 發后即忘(fire-and-forget)、同步(sync)及異步Casync)。
發后即忘,它只管往Kafka中發送消息而並不關心消息是否正確到達。 在大多數情況下,這種發送方式沒有什么問題,不過在某些時候(比如發生不可重試異常時)會造成消息的丟失。 這種發送方式的性能最高, 可靠性也最差。
KafkaProducer的 send()方法並非是void類型, 而是Future<RecordMetadata>類型, send()方法有2個重載方法,具體定義如下:
public Future<RecordMetadata> send(ProducerRecord<K, V> record) public Future<RecordMetadata> send(ProducerRecord<K, V> record,Callback callback)
實現同步的發送方式, 可以利用返回的 Future 對象實現:
try { producer.send(record) .get(); } catch (ExecutionException I InterruptedException e) { e.printStackTrace(); }
send()方法本身就是異步的,send()方法返回的Future對象可以使調用方稍后獲得發送的結果。 示例中在執行send()方法之后直接鏈式調用了get()方法來阻塞等待Kaflca的響應,直到消息發送成功, 或者發生異常。 如果發生異常,那么就需要捕獲異常並交由外層邏輯處理。
try { Future<RecordMetadata> future = producer.send{record); RecordMetadata metadata= future.get(); System.out.println(metadata.topic() + "-" +metadata.partition() + ":" + metadata.offset()); } catch (ExecutionException I InterruptedException e) { e.printStackTrace () ; }
這樣可以獲取一個RecordMetadata對象, 在RecordMetadata對象里包含了消息的一些元數據信息,比如當前消息的主題、分區號、分區中的偏移量(offset)、 時間戳等。
2.4 序列化
生產者需要用序列化器(Serializer)把對象轉換成字節數組才能通過網絡發送給Kafka。 而在對側, 消費者需要用反序列化器(Deserializer)把從Kafka 中收到的字節數組轉換成相應的對象。
為 了方便, 消息的key和value都使用了字符串, 對應程序中的序列化器也使用了客戶端自帶的org.apache.kafka. common. serialization. StringSerializer, 除了用於String 類型的序列化器,還有ByteArray、ByteBuffer、 Bytes、 Double、Integer、 Long這幾種類型, 它們都實現了org.apache.kafka. common. serialization. Serializer接口
2.5 分區器
消息在通過send( )方法發往broker 的過程中,有可能需要經過攔截器(Interceptor)、 序列化器(Serializer)和分區器(Parttitioner)的一系列作用之后才能被真正地發往 broker。攔截器一般不是必需的, 而序列化器是必需的。 消息 經過 序列化 之后就需要確定它發往的分區 ,如果消息ProducerRecord中指定了 partitition字段, 那么就不需要分區器的作用, 因為partition代表的就是所要發往的分區號。
如果消息ProducerRecord中沒有 指定partition字段,那么就需要依賴分區器,根據key這個字段來計算partition的值。 分區器的作用 就是為消息 分配分區。
Kafka 中提供的默認分區器是org.apache.kafka.clients.producer.intemals.DefaultPartitioner, 它實現了org.apache.kafka.clients.producer.Partitioner 接口, 這個接口中定義了2個方法, 具體如下所示。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); public void close();
其中 partition()方法用來計算分區號,返回值為 int 類型。partition()方法中的參數分別表示主題 、鍵、序列化后的鍵、值、序列化后的值,以及集群的元數據信息,通過這些信息可以實現功能豐富的分區器。 close()方法在關閉分區器的時候用來回收一些資源 。
默認的分區器會對key 進行哈希(采用MurmurHash2 算法 ,具備高運算性能及低碰撞率),最終根據得到 的 哈希值來計算分區號, 擁有相同 key 的消息會被寫入同一個分區 。 如果 key 為 null ,那么消息將會以輪詢的方式發往主題內的各個可用分區。
2.6 攔截器
生產者攔截器既可以用 來在消息發送前做一些准備工作 ,比如按照某個規則過濾不符合要求的消 息、修改消 息的內容等,也可以用來在發送回調邏輯前做一些定制化的需求,比如統計類工作。
生產者攔截器 的 使用 也 很方便,主要是自定義實現org .apache.kafka. clients. producer.Producerlnterceptor 接口。ProducerInterceptor 接 口中包含 3 個方法 :
public ProducerRecord<K, V> onSend (ProducerRecord<K, V> record); public void onAcknowledgement(RecordMetadata metadata, Excepti on exception ); public void close() ;
KafkaProducer 在將消息序列化和計算分區之前會調用 生產者攔截器 的 onSend()方法來對消息進行相應 的定制化操作。KafkaProducer 會在消息被應答( Acknowledgement )之前或消息發送失敗時調用生產者攔截器的onAcknowledgement()方法,優先於用戶設定的Callback 之前執行。
2.6 消息累加器
整個生產者客戶端由兩個線程協調運行,這兩個線程分別為主線程和 Sender 線程 (發送線程)。在主線程中由 KafkaProducer 創建消息,然后通過可能的攔截器、序列化器和分區器的作用之后緩存到消息累加器( RecordAccumulator,也稱為消息收集器〉中。Sender 線程負責從RecordAccumulator 中 獲取消息並將其發送到 Kafka 中 。
RecordAccumulator 主要用來緩存消息 以便Sender 線程可以批量發送,進而減少網絡傳輸的資源消耗以提升性能 。RecordAccumulator 緩存的大 小可以通過生產者客戶端參數buffer. memory 配置,默認值為 33554432B ,即32MB 。 如果生產者發送消息的速度超過發送到服務器的速度 ,則會導致生產者空間不足,這個時候 KafkaProducer 的 send()方法調用要么被阻塞,要么拋出異常,這個取決於參數 max. block . ms 的配置,此參數的默認值為 6 0000,即 60 秒 。
Sender 從RecordAccumulator 中 獲取緩存的消息之后,會進一 步將原本<分區,Deque<Producer Batch>>的保存形式轉變成<Node , List< ProducerBatch>的形式,其中 Node 表示 Kafka集群的 broker 節點 。對於網絡連接來說,生產者客戶端是與具體 的 broker 節點建立的連接,也就是 向具體的broker 節點發送消息,而並不關心消息屬於哪一個分區;而對於KafkaProducer的應用邏輯而言 ,我們只 關注向哪個分區中發送哪些消息,所 以在這里需要做一個應用邏輯層面到網絡 1/0 層面的轉換。
2.7 重要的生產者參數
1.acks
這個參數用來指定分區中必須要有多少個副本收到這條消息,之后生產者才會認為這條消息是成功寫入的。acks 是生產者客戶端中一個非常重要的參數,它涉及消息的可靠性和吞吐量之間的權衡。 acks 參數有 3 種類型的值(都是字符串類型)。
acks =1 : 默認值即為l 。生產者發送消息之后,只要分區的leader 副本成功寫入消息,那么它就會收到來自服務端的成功響應 。 如果消息無法寫入 leader 副本,比如在leader 副本崩潰、重新選舉新的leader 副本的過程中,那么生產者就會收到一個錯誤的響應,為了避免消息丟失,生產者可以選擇重發消息 。如果消息寫入 leader 副本並返回成功響應給生產者,且在被其他 follower 副本拉取之前 leader 副本崩潰,那么此時消息還是會丟失,因為新選舉的 leader 副本中並沒有這條對應的消息 。 acks 設置為l ,是消息可靠性和吞吐量之間的折中方案。
acks = 0 :生產者發送消 息之后不需要等待任何服務端的響應。如果在消息從發送到寫入 Kafka 的過程中出現某些異常,導致 Kafka 並沒有收到這條消息,那么生產者也無從得知,消息也就丟失了。在其他配置環境相同的情況下,acks 設置為 0 可以達到最大的吞吐量。
acks =- l 或 acks =all : 生產者在消 息發送之后,需要等待 ISR 中的所有副本都成功寫入消息之后才能夠收到來自服務端的成功響應。在其他配置環境相同的情況下,acks 設置為-1(all )可以達到最強的可靠性。但這並不意味着消息就一定可靠,因為 ISR 中可能只有 leader 副本,這樣就退化成了 acks= l 的情況。
2.max.request.size
這個參數用來限制生產者客戶端能發送的消息的最大值,默認值為1048576B ,即lMB 。一般情況下,這個默認值就可以滿足大多數的應用場景了。
3.retries 和 retry. backoff.ms
retries 參數用來配置生產者重試的次數,默認值為 0,即在發生異常的時候不進行任何重試動作。消息在從生產者發出到成功寫入服務器之前可能發生一些臨時性的異常,比如網絡抖動、leader 副本的選舉等,這種異常往往是可以自行恢復的,生產者可以通過配置 retries大於 0 的值,以此通過 內 部重試來恢復而不是一昧地將異常拋給生產者的應用程序。 如果重試達到設定的次數 ,那么生產者就會放棄重試並返回異常。
不過並不是所有的異常都是可以通過重試來解決的,比如消息太大,超過 max.request.size 參數配置的值時,這種方式就不可行了。 重試還和另一個參數 retry.backoff.ms 有關,這個參數的默認值為100 ,它用來設定兩次重試之間的時間間隔,避免無效的頻繁重試。在配置 retries 和retry.backoff.ms之前,最好先估算一下可能的異常恢復時間,這樣可以設定總的重試時間大於這個異常恢復時間,以此來避免生產者過早地放棄重試 。
4.compression.type
這個參數用來指定消息的壓縮方式,默認值為“ none ”,即默認情況下,消息不會被壓縮。該參數還可以配置為“ gzip ”,“ snappy ” 和“ lz4 ”。 對消息進行壓縮可以極大地減少網絡傳輸量 、降低網絡 IO ,從而提高整體的性能 。消息壓縮是一種使用時間換空間的優化方式,如果對時延有一定的要求,則不推薦對消息進行壓縮 。
5. request.timeout.ms
這個參數用來配置 Producer 等待請求響應的最長時間,默認值為 3 0000( ms )。請求超時之后可以選擇進行重試。注意這個參數需要 比 broker 端參數 replica.lag.time.max.ms 的值要大 ,這樣可以減少因客戶端重試而引起的消息重復的概率。