一、KafkaSink
1、按流內容分發到對應topic,隔天自動切換
在flink自帶的kafka sink實現里,只支持寫到固定topic,而我們的kafka2kafka日志處理邏輯要求消息要按照ds字段值寫入到對應topic,topic名前綴相同,后面跟ds字段值,需要進行改造
具體實現思路如下:
(1)由如下源碼可知KeyedSerializationSchema對象才能賦值schema,從而可以通過schema.getTargetTopic獲得每條對應的topic,而常用的SimpleStringSchema並未繼承KeyedSerializationSchema,無法得到對應的topic,只能使用固定topic
private final KeyedSerializationSchema<IN> schema; @Override public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception { String targetTopic = schema.getTargetTopic(next); if (targetTopic == null) { targetTopic = defaultTopicId; } ... }
(2)繼承合適的父類進行二次開發
實現了KeyedSerializationSchema的類有兩種:
- TypeInformationKeyValueSerializationSchema<K, V>,可序列化也可反序列化,即既可用於生產者又可用於消費者
- KeyedSerializationSchemaWrapper,只能序列化,只能用於生產者,傳入SerializationSchema對象后可以自動調用其序列化方法進行序列化
還可以自己實現KeyedSerializationSchema接口
為了最大限度降低代碼復雜度,提高執行效率,一定要選擇合適的父類進行二次開發。只有序列化邏輯需要修改,因此選擇實現KeyedSerializationSchema接口,重寫部分方法,實現按json流內容中的ds字段值與指定的前綴組合成每條kafka信息的目標topic
public class JsonKeyedSerializationSchemaWrapper implements KeyedSerializationSchema<JSONObject> { private String topicPrefix; public JsonKeyedSerializationSchemaWrapper(String topicPrefix) { this.topicPrefix = topicPrefix; } @Override public byte[] serializeValue(JSONObject element) { return element.toJSONString().getBytes(); } @Override public byte[] serializeKey(JSONObject element) { return null; } @Override public String getTargetTopic(JSONObject element) { return this.topicPrefix+"_"+element.getString("ds"); } }
(3)改造業務代碼
JsonKeyedSerializationSchemaWrapper serial = new JsonKeyedSerializationSchemaWrapper( String.format(parameterTool.get("kafka.sink.topicPrefix"),parameterTool.get("game")) ); FlinkKafkaProducer011<JSONObject> kafkaResultProducer = new FlinkKafkaProducer011<>( parameterTool.get("kafka.sink.bootstrap.servers"), String.format(parameterTool.get("kafka.sink.topicPrefix"),parameterTool.get("game")) ,serial);
二、HdfsSink
1、按流內容分發到對應目錄
flink自帶的hdfs sink只支持寫入到固定目錄,而我們的kafka2hdfs處理邏輯要求消息要按照header_filepath字段值寫入到對應的目錄,類似如下形式:
/logs/ball/json/Chat/ds=2018-11-12/
/logs/ball/json/Chat/status-1/ds=2018-11-12/
/logs/ball/json/status-2-CheckWg/ds=2018-10-31/
具體實現思路如下:
(1)由源碼可知BucketingSink類的setBucketer(Bucketer<T> bucketer)方法確定要寫入的文件目錄
(2)繼承合適的父類進行二次開發
實現了Bucketer接口的類有以下兩種:
-
DateTimeBucketer 寫入到固定目錄的桶內,桶是按給定日期格式生成的
-
BasePathBucketer 寫入到固定目錄
我們所要寫入的目錄不需要分桶,因此繼承BasePathBucketer類,重寫部分方法
public class HdfsBasePathBucketer extends BasePathBucketer<JSONObject> { private static final long serialVersionUID = 1L; @Override public Path getBucketPath(Clock clock, Path basePath, JSONObject element) { String header_filepath = element.getString("header_filepath"); return super.getBucketPath(clock, new Path(basePath+"/"+header_filepath), element); } }
(3)改造業務代碼
Bucketer bucketer = new HdfsBasePathBucketer(); hdfsSink.setBucketer(bucketer);
2、定制文件內容
flink自帶的hdfs sink只支持將接收到的消息整體使用UTF-8格式寫入到文件,而我們的kafka2hdfs處理邏輯要求只寫body字段內容到文件
具體實現思路如下:
(1)由源碼可知BucketingSink類的setWriter(Writer<T> writer)方法確定要寫入的內容
(2)繼承合適的父類進行二次開發
-
AvroKeyValueSinkWriter 生成avro文件
- SequenceFileWriter 生成hadoop sequencefile文件,可指定壓縮級別
- StringWriter 默認生成UTF-8編碼的文本文件
繼承StringWriter,重寫部分方法
@Override public void write(JSONObject element) throws IOException { String body = element.getString("body"); try { cs = Charset.forName(csn); } catch (IllegalCharsetNameException e) { throw new IOException("The charset " + csn + " is not valid.", e); } catch (UnsupportedCharsetException e) { throw new IOException("The charset " + csn + " is not supported.", e); } FSDataOutputStream outputStream = getStream(); outputStream.write(body.getBytes(cs)); outputStream.write('\n'); } @Override public StringWriter<JSONObject> duplicate() { return new HdfsStringWriter(); }
(3)改造業務代碼
hdfsSink.setWriter(new HdfsStringWriter());