Kafka Serializers


Custom Serializers

Custom serializers are not recommended: if the serializer later needs to add a field, old and new messages become incompatible. Prefer well-known serializers and deserializers such as JSON, Avro, Thrift, or Protobuf.

/**
 * @Author FengZhen
 * @Date 2020-03-30 22:49
 * @Description Entity class used by the custom serializer example
 */
public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int customerID, String customerName) {
        this.customerID = customerID;
        this.customerName = customerName;
    }

    public int getCustomerID() {
        return customerID;
    }

    public void setCustomerID(int customerID) {
        this.customerID = customerID;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }
}

 

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * @Author FengZhen
 * @Date 2020-03-30 22:49
 * @Description Custom serializer: not recommended, because any change to the
 * serialized format makes old and new messages incompatible.
 * Prefer an existing serializer/deserializer such as JSON, Avro, Thrift, or Protobuf.
 */
public class CustomerSerializer implements Serializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    /**
     * A Customer object is serialized as:
     *   a 4-byte int for customerID,
     *   a 4-byte int for the length of customerName (0 if customerName is null),
     *   the UTF-8 bytes of customerName.
     * @param topic topic the record is being sent to (unused here)
     * @param data  the Customer to serialize
     * @return serialized bytes, or null if data is null
     */
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (null == data) {
                return null;
            } else {
                if (data.getCustomerName() != null) {
                    serializedName = data.getCustomerName().getBytes(StandardCharsets.UTF_8);
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getCustomerID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[]", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
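
For completeness, here is a minimal matching deserializer sketch (not part of the original post); it simply reverses the byte layout written above. The class name CustomerDeserializer is our own.

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class CustomerDeserializer implements Deserializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int id = buffer.getInt();        // 4-byte customerID
            int nameSize = buffer.getInt();  // 4-byte name length
            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            return new Customer(id, new String(nameBytes, StandardCharsets.UTF_8));
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to Customer", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}

To use the pair, a producer would set value.serializer to CustomerSerializer and a consumer would set value.deserializer to CustomerDeserializer. This illustrates exactly the fragility described above: both sides must agree on this byte layout forever.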

 

Serializing with Avro

An Avro data file embeds the full schema once, which is acceptable overhead. But embedding the schema in every Kafka record would multiply the size of each record, while the reader still needs the whole schema to deserialize. The common solution is a schema registry.

The schema registry is not part of Kafka itself; there are several open-source implementations, such as Confluent Schema Registry.

All the schemas used to write data are stored in the registry, and each record carries only the identifier of its schema. The reading application uses that identifier to pull the schema from the registry and deserialize the record. The serializer and deserializer handle registering and fetching schemas, respectively. (In Confluent's implementation, each message carries a magic byte and a 4-byte schema ID in front of the Avro payload rather than the whole schema.)
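
To make the identifier concrete, here is a small sketch of our own (not from the original post) that extracts the schema ID from a message framed in Confluent's documented wire format: a zero magic byte, a big-endian 4-byte schema ID, then the Avro-encoded payload.

import java.nio.ByteBuffer;

/**
 * Reads the schema ID out of a Confluent-framed Avro message:
 * byte 0 is a magic byte (0), bytes 1-4 are the schema ID,
 * and the remaining bytes are the Avro binary payload.
 */
public class WireFormatSketch {
    public static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buffer.getInt(); // big-endian 4-byte schema ID
    }
}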

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @Author FengZhen
 * @Date 2020-03-30 23:06
 * @Description Avro serialization examples
 */
public class AvroSerializerTest {
    public static void main(String[] args) {
        // run one of the examples below
        genericValue();
    }

    /**
     * Generic Avro records, using this schema:
     * {
     *     "namespace": "customerManagement.avro",
     *     "type": "record",
     *     "name": "Customer",
     *     "fields": [
     *         {"name": "id",    "type": "int"},
     *         {"name": "name",  "type": "string"},
     *         {"name": "email", "type": ["null", "string"], "default": null}
     *     ]
     * }
     */
    public static void genericValue(){
        String schemaUrl = "";

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "io.confluent.kafka.serializer.KafkaAvroSerializer");
        properties.put("value.serializer", "io.confluent.kafka.serializer.KafkaAvroSerializer");
        //schema注冊表URI
        properties.put("schema.registry.url", schemaUrl);

        String schemaString = "{\n" +
                "\t\"namespace\": \"customerManagement.avro\",\n" +
                "\t\"type\": \"record\",\n" +
                "\t\"name\": \"Customer\",\n" +
                "\t\"fields\": [{\n" +
                "\t\t\t\"name\": \"id\",\n" +
                "\t\t\t\"type\": \"int\"\n" +
                "\t\t},\n" +
                "\t\t{\n" +
                "\t\t\t\"name\": \"name\",\n" +
                "\t\t\t\"type\": \"string\"\n" +
                "\t\t},\n" +
                "\t\t{\n" +
                "\t\t\t\"name\": \"email\",\n" +
                "\t\t\t\"type\": [\"null\", \"string\"],\n" +
                "\t\t\t\"default\": null\n" +
                "\t\t}\n" +
                "\t]\n" +
                "}";

        String topic = "customerContacts";

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaString);

        int i = 0;
        while (true){
            i++;
            String name = "example:" + i;
            String email = "email:" + i;
            GenericRecord genericRecord = new GenericData.Record(schema);
            genericRecord.put("id", i);
            genericRecord.put("name", name);
            genericRecord.put("email", email);

            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topic, name, genericRecord);
            producer.send(record);
        }
    }

    /**
     * User-defined Avro objects. For this to work with KafkaAvroSerializer,
     * Customer is assumed to be an Avro-generated specific record class,
     * not the plain POJO defined earlier.
     */
    public static void udfValue(){
        String schemaUrl = "";

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "io.confluent.kafka.serializer.KafkaAvroSerializer");
        properties.put("value.serializer", "io.confluent.kafka.serializer.KafkaAvroSerializer");
        //schema注冊表URI
        properties.put("schema.registry.url", schemaUrl);

        String topic = "customerContacts";

        Producer<String, Customer> producer = new KafkaProducer<String, Customer>(properties);
        int i = 0;
        while (true){
            Customer customer = new Customer(++i, "name:" + i);
            ProducerRecord<String, Customer> record = new ProducerRecord<String, Customer>(topic, String.valueOf(customer.getCustomerID()), customer);
            producer.send(record);
        }
    }
}
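
On the consuming side, the matching KafkaAvroDeserializer looks up the schema in the registry by the embedded ID. A minimal consumer sketch of our own (the group id and registry address are assumptions):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AvroDeserializerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "customer-readers"); // hypothetical group id
        properties.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        properties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        properties.put("schema.registry.url", "http://localhost:8081"); // assumed registry address

        // Keys were written with KafkaAvroSerializer, so we type them loosely as Object
        // (Avro may hand primitive strings back as org.apache.avro.util.Utf8).
        KafkaConsumer<Object, GenericRecord> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("customerContacts"));
        while (true) {
            ConsumerRecords<Object, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Object, GenericRecord> record : records) {
                // Without specific.avro.reader=true, values arrive as GenericRecord
                System.out.println(record.value().get("id") + ": " + record.value().get("name"));
            }
        }
    }
}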

 

