生產者需要用序列化器(Serializer)把對象轉換成字節數組才能通過網絡發送給Kafka。而在對側,消費者需要用反序列化器(Deserializer)把從 Kafka 中收到的字節數組轉換成相應的對象。
先參考下面代碼實現一個簡單的客戶端。
為了方便,消息的 key 和 value 都使用了字符串,對應程序中的序列化器也使用了客戶端自帶的 org.apache.kafka.common.serialization.StringSerializer,除了用於 String 類型的序列化器,還有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 這幾種類型,它們都實現了 org.apache.kafka.common.serialization.Serializer 接口,此接口有3個方法:
configure() 方法用來配置當前類,serialize() 方法用來執行序列化操作。而 close() 方法用來關閉當前的序列化器,一般情況下 close() 是一個空方法,如果實現了此方法,則必須確保此方法的冪等性,因為這個方法很可能會被 KafkaProducer 調用多次。
生產者使用的序列化器和消費者使用的反序列化器是需要一一對應的,如果生產者使用了某種序列化器,比如 StringSerializer,而消費者使用了另一種序列化器,比如 IntegerSerializer,那么是無法解析出想要的數據的
下面就以 StringSerializer 為例來看看 Serializer 接口中的3個方法的使用方法,StringSerializer 類的具體實現如代碼
首先是 configure() 方法,這個方法是在創建 KafkaProducer 實例的時候調用的,主要用來確定編碼類型,不過一般客戶端對於 key.serializer.encoding、value.serializer. encoding 和 serializer.encoding 這幾個參數都不會配置,在 KafkaProducer 的參數集合(ProducerConfig)里也沒有這幾個參數(它們可以看作用戶自定義的參數),所以一般情況下 encoding 的值就為默認的“UTF-8”。serialize() 方法非常直觀,就是將 String 類型轉為 byte[] 類型。
如果 Kafka 客戶端提供的幾種序列化器都無法滿足應用需求,則可以選擇使用如 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用的序列化工具來實現,或者使用自定義類型的序列化器來實現。下面就以一個簡單的例子來介紹自定義類型的使用方法
假設我們要發送的消息都是 Company 對象,這個 Company 的定義很簡單,只有名稱 name 和地址 address,示例代碼參考如下
下面我們再來看一下 Company 對應的序列化器 CompanySerializer,示例代碼如代碼
如何使用自定義的序列化器 CompanySerializer 呢?只需將 KafkaProducer 的 value.serializer 參數設置為 CompanySerializer 類的全限定名即可。假如我們要發送一個 Company 對象到 Kafka,關鍵代碼如代碼
注意,示例中消息的 key 對應的序列化器還是 StringSerializer,這個並沒有改動。其實 key.serializer 和 value.serializer 並沒有太大的區別
