1、Producer的攔截器interceptor,和consumer端的攔截器interceptor是在kafka0.10版本被引入的,主要用於實現clients端的定制化控制邏輯,生產者攔截器可以用在消息發送前做一些准備工作,使用場景,如下所示:
1)、按照某個規則過濾掉不符合要求的消息。
2)、修改消息的內容。
3)、統計類需求。
1 package com.demo.kafka.listener; 2 3 import java.util.Map; 4 5 import org.apache.kafka.clients.producer.ProducerInterceptor; 6 import org.apache.kafka.clients.producer.ProducerRecord; 7 import org.apache.kafka.clients.producer.RecordMetadata; 8 9 /** 10 * 生產者攔截器 11 * 12 * @author 生產者攔截器 13 * 14 */ 15 16 public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> { 17 18 // 發送成功計數 19 private volatile long sendSuccess = 0; 20 21 // 發送失敗計數 22 private volatile long sendFailure = 0; 23 24 /** 25 * 26 */ 27 @Override 28 public void configure(Map<String, ?> configs) { 29 30 } 31 32 /** 33 * 發送消息已經操作消息的方法 34 */ 35 @Override 36 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { 37 String modifiedValue = "前綴prefix : " + record.value(); 38 ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>( 39 record.topic(), // 主題 40 record.partition(), // 分區 41 record.timestamp(), // 時間戳 42 record.key(), // key值 43 modifiedValue, // value值 44 record.headers()); // 消息頭 45 return producerRecord; 46 } 47 48 /** 49 * ack確認的方法 50 */ 51 @Override 52 public void onAcknowledgement(RecordMetadata metadata, Exception exception) { 53 if(exception == null) { 54 sendSuccess++; 55 }else { 56 sendFailure++; 57 } 58 } 59 60 /** 61 * 關閉的方法,發送成功之后會將攔截器關閉,調用此方法 62 */ 63 @Override 64 public void close() { 65 double successRation = (double)sendSuccess / (sendSuccess + sendFailure); 66 System.out.println("【INFO 】 發送成功率: " + String.format("%f", successRation * 100) + "%"); 67 } 68 69 }
生產者客戶端要配置一下Producer的攔截器interceptor,如下所示:
1 package com.demo.kafka.producer; 2 3 import java.util.Properties; 4 import java.util.concurrent.ExecutionException; 5 6 import org.apache.kafka.clients.producer.KafkaProducer; 7 import org.apache.kafka.clients.producer.ProducerConfig; 8 import org.apache.kafka.clients.producer.ProducerRecord; 9 import org.apache.kafka.clients.producer.RecordMetadata; 10 import org.apache.kafka.common.serialization.StringSerializer; 11 12 import com.demo.kafka.listener.ProducerInterceptorPrefix; 13 14 public class KafkaProducerSimple { 15 16 // 設置服務器地址 17 private static final String brokerList = "192.168.110.142:9092"; 18 19 // 設置主題 20 private static final String topic = "topic-demo"; 21 22 public static void main(String[] args) { 23 Properties properties = new Properties(); 24 // 設置key的序列化器 25 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 26 27 // 設置重試次數 28 properties.put(ProducerConfig.RETRIES_CONFIG, 10); 29 30 // 設置值的序列化器 31 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 32 33 // 打印輸出序列化器的路徑信息 34 System.err.println(StringSerializer.class.getName()); 35 36 // 設置集群地址 37 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); 38 39 // 自定義攔截器使用,可以計算發送成功率或者失敗率,進行消息的拼接或者過濾操作 40 properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName()); 41 42 // 將參數配置到生產者對象中 43 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); 44 45 for (int i = 0; i < 100000; i++) { 46 // 生產者消息記錄 47 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "hello world!!!" + i); 48 // 同步獲取消息 49 // RecordMetadata recordMetadata = producer.send(record).get(); 50 producer.send(record); 51 } 52 53 // 關閉 54 producer.close(); 55 } 56 57 }
消費者代碼,如下所示:
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Collections; 5 import java.util.Properties; 6 7 import org.apache.kafka.clients.consumer.ConsumerConfig; 8 import org.apache.kafka.clients.consumer.ConsumerRecord; 9 import org.apache.kafka.clients.consumer.ConsumerRecords; 10 import org.apache.kafka.clients.consumer.KafkaConsumer; 11 import org.apache.kafka.clients.producer.ProducerConfig; 12 import org.apache.kafka.common.serialization.StringDeserializer; 13 14 public class KafkaConsumerSimple { 15 16 // 設置服務器地址 17 private static final String bootstrapServer = "192.168.110.142:9092"; 18 19 // 設置主題 20 private static final String topic = "topic-demo"; 21 22 // 設置消費者組 23 private static final String groupId = "group.demo"; 24 25 public static void main(String[] args) { 26 Properties properties = new Properties(); 27 // 設置反序列化key參數信息 28 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 29 // 設置反序列化value參數信息 30 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 31 32 // 設置服務器列表信息 33 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 34 35 // 設置消費者組信息 36 properties.put("group.id", groupId); 37 38 // 將參數設置到消費者參數中 39 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 40 41 // 消息訂閱 42 consumer.subscribe(Collections.singletonList(topic)); 43 44 while (true) { 45 // 每隔一秒監聽一次 46 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 47 // 獲取到消息信息 48 for (ConsumerRecord<String, String> record : records) { 49 System.err.println(record.toString()); 50 } 51 } 52 53 } 54 55 }
2、生產者的acks參數,這個參數用來指定分區中必須有多少副本來收到這條消息,之后生產者才會認為這條消息寫入成功的。acks是生產者客戶端中非常重要的一個參數,它涉及到消息的可靠性和吞吐量之間的權衡。
1)、ack等於0,生產者在成功寫入消息之前不會等待任何來自服務器的響應。如果出現問題生產者是感知不到的,消息就丟失了,不過因為生產者不需要等待服務器響應,所以他可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。
2)、acks等於1,默認值為1,只要集群的首領節點收到消息,生產者就會收到一個來自服務器的成功響應。如果消息無法達到首領節點,比如首領節點崩潰,新的首領節點還沒有被選舉出來,生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。但是這樣還有可能會導致數據丟失,如果收到寫成功通知,此時首領節點還沒有來的及同步數據到follower節點,首領節點崩潰,就會導致數據丟失。
3)、acks等於-1,只有當所有參與復制的節點收到消息時候,生產者會收到一個來自服務器額成功響應,這種模式 最安全的,他可以保證不止一個服務器收到消息。
注意,acks參數配置的是一個字符串類型,而不是整數類型,如果配置為整數類型會拋出異常信息。
3、kafka消費者訂閱主題和分區,創建完消費者后我們便可以訂閱主題了,只需要調用subscribe方法即可,這個方法會接受一個主題列表,如下所示:
另外,我們也可以使用正則表達式來匹配多個主題,而且訂閱之后如果又有匹配的新主題,那么這個消費組立即對其進行消費。正則表達式在連接kafka與其他系統非常有用。比如訂閱所有的測試主題。
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Arrays; 5 import java.util.Collections; 6 import java.util.Properties; 7 import java.util.regex.Pattern; 8 9 import org.apache.kafka.clients.consumer.ConsumerConfig; 10 import org.apache.kafka.clients.consumer.ConsumerRecord; 11 import org.apache.kafka.clients.consumer.ConsumerRecords; 12 import org.apache.kafka.clients.consumer.KafkaConsumer; 13 import org.apache.kafka.clients.producer.ProducerConfig; 14 import org.apache.kafka.common.TopicPartition; 15 import org.apache.kafka.common.serialization.StringDeserializer; 16 17 public class KafkaConsumerSimple { 18 19 // 設置服務器地址 20 private static final String bootstrapServer = "192.168.110.142:9092"; 21 22 // 設置主題 23 private static final String topic = "topic-demo"; 24 25 // 設置主題 26 private static final String topic2 = "topic-demo2"; 27 28 // 設置消費者組 29 private static final String groupId = "group.demo"; 30 31 public static void main(String[] args) { 32 Properties properties = new Properties(); 33 // 設置反序列化key參數信息 34 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 35 // 設置反序列化value參數信息 36 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 37 38 // 設置服務器列表信息,必填參數,該參數和生產者相同,,制定鏈接kafka集群所需的broker地址清單,可以設置一個或者多個 39 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 40 41 // 設置消費者組信息,消費者隸屬的消費組,默認為空,如果設置為空,則會拋出異常,這個參數要設置成具有一定業務含義的名稱 42 properties.put("group.id", groupId); 43 44 // 制定kafka消費者對應的客戶端id,默認為空,如果不設置kafka消費者會自動生成一個非空字符串。 45 properties.put("client.id", "consumer.client.id.demo"); 46 47 // 將參數設置到消費者參數中 48 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 49 50 // 消息訂閱 51 consumer.subscribe(Collections.singletonList(topic)); 52 // 可以訂閱多個主題 53 consumer.subscribe(Arrays.asList(topic, topic2)); 54 // 可以使用正則表達式進行訂閱 55 consumer.subscribe(Pattern.compile("topic-demo*")); 56 57 // 指定訂閱的分區 58 consumer.assign(Arrays.asList(new TopicPartition(topic, 0))); 59 60 while (true) { 61 // 每隔一秒監聽一次 62 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 63 // 獲取到消息信息 64 for (ConsumerRecord<String, String> record : records) { 65 System.err.println(record.toString()); 66 } 67 } 68 69 } 70 71 }