Custom message serialization and deserialization in Kafka


Reposted from https://blog.csdn.net/shirukai/article/details/82152172

Version notes:

Kafka version: kafka_2.12-2.0.0.tgz

Maven dependency (pom.xml):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
1 About serialization and deserialization in Kafka
Kafka always sends and receives messages as byte[] arrays on the wire. In everyday use, though, we work not only with byte[] but also with int, short, long, float, double, String, and so on. That is possible because the client converts these types to and from byte[] using the serializer and deserializer we specify, before sending or after receiving.

When producing or consuming with Kafka we normally have to specify a serialization method for both the message key and value. For example, setting value.serializer to org.apache.kafka.common.serialization.StringSerializer declares that values are serialized as strings, so we can send String-typed messages. The serializers and deserializers that ship with Kafka are listed in the two tables below.

1.1 Built-in serializers

Serializer class                                             Java type    Notes
org.apache.kafka.common.serialization.ByteArraySerializer   byte[]       raw bytes, passed through unchanged
org.apache.kafka.common.serialization.ByteBufferSerializer  ByteBuffer   wraps java.nio.ByteBuffer
org.apache.kafka.common.serialization.IntegerSerializer     Integer
org.apache.kafka.common.serialization.ShortSerializer       Short
org.apache.kafka.common.serialization.LongSerializer        Long
org.apache.kafka.common.serialization.FloatSerializer       Float
org.apache.kafka.common.serialization.DoubleSerializer      Double
org.apache.kafka.common.serialization.StringSerializer      String
1.2 Built-in deserializers

Deserializer class                                             Java type    Notes
org.apache.kafka.common.serialization.ByteArrayDeserializer   byte[]       raw bytes, passed through unchanged
org.apache.kafka.common.serialization.ByteBufferDeserializer  ByteBuffer   wraps java.nio.ByteBuffer
org.apache.kafka.common.serialization.IntegerDeserializer     Integer
org.apache.kafka.common.serialization.ShortDeserializer       Short
org.apache.kafka.common.serialization.LongDeserializer        Long
org.apache.kafka.common.serialization.FloatDeserializer       Float
org.apache.kafka.common.serialization.DoubleDeserializer      Double
org.apache.kafka.common.serialization.StringDeserializer      String
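
Wiring the built-in classes into a producer only takes the two serializer properties. A minimal sketch (the broker address is a placeholder):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
// Keys are Strings and values are Integers; the client converts both to byte[] internally.
Producer<String, Integer> producer = new KafkaProducer<>(props);
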
2 How Kafka's built-in serializers and deserializers are implemented
We have seen which serialization options Kafka ships with; how are they actually implemented? Taking String as an example, let's look at how Kafka serializes and deserializes it.

The implementations live in the org.apache.kafka.common.serialization package.

2.1 String serialization
Let's look at the org.apache.kafka.common.serialization.StringSerializer class.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 * String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
 * value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
 */
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}
From the code above we can see that the String serializer implements the Serializer interface with a <String> type parameter and provides the interface's configure(), serialize(), and close() methods. The core logic is in serialize(), which simply converts the incoming String to bytes with data.getBytes(encoding), where the encoding defaults to UTF-8 and can be overridden through configure().
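
As a quick broker-free illustration of configure(), here is a minimal sketch; the topic argument is ignored by this implementation:

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;

StringSerializer serializer = new StringSerializer();
// isKey = false, so configure() looks up value.serializer.encoding
serializer.configure(Collections.singletonMap("value.serializer.encoding", "UTF-16"), false);
byte[] bytes = serializer.serialize("demo-topic", "hi");
// Java's "UTF-16" charset emits a 2-byte BOM plus 2 bytes per char: 2 + 4 = 6
System.out.println(bytes.length); // 6
serializer.close();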

2.2 String deserialization
Let's look at the org.apache.kafka.common.serialization.StringDeserializer class.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 * String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
 * value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
 */
public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}
Likewise, the String deserializer implements the Deserializer interface with a <String> type parameter and provides configure(), deserialize(), and close(). The core logic is in deserialize(), which turns the incoming byte[] back into a String with new String(data, encoding).
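
Pairing the two gives a simple broker-free round trip (a minimal sketch):

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

StringSerializer serializer = new StringSerializer();
StringDeserializer deserializer = new StringDeserializer();
// Both implementations ignore the topic argument.
byte[] bytes = serializer.serialize("demo-topic", "hello kafka");
String roundTripped = deserializer.deserialize("demo-topic", bytes);
System.out.println("hello kafka".equals(roundTripped)); // true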

3 Custom serialization and deserialization for Kafka
From the built-in implementations above, Kafka serialization/deserialization boils down to two steps: first, implement the Serializer or Deserializer interface; second, implement its methods to convert the target type into byte[] (serialization) or byte[] back into the target type (deserialization). With that, let's implement our own.

We'll cover two approaches here: one based on fastjson and one based on protostuff.
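
The pom shown earlier only lists kafka-clients; the examples below also need fastjson and protostuff on the classpath. The coordinates match the imports used in this article, but the versions here are illustrative assumptions, not taken from the original project:

<!-- fastjson (version is an example) -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
<!-- protostuff, the older com.dyuproject coordinates used below (versions are examples) -->
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.1.3</version>
</dependency>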

3.1 Fastjson-based serialization/deserialization
Suppose we have a Person entity class. We will serialize and deserialize Person with fastjson, then send and receive those messages over Kafka.

3.1.1 Person.java
package com.springboot.demo.kafka.entity;

/**
 * Created by shirukai on 2018/8/25
 */
public class Person {
    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
3.1.2 Serializer implementation: PersonJsonSerializer.java
package com.springboot.demo.kafka.serialization;

import com.alibaba.fastjson.JSON;
import com.springboot.demo.kafka.entity.Person;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class PersonJsonSerializer implements Serializer<Person> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, Person data) {
        // fastjson renders the object as its JSON byte representation
        return JSON.toJSONBytes(data);
    }

    @Override
    public void close() {
        // nothing to do
    }
}
3.1.3 Deserializer implementation: PersonJsonDeserializer.java
package com.springboot.demo.kafka.serialization;

import com.alibaba.fastjson.JSON;
import com.springboot.demo.kafka.entity.Person;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class PersonJsonDeserializer implements Deserializer<Person> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public Person deserialize(String topic, byte[] data) {
        // fastjson parses the JSON bytes back into a Person instance
        return JSON.parseObject(data, Person.class);
    }

    @Override
    public void close() {
        // nothing to do
    }
}
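
Before wiring the pair into Kafka, it is easy to sanity-check it with a broker-free round trip (a minimal sketch):

Person p = new Person();
p.setId(1);
p.setName("test");
p.setAge(18);
byte[] bytes = new PersonJsonSerializer().serialize("any-topic", p);
Person copy = new PersonJsonDeserializer().deserialize("any-topic", bytes);
System.out.println(copy); // Person{id=1, name='test', age=18}

// If a single object holding both sides is ever needed (e.g. for Kafka Streams),
// kafka-clients can bundle the pair into a Serde:
// Serde<Person> personSerde = Serdes.serdeFrom(new PersonJsonSerializer(), new PersonJsonDeserializer());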
3.1.4 Testing against Kafka
3.1.4.1 Sending messages
// Requires: org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord},
// java.util.Properties, and JUnit's @Test.
@Test
public void producerJson() {
    Properties props = new Properties();
    // Kafka broker address
    props.put("bootstrap.servers", "localhost:9092");
    // acks=all: the leader waits until all in-sync replicas have persisted the record
    props.put("acks", "all");
    // number of send retries
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    // key serializer
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // value serializer: our custom fastjson serializer
    props.put("value.serializer", "com.springboot.demo.kafka.serialization.PersonJsonSerializer");
    Producer<String, Person> producer = new KafkaProducer<>(props);
    Person person = new Person();
    for (int i = 0; i < 100; i++) {
        System.out.println(i);
        person.setId(i);
        person.setName("personJsonSerialization_" + i);
        person.setAge(18);
        producer.send(new ProducerRecord<>("personJsonSerialization", "client" + i, person));
    }
    producer.close();
}
3.1.4.2 Receiving messages
// Requires: org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer},
// java.time.Duration, java.util.Arrays, java.util.Properties, and JUnit's @Test.
@Test
public void consumerJson() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // consumer group id
    props.put("group.id", "123456");
    // offsets are not committed automatically (so the commit interval below is ignored)
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");
    props.put("auto.commit.interval.ms", "1000");

    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.springboot.demo.kafka.serialization.PersonJsonDeserializer");
    KafkaConsumer<String, Person> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("personJsonSerialization"));

    ConsumerRecords<String, Person> records = consumer.poll(Duration.ofMillis(1000));
    System.out.println(records.count());
    for (ConsumerRecord<String, Person> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
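
Note that with enable.auto.commit set to false this test never commits offsets, so rerunning it with auto.offset.reset=earliest rereads the topic from the beginning. If the group's position should advance, commit manually after processing, e.g.:

// Synchronously commit the offsets of the records returned by the last poll(), then release resources.
consumer.commitSync();
consumer.close();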
3.2 Protostuff-based serialization/deserialization
3.2.1 Utility class: ProtostuffUtil.java
package com.springboot.demo.utils;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by shirukai on 2018/8/14
 * Protostuff serialization/deserialization utility.
 */
public class ProtostuffUtil {
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    /**
     * Serialize an object to bytes.
     *
     * @param message the object to serialize
     * @param tClass  its class
     * @param <T>     type
     * @return byte[]
     */
    public static <T> byte[] serializer(T message, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        return ProtostuffIOUtil.toByteArray(message, schema, LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
    }

    /**
     * Deserialize bytes back into an object.
     *
     * @param bytes  the serialized bytes
     * @param tClass the target class
     * @param <T>    type
     * @return T
     */
    public static <T> T deserializer(byte[] bytes, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        T message = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, message, schema);
        return message;
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> tClass) {
        // Schemas are cached per class; in a race two threads may build the same
        // schema, which is harmless since the result is equivalent.
        Schema<T> schema = (Schema<T>) cachedSchema.get(tClass);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(tClass);
            cachedSchema.put(tClass, schema);
        }
        return schema;
    }
}
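
A quick broker-free check of the utility, reusing the Person entity from section 3.1.1 (a minimal sketch):

Person p = new Person();
p.setId(1);
p.setName("test");
p.setAge(18);
byte[] bytes = ProtostuffUtil.serializer(p, Person.class);
Person copy = ProtostuffUtil.deserializer(bytes, Person.class);
System.out.println(copy); // Person{id=1, name='test', age=18}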
3.2.2 Serializer implementation: PersonProtostuffSerializer.java
package com.springboot.demo.kafka.serialization;

import com.springboot.demo.kafka.entity.Person;
import com.springboot.demo.utils.ProtostuffUtil;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class PersonProtostuffSerializer implements Serializer<Person> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, Person data) {
        // delegate to the protostuff utility
        return ProtostuffUtil.serializer(data, Person.class);
    }

    @Override
    public void close() {
        // nothing to do
    }
}
3.2.3 Deserializer implementation: PersonProtostuffDeserializer.java
package com.springboot.demo.kafka.serialization;

import com.springboot.demo.kafka.entity.Person;
import com.springboot.demo.utils.ProtostuffUtil;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class PersonProtostuffDeserializer implements Deserializer<Person> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public Person deserialize(String topic, byte[] data) {
        // delegate to the protostuff utility
        return ProtostuffUtil.deserializer(data, Person.class);
    }

    @Override
    public void close() {
        // nothing to do
    }
}
3.2.4 Testing against Kafka
3.2.4.1 Sending messages
@Test
public void producerProtostuff() {
    Properties props = new Properties();
    // Kafka broker address
    props.put("bootstrap.servers", "localhost:9092");
    // acks=all: the leader waits until all in-sync replicas have persisted the record
    props.put("acks", "all");
    // number of send retries
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    // key serializer
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // value serializer: our custom protostuff serializer
    props.put("value.serializer", "com.springboot.demo.kafka.serialization.PersonProtostuffSerializer");
    Producer<String, Person> producer = new KafkaProducer<>(props);
    Person person = new Person();
    for (int i = 0; i < 100; i++) {
        System.out.println(i);
        person.setId(i);
        person.setName("personProtostuffSerialization_" + i);
        person.setAge(18);
        producer.send(new ProducerRecord<>("personProtostuffSerialization", "client" + i, person));
    }
    producer.close();
}
3.2.4.2 Receiving messages
@Test
public void consumerProtostuff() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // consumer group id
    props.put("group.id", "123456");
    // offsets are not committed automatically (so the commit interval below is ignored)
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");
    props.put("auto.commit.interval.ms", "1000");

    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.springboot.demo.kafka.serialization.PersonProtostuffDeserializer");
    KafkaConsumer<String, Person> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("personProtostuffSerialization"));

    ConsumerRecords<String, Person> records = consumer.poll(Duration.ofMillis(1000));
    System.out.println(records.count());
    for (ConsumerRecord<String, Person> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
4 Summary
There are many more serialization formats available; which one to use has to be decided case by case for your business scenario. One Protostuff pitfall worth recording: when serializing data declared as Object, protostuff throws a NullPointerException while building the RuntimeSchema. Looking at the source, the cause is that Object.class.getSuperclass() returns null, which triggers the NPE.
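
A hedged workaround for that pitfall: do not hand Object.class (or a field declared as raw Object) to RuntimeSchema; wrap the payload in a concrete class first. A minimal sketch (this Payload class is hypothetical, not from the original article):

// RuntimeSchema needs a concrete class, so wrap untyped data before serializing.
public class Payload {
    private String body; // arbitrary data, e.g. pre-encoded as JSON

    public String getBody() { return body; }
    public void setBody(String body) { this.body = body; }
}

// usage: byte[] bytes = ProtostuffUtil.serializer(payload, Payload.class);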
Original article by CSDN blogger shirukai, licensed under CC 4.0 BY-SA. Original link: https://blog.csdn.net/shirukai/article/details/82152172

