Kafka 消息的序列化與反序列化(一)


在使用Kafka發送接收消息時,producer端需要序列化,consumer端需要反序列化,在大多數場景中,需要傳輸的是與業務規則相關的復雜類型,這就需要自定義數據結構。Avro是一種序列化框架,使用JSON來定義schema,shcema由原始類型(null,boolean,int,long,float,double,bytes,string)和復雜類型(record,enum,array,map,union,fixed)組成,schema文件以.avsc結尾,表示avro schema,有2種序列化方式

  • 二進制方式:也就是Specific方式,定義好schema asvc文件后,使用編譯器(avro-tools.jar)編譯生成相關語言(java)的業務類,類中會嵌入JSON schema
  • JSON方式:也就是Generic方式,在代碼中動態加載schema asvc文件,將FieldName - FieldValue,以Map<K,V>的方式存儲

 序列化后的數據,是schema和data同時存在的,如下圖

 

1:自定義序列化類:

先定義序列化類,使用Specific序列化方式,下面方法中使用了SpecificDatumWriter類

public class AvroWithSchemaSpecificSer<T, E> implements Serializer<T> {
   public byte[] serialize(String topic, T data) {
        SpecificData specificData = new SpecificData(); //用於日期和時間格式的轉換
        specificData.addLogicalTypeConversion(new DateConversion());
        specificData.addLogicalTypeConversion(new TimeConversion());
        specificData.addLogicalTypeConversion(new TimestampConversion());

        DatumWriter<T> datumWriter = new SpecificDatumWriter(this.schema, specificData);
        byte[] bytes = new byte[0];
        if (data != null) {

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataFileWriter<T> dataFileWriter = new DataFileWriter(datumWriter);
            dataFileWriter.setCodec(CodecFactory.fromString(this.codecName)).create(this.schema, baos);
            dataFileWriter.append(data);
            dataFileWriter.flush();
            baos.flush();
            bytes = baos.toByteArray();                        
            dataFileWriter.close();                    
            baos.close();
        }
        return bytes;
   }
}

 

2:創建producer對象:

KafkaProducer<K,V>是用於創建producer實例的線程安全的客戶端類,多個線程可以共用一個producer實例,而且通常情況下共用一個實例比每個線程創建一個producer實例性能要好。在創建produce實例時,Properities必須配置的3個參數為:bootstrap.servers,key.serializer,value.serializer,關於KafkaProducer類,可參看官方文檔KafkaProducer

創建producer對象,並加入到虛擬機關閉鈎子中,用於在虛擬機關閉是清理producer

Serializer<K> keyDeserClass = (Serializer) Class.forName(props.getProperty("key.serializer")).newInstance();

Class<?> cl = Class.forName(props.getProperty("value.serializer"));
Constructor<?> cons = cl.getConstructor(Map.class);
Serializer<V> valueSerClass = (Serializer)cons.newInstance(consumerConfig.get("pojo_class_name"), null);

Producer<K,V> producer = new KafkaProducer<>(props, keySerClass, valueSerClass);
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        producer.close();
    }
});

 

函數原型如下:

public class KafkaProducer<K, V> implements Producer<K, V> {
    public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)
}

第一個參數是一個Properties類型,配置如下

{
	security.protocol=SASL_PLAINTEXT,
	schema.registry.url=http://yourregistryurl.youcompany.com:8080,
	bootstrap.servers=yourbootstrap1.youcompany.com:7788, yourbootstrap2.youcompany.com:7788,
	key.serializer=org.apache.kafka.common.serialization.LongSerializer,	
	value.serializer=com.youcompany.serialization.AvroSchemaSpecificSer,
pojo_class_name=UserSecurity client.id=18324@xxx, acks=all }

第二個參數是key的序列化實例,從第一個參數的key.serializer獲取類名字 "org.apache.kafka.common.serialization.LongSerializer",並反射創建類實例

第三個參數是value的序列化實例,從第一個參數的value.serializer獲取類名字 "com.youcompany.serialization.AvroSchemaSpecificSer",應用反射創建類實例,這是一個自定義的序列化類,用於序列化pojo_class_name所指向的avro類(由avro schema定義並經過編譯后的業務類)UserSecurityRequest

Class<?> cl = Class.forName(props.getProperty("value.serializer"));
Constructor<?> cons = cl.getConstructor(Map.class);
Serializer<V> valueSerClass = (Serializer)cons.newInstance(produceConfig.get("pojo_class_name"), null);

 

注:
在生成producer實例過程中,將調用方法:ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); 
因為上面第一個參數里配置的authentication為SASL_PLAINTEXT,所以此方法將創建一個SaslChannelBuilder 通道構造器,

這個過程將使用kerberos方式登陸認證,可參見另外一篇博客

 

3:producer發送消息:

ProducerRecord<K, V> producerRecord = new ProducerRecord<>("mytopic",data);
producer.send(producerRecord, new ProducerCallBack(requestId));

先使用avro業務對象(UserSecurityRequest)data創建producerRecord,然后調用producer發送消息,有同步和異步2種發送方式,此處使用的是異步方式,消息將被存儲在待發送的IO緩存后即刻返回,這樣可以並行無阻塞的發送更多消息,提高producer性能,並在回調函數中獲取消息的offset。在send的過程中,如果有攔截器,則先調用攔截器,再繼續發送消息,send的源代碼如下:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
}

在do.Send函數中,可以看到調用了value的序列化

byte[] serializedValue;
try {
      serializedValue = this.valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException var16) {
      throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer");
}

而對於value的序列化,正是使用了自定義的對avro schema序列化的類

 

 4:架構圖:

 

 

 參考:

Apache Avro™ 1.8.1 Specification


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM