Flink - Serializing & Deserializing Data - Custom KafkaDeserializationSchema (Java/Scala)


1. Deserialization

Messages in Kafka are usually key-value pairs, so here we write a custom deserialization schema to consume key-value messages from Kafka. Without further ado, here is the code.

I. Scala code

  1. Custom deserialization schema class:

import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

class MyKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {
  /* Whether the stream has ended, e.g. stop when a record with key "end" is read.
     No check is done here: always return false, i.e. never end the stream. */
  override def isEndOfStream(t: ConsumerRecord[String, String]): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
    new ConsumerRecord(
      record.topic(),
      record.partition(),
      record.offset(),
      record.timestamp(),
      record.timestampType(),
      record.checksum(),
      record.serializedKeySize(),
      record.serializedValueSize(),
      new String(record.key(), "UTF-8"),
      new String(record.value(), "UTF-8"))
  }

  /* Type information of the deserialized objects */
  override def getProducedType: TypeInformation[ConsumerRecord[String, String]] =
    TypeInformation.of(new TypeHint[ConsumerRecord[String, String]] {})
}
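
Note that deserialize above passes record.key() and record.value() straight into new String(...) with no null checks, so a record with a null key or value (e.g. a tombstone message) throws a NullPointerException; with checkpointing enabled (see the main class below) the job would then restart endlessly. The following is only a sketch of a null-safe variant of the same method, with nulls mapped to empty strings as an assumed convention, not code from the original article:

override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
  /* Sketch only: map a null key/value to an empty string instead of failing */
  val key   = if (record.key() == null) "" else new String(record.key(), "UTF-8")
  val value = if (record.value() == null) "" else new String(record.value(), "UTF-8")
  new ConsumerRecord(
    record.topic(),
    record.partition(),
    record.offset(),
    record.timestamp(),
    record.timestampType(),
    record.checksum(),
    record.serializedKeySize(),
    record.serializedValueSize(),
    key,
    value)
}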

  2. Main class

import java.util.Properties
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaDeserializerSchemaTest {
  def main(args: Array[String]): Unit = {
 
    /* Initialize the environment */
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()
    /* Enable checkpointing. Note: the message key/value are not null-checked here, so with
       checkpointing enabled a null key/value will fail the job and restart it indefinitely */
    senv.enableCheckpointing(2000)
    /* If topic3 does not exist it is created automatically in Kafka with a single partition (partition 0) */
    val myConsumer = new FlinkKafkaConsumer[ConsumerRecord[String, String]]("topic3", new MyKafkaDeserializationSchema(), getKafkaConfig())
    /* Specify the start offsets */
    val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
    /* Start consuming from the first record of partition 0 of topic3 */
    specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L)
    myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
    /* Register the Kafka consumer as the source */
    val source: DataStream[ConsumerRecord[String, String]] = senv.addSource(myConsumer)
 
    val keyValue = source.map(new MapFunction[ConsumerRecord[String, String], String] {
      override def map(message: ConsumerRecord[String, String]): String = {
        "key" + message.key + "  value:" + message.value
      }
    })
    /* Print the received data */
    keyValue.print()
    /* Start execution */
    senv.execute()
  }
 
  def getKafkaConfig():Properties={
    val props:Properties=new Properties()
    props.setProperty("bootstrap.servers","")
    props.setProperty("group.id","")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("auto.offset.reset","latest")
    props
  }
}

II. Java code

  1. Custom deserialization schema class:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
 
    private static  String encoding = "UTF8";
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
       /* System.out.println("Record--partition::"+record.partition());
        System.out.println("Record--offset::"+record.offset());
        System.out.println("Record--timestamp::"+record.timestamp());
        System.out.println("Record--timestampType::"+record.timestampType());
        System.out.println("Record--checksum::"+record.checksum());
        System.out.println("Record--key::"+record.key());
        System.out.println("Record--value::"+record.value());*/
        return new ConsumerRecord(record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                /* No null checks on key/value here; be sure to handle nulls in production */
                new  String(record.key(), encoding),
                new  String(record.value(), encoding));
    }
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>(){});
    }
}

  2. Main class

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaDeserializerSchemaTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Enable checkpointing. Note: the message key/value are not null-checked here, so with
           checkpointing enabled a null key/value will fail the job and restart it indefinitely */
        senv.enableCheckpointing(2000);
        /* If topic3 does not exist it is created automatically in Kafka with a single partition (partition 0) */
        FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer = new FlinkKafkaConsumer<>("topic3", new MyKafkaDeserializationSchema(), getKafkaConfig());
 
        /* Specify the start offsets */
        Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        /* Start consuming from the first record of partition 0 of topic3 */
        specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L);
        myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
 
        DataStream<ConsumerRecord<String, String>> source = senv.addSource(myConsumer);
        DataStream<String> keyValue = source.map(new MapFunction<ConsumerRecord<String, String>, String>() {
            @Override
            public String map(ConsumerRecord<String, String> message) throws Exception {
                return "key"+message.key()+"  value:"+message.value();
            }
        });
        /* Print the results */
        keyValue.print();
        /* Start execution */
        senv.execute();
    }
    public static Properties getKafkaConfig(){
        Properties  props=new Properties();
        props.setProperty("bootstrap.servers","");
        props.setProperty("group.id","");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset","latest");
        return props;
    }
}

III. Testing

  1. KafkaProducer class for sending <key, value> test data

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Date;
import java.util.Properties;

public class KafkaPartitionProducer extends Thread{
  private static long count =10;
  private static String topic="test";
  private static String brokerList="";
  public static void main(String[] args) {
        KafkaPartitionProducer jproducer = new KafkaPartitionProducer();
        jproducer.start();
    }
    @Override
    public void run() {
        producer();
    }
    private void producer() {
        Properties props = config();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record=null;
        System.out.println("kafka生产数据条数:"+count);
        for (int i = 1; i <= count; i++) {
            String json = "{\"id\":" + i + ",\"ip\":\"192.168.0." + i + "\",\"date\":\"" + new Date().toString() + "\"}";
            String key ="key"+i;
            record = new ProducerRecord<String, String>(topic, key, json);
            producer.send(record, (metadata, e) -> {
                // Send with a callback: log the metadata or the exception
                if (null != e) {
                    e.printStackTrace();
                }
                if (null != metadata) {
                    System.out.println(String.format("offset: %s, partition:%s, topic:%s  timestamp:%s", metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));
                }
            });
        }
        producer.close();
    }
    private Properties config() {
        Properties props = new Properties();
        props.put("bootstrap.servers",brokerList);
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* Custom partitioner (two possible ways to configure it), e.g.: */
        /*props.put("partitioner.class", PartitionUtil.class.getName());*/
        return props;
    }
}

 2. Serialization

I. Scala code

  1. Custom serialization schema class:

import java.lang
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

class KafkaStringSerializer(topic: String) extends KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes())
}
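
This schema only serializes the value; the record key is left null (to also write a key you would use the ProducerRecord constructor that takes both key and value byte arrays). The original post stops at the schema itself, so below is only a minimal usage sketch, assuming the same empty bootstrap.servers placeholder and the topic3 topic used above, showing how such a schema is typically plugged into a FlinkKafkaProducer sink; the object name and the AT_LEAST_ONCE semantic are illustrative choices, not from the original article:

import java.util.Properties
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaSerializerSchemaTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment()
    /* A few demo elements to write out */
    val stream = senv.fromElements("message-1", "message-2", "message-3")

    val props = new Properties()
    props.setProperty("bootstrap.servers", "")  // fill in your broker list

    /* Wire the custom KafkaSerializationSchema into the Kafka sink */
    val myProducer = new FlinkKafkaProducer[String](
      "topic3",                                 // default target topic
      new KafkaStringSerializer("topic3"),
      props,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

    stream.addSink(myProducer)
    senv.execute()
  }
}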

 

Reference (original article): https://blog.csdn.net/lujisen/article/details/105756753

