Kafka 消息的序列化與反序列化(二)


自定義反序列化類:

對於自定義的avro schema結構,需要有自定義的類在consumer時反序列化,反序列化類實例在consumer構造的時候通過參數傳入

public class AvroWithSchemaSpecificDeser<T,E> implements Deserializer<T> {
    private Class<T> typeClass;
    private transient Schema schema;
    private String codecName;
    
    /**
     * Simple constructor 
     * 
     * @param pojoClassName The pojo class name to be deserialized
     * @param codecName The codec used for compression, if null, no compression is applied
     */
    public AvroWithSchemaSpecificDeser(final String pojoClassName, final String codecName) {
        try {
            Class<T> payloadClassType = (Class<T>) Class.forName(pojoClassName);

            typeClass = payloadClassType;
            schema = (Schema) payloadClassType.getField("SCHEMA$").get(null);
this.codecName = codecName != null ? codecName : "null";
           
        } catch (AvroRuntimeException ex) {
            throw new IllegalStateException(String.format("Not able to initialize avro object. Details: %s", ex.getMessage()), ex);
        }         
    }
       
    @Override
    public T deserialize(String topic, byte[] data) {
        T pojoObject= null;
        if(data != null && data.length > 0) {
            DatumReader<T> datumReader = null;
            DataFileReader<T> dataFileReader = null;
            try {
                SpecificData specificData = new SpecificData(); //用於日期和時間格式的轉換
                specificData.addLogicalTypeConversion(new DateConversion());
                specificData.addLogicalTypeConversion(new TimeConversion());
                specificData.addLogicalTypeConversion(new TimestampConversion());
        
                pojoObject = typeClass.newInstance();
                datumReader = new SpecificDatumReader<>(null, schema, specificData);
                dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(data), datumReader);
                while (dataFileReader.hasNext()) {
                    pojoObject = dataFileReader.next(pojoObject);
                }

            } catch(Exception ex) {
                SerializationException serex = new SerializationException(String.format("Error when deserializing byte[] to this class (%s)  from this topic (%s)",typeClass.toString(), topic), ex);
            } finally {
                if(dataFileReader != null) {
                    dataFileReader.close();
                }
            }
        }
        return pojoObject;
    }
}

 

 
創建consumer對象:

首先在RunnableConsumer中需要創建kafka consumer實例,需要傳入consumer的屬性列表及反序列化對象,在下面創建反序列化實例時,只傳入了pojo_class_name,codec使用了null,也就是沒有使用任何壓縮編碼

Deserializer<K> keyDeserClass = (Deserializer) Class.forName(props.getProperty("key.deserializer")).newInstance();

Class<?> cl = Class.forName(props.getProperty("value.deserializer"));
Constructor<?> cons = cl.getConstructor(Map.class);
Deserializer<V> valueSerClass = (Deserializer)cons.newInstance(consumerConfig.get("pojo_class_name"), null);

consumer = new KafkaConsumer<>(props, keyDeserClass, valueDeserClass);

 

consumer的props屬性從配置服務器中讀取,其值為類似以下的k-v,其中關鍵的字段為bootstrap.servers,key.deserializer,value.deserializer,group.id和需要反序列化的pojo_class_name

{
	security.protocol=SASL_PLAINTEXT,
	schema.registry.url=http://yourregistryurl.youcompany.com:8080,
	bootstrap.servers=yourbootstrap1.youcompany.com:7788, yourbootstrap2.youcompany.com:7788,
	key.deserializer=org.apache.kafka.common.serialization.LongDeserializer,	
	value.deserializer=com.youcompany.serialization.AvroSchemaSpecificDeser,
	client.id=20353@xxx,
	group.id=yourgroupid,
pojo_class_name=UserSecurity }

第二個參數是key的反序列化對象,這是一個kafka的標准的反序列化類 LongDeserializer

第三個參數是value的反序列化對象,反射創建時,需要讀取pojo_class_name參數

 

 

訂閱和消費消息:

在consumer對象創建好后,就可以從線程池中啟動consumer了,訂閱指定的topic,並poll消息,如果有拉取到消息,這將消息notify給監聽者

 consumer.subscribe(topics);
            ConsumerGroup.this.isRunning = true;

            while (true) {
                ConsumerRecords<K,V> records = null;
                try {

                    processCommit(SyncMode.ASYNC);

                    records = consumer.poll(isPolling ? Long.MAX_VALUE : 0);
                    if(records != null && records.count() > 0) {
                        listener.notify(records);
                    }
                } catch(WakeupException wex) {
                    LOGGER.trace("Got a WakeupException. Doing nothing. Exception Details:",wex);
                } 
                isPolling = true;
            }

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM