General approach:
Canal pulls the binlog data from MySQL onto the node where Canal runs and exposes a TCP service. We only need to write a client that connects to this service, pulls the data, and decides the format in which it is written to Kafka; this is how we write data to Kafka in protobuf format.
1. Configure Canal (/bigdata/canal/conf/canal.properties) and start it; this opens a TCP service.
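The exact values depend on your environment, but a minimal sketch of the settings involved might look like the following (the MySQL address, credentials, and filter regex are assumptions; the TCP mode and port must match the client code below):

canal.properties (server side):
canal.serverMode = tcp
canal.port = 11111
canal.destinations = example

conf/example/instance.properties (per-instance MySQL connection info, assumed values):
canal.instance.master.address = 192.168.57.12:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = doit\\..*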


2. Write the client code that pulls the data
PbOfCanalToKafka
package cn._51doit.flink.canal;

import cn._51doit.proto.OrderDetailProto;
import com.alibaba.google.common.base.CaseFormat;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class PbOfCanalToKafka {

    public static void main(String[] args) throws Exception {
        // connect to the TCP service opened by the canal server
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.57.12", 11111), "example", "canal", "canal123");

        // 1. Kafka producer configuration
        Properties props = new Properties();
        // Kafka brokers to connect to
        props.setProperty("bootstrap.servers", "feng05:9092,feng06:9092,feng07:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // the values are protobuf bytes, so use the ByteArraySerializer
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        while (true) {
            // establish the connection
            canalConnector.connect();
            // subscribe to the doit.orderdetail table only
            canalConnector.subscribe("doit.orderdetail");
            // pull a batch of at most 10 entries
            Message message = canalConnector.get(10);
            if (message.getEntries().size() > 0) {
                // System.out.println(message);
                List<CanalEntry.Entry> entries = message.getEntries();
                for (CanalEntry.Entry entry : entries) {
                    // table name, used later as the Kafka topic
                    String tableName = entry.getHeader().getTableName();
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                    // System.out.println(rowDatasList);
                    // check the event type; only INSERT/UPDATE data is collected here
                    OrderDetailProto.OrderDetail.Builder bean = OrderDetailProto.OrderDetail.newBuilder();
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE) {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            System.out.println("======================print afterColumnsList==============================");
                            System.out.println(afterColumnsList);
                            Map<String, String> kv = new HashMap<String, String>();
                            for (CanalEntry.Column column : afterColumnsList) {
                                // convert column names from lower_underscore to lowerCamel
                                String propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                                kv.put(propertyName, column.getValue());
                            }
                            // populate the protobuf builder
                            bean.setAmount(Integer.parseInt(kv.get("amount")));
                            bean.setMoney(Double.parseDouble(kv.get("money")));
                            bean.setOrderId(Long.parseLong(kv.get("orderId")));
                            bean.setCreateTime(kv.get("createTime"));
                            bean.setUpdateTime(kv.get("updateTime"));
                            bean.setId(Integer.parseInt(kv.get("id")));
                            bean.setSku(Long.parseLong(kv.get("sku")));
                            bean.setCategoryId(Integer.parseInt(kv.get("categoryId")));
                            // serialize to protobuf bytes and send with the Kafka producer
                            byte[] bytes = bean.build().toByteArray();
                            ProducerRecord<String, byte[]> record = new ProducerRecord<>(tableName, bytes);
                            producer.send(record);
                        }
                    }
                }
            }
        }
    }
}
Note: the data pulled from canal in TCP mode is not JSON (if the TCP mode is not used and canal sends the data to Kafka directly, the data lands in Kafka as JSON). For how OrderDetailProto is generated, see flink real-time project day07.
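The .proto definition itself is not reproduced here (it comes from day07); a plausible sketch consistent with the builder setters used above would be the following (field numbers, exact types, and options are assumptions):

syntax = "proto3";
option java_package = "cn._51doit.proto";
option java_outer_classname = "OrderDetailProto";

message OrderDetail {
  int32 id = 1;            // setId(Integer.parseInt(...))
  int64 order_id = 2;      // setOrderId(Long.parseLong(...))
  int32 category_id = 3;   // setCategoryId(Integer.parseInt(...))
  int64 sku = 4;           // setSku(Long.parseLong(...))
  double money = 5;        // setMoney(Double.parseDouble(...))
  int32 amount = 6;        // setAmount(Integer.parseInt(...))
  string create_time = 7;  // setCreateTime(String)
  string update_time = 8;  // setUpdateTime(String)
}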
Message (what System.out.println(message) prints for one fetched batch):
Message[id=1,entries=[header {
version: 1
logfileName: "mysql-bin.000002"
logfileOffset: 6669
serverId: 1
serverenCode: "UTF-8"
executeTime: 1594134782000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\0042179"
, header {
version: 1
logfileName: "mysql-bin.000002"
logfileOffset: 6765
serverId: 1
serverenCode: "UTF-8"
executeTime: 1594147469000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 80
}
entryType: TRANSACTIONBEGIN
storeValue: " A"
, header {
version: 1
logfileName: "mysql-bin.000002"
logfileOffset: 6911
serverId: 1
serverenCode: "UTF-8"
executeTime: 1594147469000
sourceType: MYSQL
schemaName: "doit"
tableName: "orderdetail"
eventLength: 82
eventType: INSERT
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA
storeValue: "\b\177\020\001P\000b\332\002\022\'\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0010\000B\00212R\nbigint(20)\0220\b\001\020\373\377\377\377\377\377\377\377\377\001\032\border_id \000(\0010\000B\00529002R\nbigint(20)\022#\b\002\020\004\032\vcategory_id \000(\0010\000B\0012R\aint(11)\022#\b\003\020\f\032\003sku \000(\0010\000B\00520001R\vvarchar(50)\022!\b\004\020\b\032\005money \000(\0010\000B\0062000.0R\006double\022\036\b\005\020\004\032\006amount \000(\0010\000B\0012R\aint(11)\0227\b\006\020]\032\vcreate_time \000(\0010\000B\0232020-07-01 20:19:08R\ttimestamp\0227\b\a\020]\032\vupdate_time \000(\0010\000B\0232020-07-02 20:19:13R\ttimestamp"
, header {
version: 1
logfileName: "mysql-bin.000002"
logfileOffset: 6993
serverId: 1
serverenCode: "UTF-8"
executeTime: 1594147469000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\0042197"
],raw=false,rawEntries=[]]
A row was inserted into the MySQL orderdetail table.

rowDatasList (printed for an INSERT event; every column carries updated: true):
[afterColumns {
index: 0
sqlType: -5
name: "id"
isKey: true
updated: true
isNull: false
value: "13"
mysqlType: "bigint(20)"
}
afterColumns {
index: 1
sqlType: -5
name: "order_id"
isKey: false
updated: true
isNull: false
value: "29002"
mysqlType: "bigint(20)"
}
afterColumns {
index: 2
sqlType: 4
name: "category_id"
isKey: false
updated: true
isNull: false
value: "3"
mysqlType: "int(11)"
}
afterColumns {
index: 3
sqlType: 12
name: "sku"
isKey: false
updated: true
isNull: false
value: "22333"
mysqlType: "varchar(50)"
}
afterColumns {
index: 4
sqlType: 8
name: "money"
isKey: false
updated: true
isNull: false
value: "1111.0"
mysqlType: "double"
}
afterColumns {
index: 5
sqlType: 4
name: "amount"
isKey: false
updated: true
isNull: false
value: "3"
mysqlType: "int(11)"
}
afterColumns {
index: 6
sqlType: 93
name: "create_time"
isKey: false
updated: true
isNull: false
value: "2020-07-01 22:02:50"
mysqlType: "timestamp"
}
afterColumns {
index: 7
sqlType: 93
name: "update_time"
isKey: false
updated: true
isNull: false
value: "2020-07-02 22:02:54"
mysqlType: "timestamp"
}
]
afterColumnsList (printed for an UPDATE event; only the changed sku column has updated: true):
[index: 0
sqlType: -5
name: "id"
isKey: true
updated: false
isNull: false
value: "12"
mysqlType: "bigint(20)"
, index: 1
sqlType: -5
name: "order_id"
isKey: false
updated: false
isNull: false
value: "29002"
mysqlType: "bigint(20)"
, index: 2
sqlType: 4
name: "category_id"
isKey: false
updated: false
isNull: false
value: "2"
mysqlType: "int(11)"
, index: 3
sqlType: 12
name: "sku"
isKey: false
updated: true
isNull: false
value: "20011"
mysqlType: "varchar(50)"
, index: 4
sqlType: 8
name: "money"
isKey: false
updated: false
isNull: false
value: "2000.0"
mysqlType: "double"
, index: 5
sqlType: 4
name: "amount"
isKey: false
updated: false
isNull: false
value: "2"
mysqlType: "int(11)"
, index: 6
sqlType: 93
name: "create_time"
isKey: false
updated: false
isNull: false
value: "2020-07-01 20:19:08"
mysqlType: "timestamp"
, index: 7
sqlType: 93
name: "update_time"
isKey: false
updated: false
isNull: false
value: "2020-07-02 20:19:13"
mysqlType: "timestamp"
]
3. To read the protobuf-formatted data back out of Kafka, a custom deserialization schema is needed; here we take Flink reading data in this format as the example.
For details, see flink real-time project day07.
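A minimal sketch of such a deserializer, assuming Flink's DeserializationSchema interface (the class name below is illustrative):

package cn._51doit.flink.canal;

import cn._51doit.proto.OrderDetailProto;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

// Turns the protobuf bytes written by PbOfCanalToKafka back into OrderDetail objects
public class OrderDetailDeserializer implements DeserializationSchema<OrderDetailProto.OrderDetail> {

    @Override
    public OrderDetailProto.OrderDetail deserialize(byte[] message) throws IOException {
        // parse the byte[] payload produced by bean.build().toByteArray()
        return OrderDetailProto.OrderDetail.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(OrderDetailProto.OrderDetail nextElement) {
        // the Kafka stream is unbounded
        return false;
    }

    @Override
    public TypeInformation<OrderDetailProto.OrderDetail> getProducedType() {
        return TypeInformation.of(OrderDetailProto.OrderDetail.class);
    }
}

It would then be plugged into the Kafka source, e.g. new FlinkKafkaConsumer<>("orderdetail", new OrderDetailDeserializer(), props), since the producer above uses the table name as the topic.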
