在使用Kafka發送接收消息時,producer端需要序列化,consumer端需要反序列化,在大多數場景中,需要傳輸的是與業務規則相關的復雜類型,這就需要自定義數據結構。Avro是一種序列化框架,使用JSON來定義schema,shcema由原始類型(null,boolean,int,long,float,double,bytes,string)和復雜類型(record,enum,array,map,union,fixed)組成,schema文件以.avsc結尾,表示avro schema,有2種序列化方式
- 二進制方式:也就是Specific方式,定義好schema asvc文件后,使用編譯器(avro-tools.jar)編譯生成相關語言(java)的業務類,類中會嵌入JSON schema
- JSON方式:也就是Generic方式,在代碼中動態加載schema asvc文件,將FieldName - FieldValue,以Map<K,V>的方式存儲
序列化后的數據,是schema和data同時存在的,如下圖

1:自定義序列化類:
先定義序列化類,使用Specific序列化方式,下面方法中使用了SpecificDatumWriter類
public class AvroWithSchemaSpecificSer<T, E> implements Serializer<T> { public byte[] serialize(String topic, T data) { SpecificData specificData = new SpecificData(); //用於日期和時間格式的轉換 specificData.addLogicalTypeConversion(new DateConversion()); specificData.addLogicalTypeConversion(new TimeConversion()); specificData.addLogicalTypeConversion(new TimestampConversion()); DatumWriter<T> datumWriter = new SpecificDatumWriter(this.schema, specificData); byte[] bytes = new byte[0]; if (data != null) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataFileWriter<T> dataFileWriter = new DataFileWriter(datumWriter); dataFileWriter.setCodec(CodecFactory.fromString(this.codecName)).create(this.schema, baos); dataFileWriter.append(data); dataFileWriter.flush(); baos.flush(); bytes = baos.toByteArray(); dataFileWriter.close(); baos.close(); } return bytes; } }
2:創建producer對象:
KafkaProducer<K,V>是用於創建producer實例的線程安全的客戶端類,多個線程可以共用一個producer實例,而且通常情況下共用一個實例比每個線程創建一個producer實例性能要好。在創建produce實例時,Properities必須配置的3個參數為:bootstrap.servers,key.serializer,value.serializer,關於KafkaProducer類,可參看官方文檔KafkaProducer
創建producer對象,並加入到虛擬機關閉鈎子中,用於在虛擬機關閉是清理producer
Serializer<K> keyDeserClass = (Serializer) Class.forName(props.getProperty("key.serializer")).newInstance();
Class<?> cl = Class.forName(props.getProperty("value.serializer"));
Constructor<?> cons = cl.getConstructor(Map.class);
Serializer<V> valueSerClass = (Serializer)cons.newInstance(consumerConfig.get("pojo_class_name"), null);
Producer<K,V> producer = new KafkaProducer<>(props, keySerClass, valueSerClass);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
producer.close();
}
});
函數原型如下:
public class KafkaProducer<K, V> implements Producer<K, V> { public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) }
第一個參數是一個Properties類型,配置如下
{
security.protocol=SASL_PLAINTEXT,
schema.registry.url=http://yourregistryurl.youcompany.com:8080,
bootstrap.servers=yourbootstrap1.youcompany.com:7788, yourbootstrap2.youcompany.com:7788,
key.serializer=org.apache.kafka.common.serialization.LongSerializer,
value.serializer=com.youcompany.serialization.AvroSchemaSpecificSer,
pojo_class_name=UserSecurity
client.id=18324@xxx,
acks=all
}
第二個參數是key的序列化實例,從第一個參數的key.serializer獲取類名字 "org.apache.kafka.common.serialization.LongSerializer",並反射創建類實例
第三個參數是value的序列化實例,從第一個參數的value.serializer獲取類名字 "com.youcompany.serialization.AvroSchemaSpecificSer",應用反射創建類實例,這是一個自定義的序列化類,用於序列化pojo_class_name所指向的avro類(由avro schema定義並經過編譯后的業務類)UserSecurityRequest
Class<?> cl = Class.forName(props.getProperty("value.serializer"));
Constructor<?> cons = cl.getConstructor(Map.class);
Serializer<V> valueSerClass = (Serializer)cons.newInstance(produceConfig.get("pojo_class_name"), null);
注: 在生成producer實例過程中,將調用方法:ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); 因為上面第一個參數里配置的authentication為SASL_PLAINTEXT,所以此方法將創建一個SaslChannelBuilder 通道構造器,
這個過程將使用kerberos方式登陸認證,可參見另外一篇博客
3:producer發送消息:
ProducerRecord<K, V> producerRecord = new ProducerRecord<>("mytopic",data); producer.send(producerRecord, new ProducerCallBack(requestId));
先使用avro業務對象(UserSecurityRequest)data創建producerRecord,然后調用producer發送消息,有同步和異步2種發送方式,此處使用的是異步方式,消息將被存儲在待發送的IO緩存后即刻返回,這樣可以並行無阻塞的發送更多消息,提高producer性能,並在回調函數中獲取消息的offset。在send的過程中,如果有攔截器,則先調用攔截器,再繼續發送消息,send的源代碼如下:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return this.doSend(interceptedRecord, callback); }
在do.Send函數中,可以看到調用了value的序列化
byte[] serializedValue; try { serializedValue = this.valueSerializer.serialize(record.topic(), record.value()); } catch (ClassCastException var16) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer"); }
而對於value的序列化,正是使用了自定義的對avro schema序列化的類
4:架構圖:

參考:
