Kafka Producer: Custom Serialization


The Kafka producer serializes each message into a binary payload before pushing it to the broker. Below is a custom serialization example that serializes a User object.

First, add the jackson-mapper-asl dependency:

<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.12</version>
</dependency>

Next, define the entity class to be serialized:

package cn.org.fubin;

public class User {
    private String firstName;
    private String lastName;
    private int age;
    private String address;

    public User() {
    }

    public User(String firstName, String lastName, int age, String address) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.address = address;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", age=" + age +
                ", address='" + address + '\'' +
                '}';
    }
}

Next, create the serializer class by implementing the Serializer interface provided by the Kafka client:

package cn.org.fubin;

import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.Map;

public class UserSerializer implements Serializer<User> {

    private ObjectMapper objectMapper;

    public void configure(Map<String, ?> configs, boolean isKey) {
        objectMapper = new ObjectMapper();
    }

    public byte[] serialize(String topic, User data) {
        byte[] ret = null;
        try {
            // Serialize the User to a JSON string, then encode it as UTF-8 bytes
            ret = objectMapper.writeValueAsString(data).getBytes("utf-8");
        } catch (IOException e) {
            System.out.println("Serialization failed");
            e.printStackTrace();
        }
        return ret;
    }

    public void close() {
        // Nothing to clean up
    }
}
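For reference, here is a minimal sketch (not part of the original article) of the JSON this serializer turns into UTF-8 bytes. The SerializationPreview class is a hypothetical helper, and the exact field order produced by Jackson may differ:

package cn.org.fubin;

import org.codehaus.jackson.map.ObjectMapper;

// Hypothetical helper: prints the JSON string that UserSerializer
// would encode as UTF-8 bytes and hand to the Kafka producer.
public class SerializationPreview {
    public static void main(String[] args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        User user = new User("a", "b", 23, "china");
        // Typically prints {"firstName":"a","lastName":"b","age":23,"address":"china"}
        System.out.println(objectMapper.writeValueAsString(user));
    }
}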

Kafka itself ships with default serializer implementations in the org.apache.kafka.common.serialization package, such as StringSerializer, IntegerSerializer, LongSerializer, DoubleSerializer and ByteArraySerializer.
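For comparison, here is a minimal sketch (not from the original article) of a producer that relies only on the built-in StringSerializer; the single-broker address and topic name are assumptions:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class StringProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        // Built-in serializers shipped with the Kafka client
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        producer.send(new ProducerRecord<String, String>("test-topic", "hello kafka")).get();
        producer.close();
    }
}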
Finally, specify the serializer class declared above in the producer's main class and send a User instance:

package cn.org.fubin;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.RetriableException;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Retriable exceptions
 * 1. A partition replica is unavailable
 * 2. The controller is currently unavailable
 * 3. A transient network failure
 *
 * These can recover on their own; once the retry limit is exceeded,
 * the application still has to handle the failure itself.
 *
 * Non-retriable exceptions
 * 1. The message being sent is too large
 * 2. Serialization failure
 * 3. Other exception types
 */
public class KafkaProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the custom serializer for the message value
        properties.put("value.serializer", "cn.org.fubin.UserSerializer");

        properties.put("acks", "-1");
        properties.put("retries", "3");
        properties.put("batch.size", 1048576);
        properties.put("linger.ms", 10);
        properties.put("buffer.memory", "33554432");
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        properties.put("max.block.ms", "3000");

        String topic = "test-topic";
        Producer<String, User> producer = new KafkaProducer<String, User>(properties);

        User user = new User("a", "b", 23, "china");
        ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, user);
        producer.send(record).get();
        producer.close();
    }
}
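The comment in the demo distinguishes retriable from non-retriable exceptions, but the synchronous send above does not act on that distinction. As a minimal sketch (not from the original article), an asynchronous send could branch on RetriableException in a callback, reusing the producer, record and imports already present in KafkaProducerDemo:

// Sketch only: replaces producer.send(record).get() in the demo above.
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("sent to partition " + metadata.partition()
                    + ", offset " + metadata.offset());
        } else if (exception instanceof RetriableException) {
            // Transient problem (e.g. replica or network unavailable); the producer
            // has already exhausted its configured retries, so decide whether to resend.
            System.out.println("retriable failure: " + exception.getMessage());
        } else {
            // Non-retriable (e.g. record too large, serialization error); log or alert.
            System.out.println("non-retriable failure: " + exception.getMessage());
        }
    }
});

With asynchronous sends, producer.flush() or producer.close() should still be called afterwards so that buffered records are actually delivered before the JVM exits.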

 


 

