Kafka-序列化器與反序列化器的使用(自定義消息類型)
代碼如下
Customer
/** * @Author FengZhen * @Date 2020-03-30 22:49 * @Description 自定義序列化器的實體類 */ public class Customer { private int customerID; private String customerName; public Customer(int customerID, String customerName) { this.customerID = customerID; this.customerName = customerName; } public int getCustomerID() { return customerID; } public void setCustomerID(int customerID) { this.customerID = customerID; } public String getCustomerName() { return customerName; } public void setCustomerName(String customerName) { this.customerName = customerName; } }
序列化器
import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; import java.nio.ByteBuffer; import java.util.Map; /** * @Author FengZhen * @Date 2020-03-30 22:49 * @Description 自定義序列化器:不建議使用,因為如果修改序列化器,就會出現新舊消息不兼容。 * 建議使用已有的序列化器和反序列化器,如JSON、Avro、Thrift或Protobuf */ public class CustomerSerializer implements Serializer<Customer> { @Override public void configure(Map<String, ?> configs, boolean isKey) { //不做任何配置 } /** * Customer對象被序列化成: * 表示customerID的4字節整數 * 表示customerName長度的4字節整數(如果customerName為空,則長度為0) * 表示customerName的N個字節 * @param topic * @param data * @return */ @Override public byte[] serialize(String topic, Customer data) { try { byte[] serializedName; int stringSize; if (null == data){ return null; }else{ if (data.getCustomerName() != ""){ serializedName = data.getCustomerName().getBytes("UTF-8"); stringSize = serializedName.length; }else{ serializedName = new byte[0]; stringSize = 0; } } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getCustomerID()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch (Exception e){ throw new SerializationException("Error when serializing Customer to byte[] " + e); } } @Override public void close() { //不需要關閉任何東西 } }
反序列化器
import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; import java.nio.ByteBuffer; import java.util.Map; /** * @Author FengZhen * @Date 2020-04-06 15:08 * @Description 自定義反序列化器 */ public class CustomerDeserializer implements Deserializer<Customer> { @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public Customer deserialize(String topic, byte[] data) { int id; int nameSize; String name; try { if (null == data){ return null; } if (data.length < 8){ throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected"); } ByteBuffer buffer = ByteBuffer.wrap(data); id = buffer.getInt(); nameSize = buffer.getInt(); byte[] nameBytes = new byte[nameSize]; buffer.get(nameBytes); name = new String(nameBytes, "UTF-8"); return new Customer(id, name); } catch (Exception e){ throw new SerializationException("Error when serializing Customer to byte[]" + e); } } @Override public void close() { } }
生產者發送消息
import com.chinaventure.kafka.serializer.Customer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; /** * @Author FengZhen * @Date 2020-03-29 12:21 * @Description kafka生產者使用 */ public class KafkaProducerTest { private static Properties kafkaProps = new Properties(); static { kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); } public static void main(String[] args) { udfSerializer(); } /** * 自定義序列化器 */ public static void udfSerializer(){ kafkaProps.put("value.serializer", "com.chinaventure.kafka.serializer.CustomerSerializer"); KafkaProducer<String, Customer> producer = new KafkaProducer(kafkaProps); for (int i = 0; i < 10; i++){ ProducerRecord<String, Customer> record = new ProducerRecord<>("test_udf_serializer",i % 3 == 0 ? "Apple": "Banana"+i,new Customer(i, "我是" + i)); producer.send(record, new DemonProducerCallback()); } while (true){ try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消費者讀取數據
import com.chinaventure.kafka.serializer.Customer; import com.chinaventure.util.ExceptionUtil; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.*; /** * @Author FengZhen * @Date 2020-04-06 11:07 * @Description kafka消費者 */ public class KafkaConsumerTest { private static Properties kafkaProps = new Properties(); static { kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("group.id", "test"); kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); } private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); private static KafkaConsumer<String, String> consumer; public static void main(String[] args) { udfDeserializer(); } /** * 自定義反序列化器 */ public static void udfDeserializer(){ kafkaProps.put("value.deserializer", "com.chinaventure.kafka.serializer.CustomerDeserializer"); KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(kafkaProps); //訂閱主題,可傳入一個主題列表,也可以是正則表達式,如果有人創建了與正則表達式匹配的新主題,會立即觸發一次再均衡,消費者就可以讀取新添加的主題。 //如:test.*,訂閱test相關的所有主題 consumer.subscribe(Collections.singleton("test_udf_serializer")); System.out.println("==== subscribe success ===="); try { while (true){ //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。 //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞) //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據 //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。 ConsumerRecords<String, Customer> records = consumer.poll(Duration.ofMillis(100)); System.out.println("==== data get ===="); for (ConsumerRecord<String, Customer> record : records) { System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); } } } catch(Exception e){ e.printStackTrace(); } finally { //退出應用前使用close方法關閉消費者。 //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致政哥群組在一段時間內無法讀取消息。 consumer.close(); } } }
生產者打印內容
topic:test_udf_serializer partition:0 offset:0 metaData:test_udf_serializer-0@0 topic:test_udf_serializer partition:0 offset:1 metaData:test_udf_serializer-0@1 topic:test_udf_serializer partition:0 offset:2 metaData:test_udf_serializer-0@2 topic:test_udf_serializer partition:0 offset:3 metaData:test_udf_serializer-0@3 topic:test_udf_serializer partition:0 offset:4 metaData:test_udf_serializer-0@4 topic:test_udf_serializer partition:0 offset:5 metaData:test_udf_serializer-0@5 topic:test_udf_serializer partition:0 offset:6 metaData:test_udf_serializer-0@6 topic:test_udf_serializer partition:0 offset:7 metaData:test_udf_serializer-0@7 topic:test_udf_serializer partition:0 offset:8 metaData:test_udf_serializer-0@8 topic:test_udf_serializer partition:0 offset:9 metaData:test_udf_serializer-0@9
消費者打印內容
topic=test_udf_serializer, partition=0, offset=0, key=Apple, value=com.chinaventure.kafka.serializer.Customer@63798ca7 topic=test_udf_serializer, partition=0, offset=1, key=Banana1, value=com.chinaventure.kafka.serializer.Customer@4612b856 topic=test_udf_serializer, partition=0, offset=2, key=Banana2, value=com.chinaventure.kafka.serializer.Customer@22875539 topic=test_udf_serializer, partition=0, offset=3, key=Apple, value=com.chinaventure.kafka.serializer.Customer@5674e1f2 topic=test_udf_serializer, partition=0, offset=4, key=Banana4, value=com.chinaventure.kafka.serializer.Customer@79c7532f topic=test_udf_serializer, partition=0, offset=5, key=Banana5, value=com.chinaventure.kafka.serializer.Customer@2a448449 topic=test_udf_serializer, partition=0, offset=6, key=Apple, value=com.chinaventure.kafka.serializer.Customer@32f232a5 topic=test_udf_serializer, partition=0, offset=7, key=Banana7, value=com.chinaventure.kafka.serializer.Customer@43f82e78 topic=test_udf_serializer, partition=0, offset=8, key=Banana8, value=com.chinaventure.kafka.serializer.Customer@e54303 topic=test_udf_serializer, partition=0, offset=9, key=Apple, value=com.chinaventure.kafka.serializer.Customer@e8df99a
Done.