FlinkCDC Captured Data Format

The dumps below are the raw SourceRecords that Flink CDC (via its embedded Debezium engine) emits for delete, insert, and update operations. Note which part of the envelope is populated in each case: a delete carries only a before struct, an insert only an after struct, and an update carries both.

delete

SourceRecord
{
    sourcePartition={server=mysql_binlog_source}, 
    sourceOffset={ts_sec=1634898017, file=master.000007, pos=982176634, row=1, server_id=1, event=2}
} 
ConnectRecord
{
    topic='mysql_binlog_source.test.tbl_rule_config', kafkaPartition=null, key=Struct{id=5}, 
    keySchema=Schema{mysql_binlog_source.test.tbl_rule_config.Key:STRUCT}, 
    value=Struct{before=Struct{id=5,source_db=test,source_table=demo5,source_columns_truncate=aa,sink_db=db_test,sink_table=demo5,sink_columns=pk,name,aa,is_create_sink_table=0,is_del=0},
    source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1634898017000,db=test,table=tbl_rule_config,server_id=1,file=master.000007,pos=982176791,row=0,thread=1014574},
    op=d,
    ts_ms=1634898015827}, 
    valueSchema=Schema{mysql_binlog_source.test.tbl_rule_config.Envelope:STRUCT}, 
    timestamp=null, 
    headers=ConnectHeaders(headers=)
}

insert

SourceRecord
{
    sourcePartition={server=mysql_binlog_source}, 
    sourceOffset={ts_sec=1634898005, file=master.000007, pos=982165605, row=1, server_id=1, event=2}
} 
ConnectRecord
{
    topic='mysql_binlog_source.test.tbl_rule_config', 
    kafkaPartition=null, 
    key=Struct{id=5}, 
    keySchema=Schema{mysql_binlog_source.test.tbl_rule_config.Key:STRUCT}, 
    value=Struct{after=Struct{id=5,source_db=test,source_table=demo5,source_columns_truncate=aa,sink_db=db_test,sink_table=demo5,sink_columns=pk,name,aa,is_create_sink_table=0,is_del=0},
    source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1634898005000,db=test,table=tbl_rule_config,server_id=1,file=master.000007,pos=982165762,row=0,thread=1014574},
    op=c,
    ts_ms=1634898004110}, 
    valueSchema=Schema{mysql_binlog_source.test.tbl_rule_config.Envelope:STRUCT}, 
    timestamp=null, 
    headers=ConnectHeaders(headers=)
}

update

SourceRecord
{
    sourcePartition={server=mysql_binlog_source}, 
    sourceOffset={ts_sec=1634898199, file=master.000007, pos=982196336, row=1, server_id=1, event=2}
} 
ConnectRecord
{
    topic='mysql_binlog_source.test.tbl_rule_config', 
    kafkaPartition=null, 
    key=Struct{id=4}, 
    keySchema=Schema{mysql_binlog_source.test.tbl_rule_config.Key:STRUCT}, 
    value=Struct{before=Struct{id=4,source_db=test,source_table=demo4,source_columns_truncate=log,content,source_columns_fillin=create_time@2099-01-01 00:00:00,sink_db=db_test,sink_table=demo4,sink_columns=pk,name,age,log,content,create_time,update_time,is_create_sink_table=0,is_del=0},
    after=Struct{id=4,source_db=test,source_table=demo4,source_columns_truncate=log,content,source_columns_fillin=create_time@2099-01-01 00:00:00,sink_db=db_test,sink_table=demo4,sink_columns=pk,name,age,log,content,create_time,update_time,is_create_sink_table=0,is_del=1},
    source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1634898199000,db=test,table=tbl_rule_config,server_id=1,file=master.000007,pos=982196493,row=0,thread=1014574},
    op=u,
    ts_ms=1634898198166}, 
    valueSchema=Schema{mysql_binlog_source.test.tbl_rule_config.Envelope:STRUCT}, 
    timestamp=null, 
    headers=ConnectHeaders(headers=)
}
A custom deserializer converts the change data into a flat JSON format:
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Author      LYM
 * Description Custom deserializer that converts the captured change data into JSON.
 *             Raw format:    SourceRecord{sourcePartition={server=mysql_binlog_source}, ...}ConnectRecord{topic='mysql_binlog_source.test.tbl_rule_config', kafkaPartition=null, key=Struct{id=5}, ...}
 *             Target format: {"database":"test","beforeData":{"sink_db":"db_test","source_db":"test"},"type":"update","table":"test","afterData":{"sink_db":"db_test","source_db":"test"}}
 * Date        2021/10/18 17:25
 * Version     1.0
 */
public class FlinkCdcDataDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // The record value is a Debezium envelope Struct: before/after/source/op/ts_ms.
        Struct valueStruct = (Struct) sourceRecord.value();
        Struct sourceStruct = valueStruct.getStruct("source");

        // Extract the database name, table name, and operation type.
        String database = sourceStruct.getString("db");
        String table = sourceStruct.getString("table");
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();

        // Debezium reports inserts as "create"; normalize to "insert".
        if (type.equals("create")) {
            type = "insert";
        }

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("database", database);
        jsonObject.put("table", table);
        jsonObject.put("type", type);

        // Convert the before-image; it is null for inserts, so guard against that.
        Struct beforeStruct = valueStruct.getStruct("before");
        JSONObject beforeDataJson = new JSONObject();
        if (beforeStruct != null) {
            for (Field field : beforeStruct.schema().fields()) {
                beforeDataJson.put(field.name(), beforeStruct.get(field));
            }
        }

        // Convert the after-image; it is null for deletes.
        Struct afterStruct = valueStruct.getStruct("after");
        JSONObject afterDataJson = new JSONObject();
        if (afterStruct != null) {
            for (Field field : afterStruct.schema().fields()) {
                afterDataJson.put(field.name(), afterStruct.get(field));
            }
        }

        jsonObject.put("beforeData", beforeDataJson);
        jsonObject.put("afterData", afterDataJson);

        // Emit the flattened JSON string downstream.
        collector.collect(jsonObject.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
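
To put the deserializer to work, plug it into the MySQL CDC source in place of the stock StringDebeziumDeserializationSchema (whose raw SourceRecord.toString() output is what the dumps at the top of this post look like). Below is a minimal sketch, assuming the flink-connector-mysql-cdc 1.x builder API that matches the com.alibaba.ververica imports above; the class name FlinkCdcJsonJob, host, credentials, and table names are illustrative placeholders.

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkCdcJsonJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // keep binlog events in order for this demo

        // Placeholder connection settings -- replace with your own MySQL instance.
        SourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")               // databases to capture
                .tableList("test.tbl_rule_config")  // tables, qualified as db.table
                .username("root")
                .password("123456")
                .deserializer(new FlinkCdcDataDeserializationSchema())
                .build();

        // Each element is the flattened JSON string produced by the deserializer.
        env.addSource(mysqlSource).print();

        env.execute("flink-cdc-to-json");
    }
}

Run against the table above, the delete event from the first dump would come out roughly as {"database":"test","table":"tbl_rule_config","type":"delete","beforeData":{...},"afterData":{}}; afterData stays an empty object because a delete carries no after-image.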