Kafka: Using Serializers and Deserializers (Custom Message Types)

The code is as follows.

Customer

/**
 * @Author FengZhen
 * @Date 2020-03-30 22:49
 * @Description Entity class for the custom serializer
 */
public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int customerID, String customerName) {
        this.customerID = customerID;
        this.customerName = customerName;
    }

    public int getCustomerID() {
        return customerID;
    }

    public void setCustomerID(int customerID) {
        this.customerID = customerID;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }
}

Serializer

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * @Author FengZhen
 * @Date 2020-03-30 22:49
 * @Description Custom serializer. Not recommended in practice: if the serializer ever changes,
 * old and new messages become incompatible.
 * Prefer an existing serialization format such as JSON, Avro, Thrift, or Protobuf.
 */
public class CustomerSerializer implements Serializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    /**
     * A Customer object is serialized as:
     * a 4-byte integer for customerID,
     * a 4-byte integer for the length of customerName (0 if customerName is null),
     * followed by the N UTF-8 bytes of customerName.
     * @param topic topic the record is being sent to (unused here)
     * @param data  the Customer to serialize
     * @return the serialized bytes, or null if data is null
     */
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (null == data){
                return null;
            }else{
                if (data.getCustomerName() != null){
                    serializedName = data.getCustomerName().getBytes(StandardCharsets.UTF_8);
                    stringSize = serializedName.length;
                }else{
                    // a null name is encoded as a zero-length string
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getCustomerID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e){
            throw new SerializationException("Error when serializing Customer to byte[]", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
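
To make the wire format concrete, here is a small standalone sketch (not part of the original post; the class name SerializerLayoutDemo is made up) that serializes one Customer and dumps the raw bytes. ByteBuffer writes big-endian by default, so the ID and the name length each appear as 4-byte big-endian integers:

public class SerializerLayoutDemo {
    public static void main(String[] args) {
        // Customer(1, "ab") should serialize to: 00 00 00 01 | 00 00 00 02 | 61 62
        byte[] bytes = new CustomerSerializer().serialize("test_udf_serializer", new Customer(1, "ab"));
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex); // 00 00 00 01 00 00 00 02 61 62
    }
}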

Deserializer

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * @Author FengZhen
 * @Date 2020-04-06 15:08
 * @Description Custom deserializer for the wire format produced by CustomerSerializer
 */
public class CustomerDeserializer implements Deserializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        int id;
        int nameSize;
        String name;
        try {
            if (null == data){
                return null;
            }
            if (data.length < 8){
                throw new SerializationException("Size of data received by CustomerDeserializer is shorter than expected");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            id = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            name = new String(nameBytes, StandardCharsets.UTF_8);
            return new Customer(id, name);
        } catch (Exception e){
            throw new SerializationException("Error when deserializing byte[] to Customer", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
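
A quick sanity check for the pair is a round trip outside Kafka. This sketch (not in the original post; the class name RoundTripDemo is made up) serializes a Customer, deserializes the bytes, and prints the recovered fields, including a multi-byte UTF-8 name:

public class RoundTripDemo {
    public static void main(String[] args) {
        Customer original = new Customer(7, "我是7");
        byte[] bytes = new CustomerSerializer().serialize("test_udf_serializer", original);
        Customer copy = new CustomerDeserializer().deserialize("test_udf_serializer", bytes);
        // Both fields survive the round trip, multi-byte characters included.
        System.out.println(copy.getCustomerID());   // 7
        System.out.println(copy.getCustomerName()); // 我是7
    }
}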

Producer: sending messages

import com.chinaventure.kafka.serializer.Customer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * @Author FengZhen
 * @Date 2020-03-29 12:21
 * @Description Kafka producer usage
 */
public class KafkaProducerTest {

    private static Properties kafkaProps = new Properties();
    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) {
        udfSerializer();
    }

    /**
     * Send messages using the custom serializer.
     */
    public static void udfSerializer(){
        kafkaProps.put("value.serializer", "com.chinaventure.kafka.serializer.CustomerSerializer");
        KafkaProducer<String, Customer> producer = new KafkaProducer<>(kafkaProps);
        for (int i = 0; i < 10; i++){
            ProducerRecord<String, Customer> record = new ProducerRecord<>("test_udf_serializer",
                    i % 3 == 0 ? "Apple" : "Banana" + i,
                    new Customer(i, "我是" + i));
            producer.send(record, new DemonProducerCallback());
        }
        // Block forever so the asynchronous sends and their callbacks have time to complete.
        while (true){
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
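
The DemonProducerCallback used above is defined elsewhere in this series and is not shown in the post. Judging from the producer output further down, it prints the RecordMetadata of each acknowledged record; a minimal sketch under that assumption (the class body here is guessed, not the original):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class DemonProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The send failed; a real application would log and possibly retry.
            exception.printStackTrace();
            return;
        }
        System.out.println("topic:" + metadata.topic());
        System.out.println("partition:" + metadata.partition());
        System.out.println("offset:" + metadata.offset());
        // RecordMetadata.toString() renders as "topic-partition@offset"
        System.out.println("metaData:" + metadata);
    }
}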

Consumer: reading data

import com.chinaventure.kafka.serializer.Customer;
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.*;

/**
 * @Author FengZhen
 * @Date 2020-04-06 11:07
 * @Description Kafka consumer usage
 */
public class KafkaConsumerTest {
    private static Properties kafkaProps = new Properties();
    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("group.id", "test");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        udfDeserializer();
    }

    /**
     * Read messages using the custom deserializer.
     */
    public static void udfDeserializer(){
        kafkaProps.put("value.deserializer", "com.chinaventure.kafka.serializer.CustomerDeserializer");
        KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(kafkaProps);
        // Subscribe to topics. You can pass a list of topics or a regular expression. If someone
        // creates a new topic that matches the regex, a rebalance is triggered immediately and the
        // consumer can start reading the new topic. For example, test.* subscribes to all test-related topics.
        consumer.subscribe(Collections.singleton("test_udf_serializer"));
        System.out.println("==== subscribe success ====");
        try {
            while (true){
                // The consumer must keep polling Kafka, or it will be considered dead and its
                // partitions will be handed to other consumers in the group.
                // The argument to poll() is a timeout that controls how long poll() blocks when no
                // data is available in the consumer's buffer. If it is 0, poll() returns immediately;
                // otherwise it waits up to the given time for the broker to return data.
                // poll() returns a list of records. Each record contains its topic, its partition,
                // its offset within the partition, and the key/value pair.
                ConsumerRecords<String, Customer> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("==== data get ====");
                for (ConsumerRecord<String, Customer> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                }
            }
        } catch(Exception e){
            e.printStackTrace();
        } finally {
            // Close the consumer before exiting the application.
            // This closes the network connections and sockets and triggers a rebalance immediately,
            // instead of waiting for the group coordinator to notice the missing heartbeats and
            // declare the consumer dead, which takes longer and leaves the whole group unable to
            // read messages in the meantime.
            consumer.close();
        }
    }
}
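
As an aside, instead of registering the deserializer by its fully qualified class name in the properties, KafkaConsumer also has a constructor that accepts deserializer instances directly, which is checked at compile time. A minimal sketch reusing the kafkaProps above:

import org.apache.kafka.common.serialization.StringDeserializer;

// Equivalent setup without the "value.deserializer" property:
KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(
        kafkaProps, new StringDeserializer(), new CustomerDeserializer());

// The regex subscription mentioned in the comments would look like:
// consumer.subscribe(Pattern.compile("test.*"));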

 

Producer output

topic:test_udf_serializer
partition:0
offset:0
metaData:test_udf_serializer-0@0
topic:test_udf_serializer
partition:0
offset:1
metaData:test_udf_serializer-0@1
topic:test_udf_serializer
partition:0
offset:2
metaData:test_udf_serializer-0@2
topic:test_udf_serializer
partition:0
offset:3
metaData:test_udf_serializer-0@3
topic:test_udf_serializer
partition:0
offset:4
metaData:test_udf_serializer-0@4
topic:test_udf_serializer
partition:0
offset:5
metaData:test_udf_serializer-0@5
topic:test_udf_serializer
partition:0
offset:6
metaData:test_udf_serializer-0@6
topic:test_udf_serializer
partition:0
offset:7
metaData:test_udf_serializer-0@7
topic:test_udf_serializer
partition:0
offset:8
metaData:test_udf_serializer-0@8
topic:test_udf_serializer
partition:0
offset:9
metaData:test_udf_serializer-0@9

Consumer output

topic=test_udf_serializer, partition=0, offset=0, key=Apple, value=com.chinaventure.kafka.serializer.Customer@63798ca7
topic=test_udf_serializer, partition=0, offset=1, key=Banana1, value=com.chinaventure.kafka.serializer.Customer@4612b856
topic=test_udf_serializer, partition=0, offset=2, key=Banana2, value=com.chinaventure.kafka.serializer.Customer@22875539
topic=test_udf_serializer, partition=0, offset=3, key=Apple, value=com.chinaventure.kafka.serializer.Customer@5674e1f2
topic=test_udf_serializer, partition=0, offset=4, key=Banana4, value=com.chinaventure.kafka.serializer.Customer@79c7532f
topic=test_udf_serializer, partition=0, offset=5, key=Banana5, value=com.chinaventure.kafka.serializer.Customer@2a448449
topic=test_udf_serializer, partition=0, offset=6, key=Apple, value=com.chinaventure.kafka.serializer.Customer@32f232a5
topic=test_udf_serializer, partition=0, offset=7, key=Banana7, value=com.chinaventure.kafka.serializer.Customer@43f82e78
topic=test_udf_serializer, partition=0, offset=8, key=Banana8, value=com.chinaventure.kafka.serializer.Customer@e54303
topic=test_udf_serializer, partition=0, offset=9, key=Apple, value=com.chinaventure.kafka.serializer.Customer@e8df99a
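
The value column prints the default Object.toString() (class name plus hash code) because Customer does not override toString(). Adding an override like this sketch (not part of the original class) would make the values readable:

@Override
public String toString() {
    return "Customer{customerID=" + customerID + ", customerName='" + customerName + "'}";
}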

Done.

