1. Kafka的Producer
不論將kafka作為什么樣的用途,都少不了的向Broker發送數據或接受數據,Producer就是用於向Kafka發送數據。如下:

2. 添加依賴
pom.xml文件如下:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>
3. 發送消息
3.1 創建生產者
創建生產者的時候,我們需要為生產者設置一些屬性,其中有三個必選屬性如下:
1. bootstrap.servers: 該屬性指定broker 的地址清單,地址的格式為host:po 忱。清單里不需要包含所有的broker 地址,生產者會給定的broker 里查找到其他broker 的信息。不過建議至少要提供兩個broker 的信息, 一且其中一個若機,生產者仍然能夠連接到集群上。
2. key.serializer: broker 希望接收到的消息的鍵和值都是字節數組。生產者接口允許使用參數化類型,因此可以把Java 對象作為鍵和值發送給broker 。這樣的代碼具有良好的可讀性,不過生產者需要知道如何把這些Java 對象轉換成字節數組。key. serializer必須被設置為一個實現了org.apache.kafka.common.serialization.StringSerializer接口的類,生產者會使用這個類把鍵對象序列化成字節數組。Kafka 客戶端默認提供了ByteArraySerializer(這個只做很少的事情)、StringSerializer和IntegeSerializer,因此,如果你只使用常見的幾種Java 對象類型,那么就沒必要實現自己的序列化器。要注意, key.serializer是必須設置的,就算你打算只發送值內容。
3. value.serializer: 與key.serializer一樣,value.serializer指定的類會將值序列化。如果鍵和值都是字符串,可以使用與key.serializer一樣的序列化器。如果鍵是整數類型而值是字符串,那么需要使用不同的序列化器。
設置屬性代碼如下:
Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094"); //配置key-value允許使用參數化類型 kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
3.2 發送消息的三種方式
1. 並發並忘記,這是普通的消息發送方式,我們把消息發送給服務器,但井不關心它是否正常到達。大多數情況下,消息會正常到達,因為Kafka 是高可用的,而且生產者會自動嘗試重發。不過,使用這種方式有時候也會丟失一些消息。
實現如下:
package com.wangx.kafka.client; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094"); //配置key-value允許使用參數化類型 kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie); ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world"); kafkaProducer.send(record); } }
此時在Kafka中打開內置的消費者消費消息,結果如下,命令如下:
kafka-console-consumer.sh --bootstrap-server 47.105.145.123:9092 --topic testTopic --from-beginning
然后,啟動生產者發送消息,結果如下:

這里啟動了四次消費者,所以有四條消息被消費。
3.3 同步發送消息
我們使用send () 方怯發送消息, 它會返回Future對象,調用get () 方法進行等待,就可以知道悄息是否發送成功。
實現方式如下:
package com.wangx.kafka.client; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094"); //配置key-value允許使用參數化類型 kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie); //創建消息對象,第一個為參數topic,第二個參數為key,第三個參數為value ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world"); //同步發送方式,get方法返回結果 RecordMetadata metadata = (RecordMetadata) kafkaProducer.send(record).get(); System.out.println("broker返回消息發送信息" + metadata); } }
客戶端消費者仍能收到消息,且生產者也能收到返回結果,返回結果如下:

3.4 異步發送消息
假設消息在應用程序和Kafka 集群之間一個來回需要lOm s 。如果在發送完每個消息后都等待回應,那么發送100 個消息需要l秒。但如果只發送消息而不等待響應,那么發送100 個消息所需要的時間會少很多。大多數時候,我們並不需要等待響應一一盡管Kafka會把目標主題、分區信息和悄息的偏移量發送回來,但對於發送端的應用程序來說不是必需的。不過在遇到消息發送失敗時,我們需要拋出異常、記錄錯誤日志,或者把消息寫入“錯誤消息”文件以便日后分析。
實現如下:
package com.wangx.kafka.client; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaProducerDemo { public static void main(String[] args) { Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094"); //配置key-value允許使用參數化類型 kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie); //創建消息對象,第一個為參數topic,第二個參數為key,第三個參數為value final ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world"); //異步發送消息。異常時打印異常信息或發送結果 kafkaProducer.send(record, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { System.out.println(e.getMessage()); } else { System.out.println("接收到返回結果:" + recordMetadata); } } }); //異步發送消息時必須要flush,否則發送不成功,不會執行回調函數 kafkaProducer.flush(); } }
監聽到的返回信息如下:

3.5 生產者的配置
生產者還有很多可以配置的參數,如下:
1. acks:指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的。這個參數對消息丟失的可能性有重要影響。該參數有如下選項。
如果acks=0 , 生產者在成功寫入消息之前不會等待任何來自服務器的響應。也就是說,如果當中出現了問題, 導致服務器沒有收到消息,那么生產者就無從得知,消息也就丟失了。不過,因為生產者不需要等待服務器的響應,所以它可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。
如果acks=1 ,只要集群的首領節點收到消息,生產者就會收到一個來自服務器的成功響應。如果消息無法到達首領節點(比如領導節點奔潰,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。這個時候的吞吐量取決於使用的是同步發送還是異步發送。如果讓發送客戶端等待服務器的響應(通過調用Future對象的ge t ()方法),顯然會增加延遲(在網絡上傳輸一個來回的延遲)。如果客戶端使用回調,延遲問題就可以得到緩解,不過吞吐量還是會受發送中消息數量的限制(比如,生產者在收到服務器響應之前可以發送多少個消息)。
如果acks=all ,只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發生崩潰,整個集群仍然可以運行。不過,它的延遲比acks=1時更高,因為我們要等待不只一個服務器節點接收消息。
2. buffer.memory: 用來設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候,send ()方法調用要么被阻塞,要么拋出異常,取決於如何設置block.on.buffer 參數(在0. 9.0.0 版本里被替換成了l'la x .block.l'ls ,表示在拋出異常之前可以阻塞一段時間)。
3. compression.type: 默認情況下,消息發送時不會被壓縮。該參數可以設置為snappy 、gzip 或lz4 ,它指定了消息被發到broker 之前使用哪一種壓縮算法進行壓縮。ssnappy壓縮算法由Google發明,它占用較少的CPU ,卻能提供較好的性能和相當可觀的 壓縮比,如果比較關注性能和網絡帶寬,可以使用這種算法。gzip壓縮算法一般會占用較多的CPU ,但會提供更高的壓縮比,所以如果網絡帶寬比較有限,可以使用這種算法。使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往是向Kafka 發送消息的瓶頸所在。
4. retries: 生產者從服務器收到的錯誤有可能是臨時性的錯誤,在這種情況下, retries 參數的值決定了生產者可以重發消息次數,如果達到這個次數,生產者會放棄重試並返回錯誤
5. batch.size: 當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里,該參數指定了一個批次可以使用的內存大小,按照字節數計算。
6. linger.ms: 該參數指定了生產者在發送批次之前等待更多消息加入批次的時間。
7. client.id: 該參數可以是任意的字符串,服務器會用它來識別消息的來源,還可以用在日志和配額指標里
8. max.in.flight.requests.per.connection: 該參數指定了生產者在收到服務器晌應之前可以發送多少個消息
9. timeout.ms 、request.timeout.ms 和metadata.fetch.timeout.ms:request.timeout.ms 指定了生產者在發送數據時等待服務器返回響應的時間,metadata.fetch.timeout.ms指定了生產者在獲取元數據(比如目標分區的首領是誰)時等待服務器返回響應的時間。如果等待響應超時,那么生產者要么重試發送數據,要么返回一個錯誤(拋出異常或執行回調)。timeout.ms指定了broker 等待同步副本返回消息確認的時間,與asks 的配置相匹配一一如果在指定時間內沒有收到同步副本的確認,那么broker 就會返回一個錯誤。
10. max.block.ms:該參數指定了在調用send () 方法或使用partitionsFor()方法獲取元數據時生產者的阻塞時間。
11. max.request.size:該參數用於控制生產者發送的請求大小,可以指能發送的單個消息的最大值,也可以指單個請求里面所有消息總的大小。
12. receive.buffer.bytes 和send.buffer.bytes: 這兩個參數分別指定了TCP socket 接收和發送數據包的緩沖區大小,如果它們被設為-1,就使用操作系統的默認值。
