順序保證
Kafka可以保證同一個分區里的消息是有序的。即如果生產者按照一定的順序發送消息,那么Broker就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序去讀取它們。
但是在Kafka基本概念及常用場景中,我們介紹了一個主題是可以被分為若干個分區的。
那么我們將一個主題設置為只有一個分區的情況下,那么就可以保證其消息的順序么?肯定也是不可以的,因為在Kafka的生產者中,我們介紹了Kafka生產者的一些配置,其中會有兩個參數配置也會影響消息的順序,如下:
如果我們將上述參數retries設置為大於0的數,即允許消息失敗進行重發,那么如果Kafka生產者在安裝順序發送了A、B兩條消息,因為max.in.flight.requests.per.connection參數默認為5,所以生產者是允許在消息A沒有收到響應的情況下,直接發送消息B的,然后假設消息A發送失敗、消息B發送成功,然后消息A會發送重試,重發消息后成功,那么分區中的消息順序就為消息B、A了,和發送時的順序不一致。
所以如果某些場景要求消息是有序的,那么消息是否寫入成功也是很關鍵的,所以不建議把retires設為0 。可以把max.in.flight.request.per.connection設為1,這樣在生產者嘗試發送第一批消息時,就不會有其他的消息發送給Broker 。不過這樣會嚴重影響生產者的吞吐量,所以只有在對消息的順序有嚴格要求的情況下才會這么做。
序列化
創建生產者對象必須指定序列化器,如果默認的序列化器並不能滿足我們所有的場景。那么我們完全可以自定義序列化器。只要實現org.apache.kafka.common.serialization.Serializer接口,以及org.apache.kafka.common.serialization.Deserializer即可。
public class MySerializer implements Serializer<Object> { @Override public void configure(Map<String, ?> configs, boolean isKey) { //do nothing } @Override public byte[] serialize(String topic, Object data) { byte[] bytes = null; try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos)) { oos.writeObject(data); oos.flush(); bytes = bos.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return bytes; } @Override public void close() { //do nothing } }
public class MyDeserializer implements Deserializer<Object> { @Override public void configure(Map<String, ?> configs, boolean isKey) { //do nothing } @Override public Object deserialize(String topic, byte[] data) { Object object = null; try (ByteArrayInputStream bis = new ByteArrayInputStream(data); ObjectInputStream ois = new ObjectInputStream(bis)) { object = ois.readObject(); } catch (Exception e) { e.printStackTrace(); } return object; } @Override public void close() { //do nothing } }
上述是我們使用了Jdk原生的序列化機制完成的,當然我們也可以使用一些優秀的第三方序列化框架,如我們在
Netty中介紹的ProtoBuf、MessagePack、Kryo,以及Kafka官方推薦的Apache Avro。
然后我們就Kafka的生產者和消費者中,使用上述我們自定義的序列化類和反序列化類即可,如下:
這里我們就來簡單了解一下Kafka官方推薦的Avro,其實Avro和ProtoBuf有一點點類似,都會生成了額外的文件,Avro會使用一個JSON文件作為schema來描述數據,Avro在讀寫時會用到這個schema,可以把這個schema內嵌在數據文件中。這樣,不管數據格式如何變動,消費者都知道如何處理數據。
但是內嵌的消息,自帶格式,會導致消息的大小不必要的增大,消耗了資源。我們可以使用schema注冊表機制,將所有寫入的數據用到的schema保存在注冊表中,然后在消息中引用schema的標識符,而讀取的數據的消費者程序使用這個標識符從注冊表中拉取schema來反序列化記錄。
注意:Kafka本身並不提供schema注冊表,需要借助第三方,現在已經有很多的開源實現,比如Confluent Schema Registry,可以從GitHub上獲取。
至於如果使用其Avro,可以參考Kafka中使用Avro序列化
分區
我們在新增ProducerRecord對象中可以看到,ProducerRecord包含了目標主題,鍵和值,Kafka的消息都是一個個的鍵值對。(當然我們在ProducerRecord對象中也可以直接指定所要發送的分區)
鍵的主要用途有兩個:
用來決定消息被寫往主題的哪個分區,擁有相同鍵的消息將被寫往同一個分區
還可以作為消息的附加消息
另外鍵可以設置為默認的null。如果鍵值為null,並且使用默認的分區器,分區器使用輪詢算法將消息均衡地分布到各個分區上。
如果鍵不為空,並且使用默認的分區器,Kafka對鍵進行散列(Kafka內部實現自定義的散列算法),然后根據散列值把消息映射到特定的分區上。很明顯,同一個鍵總是被映射到同一個分區。但是只有不改變主題分區數量的情況下,鍵和分區之間的映射才能保持不變,一旦增加了新的分區,就無法保證了,所以如果要使用鍵來映射分區,那就要在創建主題的時候把分區規划好,而且永遠不要增加新分區。
但是某些情況下,數據特性決定了需要進行特殊分區,比如公司業務的特殊性,上海的業務量明顯占主要部分,比如占據了總業務量的20%,我們需要對上海的訂單進行單獨分區處理,默認的散列分區算法不合適了, 我們就可以自定義分區算法,對上海的訂單進行單獨處理,其他地區沿用散列分區算法。或者某些情況下,我們用value來進行分區。
如果需要滿足上述的要求,那么我們就需要使用到自定義分區器,如下我們就自定義一個簡單的一value值的hashCode來進行分區,主要就是實現org.apache.kafka.clients.producer.Partitioner接口即可,如下:
public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); int num = partitionInfos.size(); //Integer num = cluster.partitionCountForTopic(topic); return value.hashCode() % num; } @Override public void close() { //do nothing } @Override public void configure(Map<String, ?> configs) { //do nothing } }
然后在其Kafka的生產者和消費者中,添加相應的配置即可,和自定義序列化類類似
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);