Flink Sink定制開發


一、KafkaSink

1、按流內容分發到對應topic,隔天自動切換

在flink自帶的kafka sink實現里,只支持寫到固定topic,而我們的kafka2kafka日志處理邏輯要求消息要按照ds字段值寫入到對應topic,topic名前綴相同,后面跟ds字段值,需要進行改造

具體實現思路如下:

(1)由如下源碼可知KeyedSerializationSchema對象才能賦值schema,從而可以通過schema.getTargetTopic獲得每條對應的topic,而常用的SimpleStringSchema並未繼承KeyedSerializationSchema,無法得到對應的topic,只能使用固定topic

FlinkKafkaProducer011
private final KeyedSerializationSchema<IN> schema;
@Override
public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
    String targetTopic = schema.getTargetTopic(next);
    if (targetTopic == null) {
        targetTopic = defaultTopicId;
    }
    ...
}

(2)繼承合適的父類進行二次開發

實現了KeyedSerializationSchema的類有兩種:

  • TypeInformationKeyValueSerializationSchema<K, V>,可序列化也可反序列化,即既可用於生產者又可用於消費者
  • KeyedSerializationSchemaWrapper,只能序列化,只能用於生產者,傳入SerializationSchema對象后可以自動調用其序列化方法進行序列化

還可以自己實現KeyedSerializationSchema接口

為了最大限度降低代碼復雜度,提高執行效率,一定要選擇合適的父類進行二次開發。只有序列化邏輯需要修改,因此選擇實現KeyedSerializationSchema接口,重寫部分方法,實現按json流內容中的ds字段值與指定的前綴組合成每條kafka信息的目標topic

JsonKeyedSerializationSchemaWrapper
public class JsonKeyedSerializationSchemaWrapper implements KeyedSerializationSchema<JSONObject> {
    private String topicPrefix;
 
    public JsonKeyedSerializationSchemaWrapper(String topicPrefix) {
        this.topicPrefix = topicPrefix;
    }
 
    @Override
    public byte[] serializeValue(JSONObject element) {
        return element.toJSONString().getBytes();
    }
 
    @Override
    public byte[] serializeKey(JSONObject element) {
        return null;
    }
 
    @Override
    public String getTargetTopic(JSONObject element) {
        return this.topicPrefix+"_"+element.getString("ds");
    }
}

(3)改造業務代碼

LogParserBallEnconfEntopic
JsonKeyedSerializationSchemaWrapper serial = new JsonKeyedSerializationSchemaWrapper(
    String.format(parameterTool.get("kafka.sink.topicPrefix"),parameterTool.get("game"))
);
 
FlinkKafkaProducer011<JSONObject> kafkaResultProducer = new FlinkKafkaProducer011<>(
        parameterTool.get("kafka.sink.bootstrap.servers"),
        String.format(parameterTool.get("kafka.sink.topicPrefix"),parameterTool.get("game"))
        ,serial);

二、HdfsSink

1、按流內容分發到對應目錄

flink自帶的hdfs sink只支持寫入到固定目錄,而我們的kafka2hdfs處理邏輯要求消息要按照header_filepath字段值寫入到對應的目錄,類似如下形式:

/logs/ball/json/Chat/ds=2018-11-12/
/logs/ball/json/Chat/status-1/ds=2018-11-12/
/logs/ball/json/status-2-CheckWg/ds=2018-10-31/

具體實現思路如下:

(1)由源碼可知BucketingSink類的setBucketer(Bucketer<T> bucketer)方法確定要寫入的文件目錄

(2)繼承合適的父類進行二次開發

實現了Bucketer接口的類有以下兩種:

  • DateTimeBucketer 寫入到固定目錄的桶內,桶是按給定日期格式生成的

  • BasePathBucketer 寫入到固定目錄

我們所要寫入的目錄不需要分桶,因此繼承BasePathBucketer類,重寫部分方法

HdfsBasePathBucketer
public class HdfsBasePathBucketer extends BasePathBucketer<JSONObject> {
    private static final long serialVersionUID = 1L;
 
    @Override
    public Path getBucketPath(Clock clock, Path basePath, JSONObject element) {
        String header_filepath = element.getString("header_filepath");
        return super.getBucketPath(clock, new Path(basePath+"/"+header_filepath), element);
    }
}

 (3)改造業務代碼

Bucketer bucketer = new HdfsBasePathBucketer();
hdfsSink.setBucketer(bucketer);

  

2、定制文件內容

flink自帶的hdfs sink只支持將接收到的消息整體使用UTF-8格式寫入到文件,而我們的kafka2hdfs處理邏輯要求只寫body字段內容到文件

具體實現思路如下:

(1)由源碼可知BucketingSink類的setWriter(Writer<T> writer)方法確定要寫入的內容

(2)繼承合適的父類進行二次開發

  • AvroKeyValueSinkWriter 生成avro文件

  • SequenceFileWriter 生成hadoop sequencefile文件,可指定壓縮級別
  • StringWriter 默認生成UTF-8編碼的文本文件

繼承StringWriter,重寫部分方法

HdfsStringWriter
@Override
public void write(JSONObject element) throws IOException {
    String body = element.getString("body");
    try {
        cs = Charset.forName(csn);
    }
    catch (IllegalCharsetNameException e) {
        throw new IOException("The charset " + csn + " is not valid.", e);
    }
    catch (UnsupportedCharsetException e) {
        throw new IOException("The charset " + csn + " is not supported.", e);
    }
    FSDataOutputStream outputStream = getStream();
    outputStream.write(body.getBytes(cs));
    outputStream.write('\n');
}
 
@Override
public StringWriter<JSONObject> duplicate() {
    return new HdfsStringWriter();
}

(3)改造業務代碼

hdfsSink.setWriter(new HdfsStringWriter());

 



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM