1. 卡夫卡序列化和反序列化
今天,在這篇Kafka SerDe文章中,我們將學習使用Kafka創建自定義序列化器和反序列化器的概念。此外,我們將了解序列化在Kafka中的工作原理以及為什么需要序列化。與此同時,我們將看到 Kafka序列化器示例和Kafka解串器示例。此外,這個Kafka序列化和反序列化教程為我們提供了 Kafka字符串序列化器和 Kafka 對象序列化器的知識。
基本上,Apache Kafka提供了我們可以輕松發布以及訂閱記錄流的功能。因此,我們可以靈活地創建自己的自定義序列化器以及解串器,這有助於使用它傳輸不同的數據類型。 那么,讓我們開始Kafka序列化和反序列化
2. Apache Kafka SerDe
但是,為了傳輸而將對象轉換為字節流的過程就是我們所說的序列化。雖然,Apache Kafka存儲以及在隊列中傳輸這些字節數組。
閱讀Apache Kafka用例| Kafka應用程序
然而,序列化的反面是反序列化。在這里,我們將數組的字節轉換為我們想要的數據類型。但是,請確保Kafka僅為少數數據類型提供序列化程序和反序列化程序,例如
- 串
- 長
- 雙
- 整數
- 字節
3.為什么在Kafka中使用Custom Serializer和Deserializer?
基本上,為了准備從生產者傳遞到代理的消息,我們使用序列化器。換句話說,在將整個消息傳輸到代理之前,讓生產者知道如何將消息轉換為字節數組,我們使用序列化器。類似地,要將字節數組轉換回對象,我們使用消費者的反序列化器。
4. Kafka SerDe的實施
實現org.apache.kafka.common.serialization.Serializer接口以創建序列化程序類非常重要。Ans,對於反序列化器類,重要的是實現org.apache.kafka.common.serialization.Deserializer接口。
讓我們來討論Apache Kafka架構及其基本概念
Kafka序列化和反序列化接口有3種方法:
一個。配置
在配置啟動時,我們調用Configure方法。
灣 序列化/反序列化
出於Kafka序列化和反序列化的目的,我們使用此方法。
C。關
在關閉Kafka會話時,我們使用Close方法。
閱讀如何創建Kafka客戶端
5.與Kafka的串行器接口
-
public interface Serializer extends Closeable { void configure(Map<String, ?> var1, boolean var2); byte[] serialize(String var1, T var2); void close(); }
6.與Kafka的解串器接口
-
public interface Deserializer extends Closeable { void configure(Map<String, ?> var1, boolean var2); T deserialize(String var1, byte[] var2); void close(); }
7. Serializer和Deserializer的示例
這里的依賴關系是:
讓我們來探討卡夫卡的優缺點
- 卡夫卡(0.10.1.1)。
- FasterXML Jackson(2.8.6)。
-
user.java: public class User { private String firstname; private int age; public User() { } public User(String firstname, int age) { this.firstname = firstname; this.age = age; } public String getfirstName() { return this.firstname; } public int getAge() { return this.age; } @Override public String toString() { return "User(" + firstname + ", " + age + ")"; } }
-
userserializer.java: public class UserSerializer implements Serializer { @Override public void configure(Map<String, ?> map, boolean b) { } @Override public byte[] serialize(String arg0, User arg1) { byte[] retVal = null; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(arg1).getBytes(); } catch (Exception e) { e.printStackTrace(); } return retVal; } @Override public void close() { } }
-
Userdeserializer.java: public class UserDeserializer implements Deserializer { @Override public void close() { } @Override public void configure(Map<String, ?> arg0, boolean arg1) { } @Override public User deserialize(String arg0, byte[] arg1) { ObjectMapper mapper = new ObjectMapper(); User user = null; try { user = mapper.readValue(arg1, User.class); } catch (Exception e) { e.printStackTrace(); } return user; } }
此外,為了使用上面的序列化程序,我們必須使用此屬性進行注冊:
使用命令學習Apache Kafka Operations
-
props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
那么,制作人將是:
-
try (Producer<String, User> producer = new KafkaProducer<>(props)) { producer.send(new ProducerRecord<String, User>("MyTopic", user)); System.out.println("Message " + user.toString() + " sent !!"); } catch (Exception e) { e.printStackTrace(); }
現在,我們再次需要為反序列化器注冊此屬性:
-
props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
因此,消費者將:
-
try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, User> messages = consumer.poll(100); for (ConsumerRecord<String, User> message : messages) { System.out.println("Message received " + message.value().toString()); } } } catch (Exception e) { e.printStackTrace(); }
所以,這就是Kafka序列化和反序列化。希望您喜歡並理解我們對Kafka的自定義序列化器和反序列化器的解釋。
讓我們修改Apache Kafka Workflow | Kafka Pub-Sub Messaging
8.結論
因此,在這個Kafka序列化和反序列化教程中,我們學會了創建一個自定義的Kafka SerDe示例。此外,我們看到了對Kafka的串行器和解串器的需求。與此同時,我們學習了Kafka序列化和反序列化的實現方法