Recently I was reminded of the architecture of a real-time data warehouse project I worked on early on; it looked like this:
As the architecture diagram shows, the warehouse was built on the real-time capabilities of Flink and Kafka.
- The data sources are the business systems' databases and the tracking logs collected from web pages
- A binlog collection tool captures the database change logs and writes them to Kafka; Flume receives the tracking events over HTTP and writes them to Kafka
- Flink reads the data from Kafka, cleans it and writes it to the ODS layer (Kafka); a separate Flink job writes a copy of the ODS data to HBase
- Flink reads the ODS data, fills in dimensions and joins multiple tables, then writes to the DWD layer (Kafka); a separate Flink job writes a copy of the DWD data to HBase
- Flink reads the DWD data, computes business metrics and writes to the ADS layer (Kafka); a separate Flink job writes a copy of the ADS data to HBase
- Flink reads the ADS data and writes it to the metric stores (MySQL, ES)
- Writing a copy of every layer to HBase is a distinctive feature of this project; it is mainly used for stream-table joins, as a data source for offline analysis, and as a data backup
Back then I wanted to build a Table Sink that supports multiple outputs, but I never got around to it. For now, here is a simple SocketTableSink to warm up with.
When I worked on Table Sources earlier I had already implemented a Socket Table Source (adapted from the official docs), so this time the two conveniently form a matching Table Source/Sink pair.
First, the architecture diagram:
Class diagram:
SocketDynamicTableFactory implements Flink's DynamicTableSourceFactory and DynamicTableSinkFactory
SocketDynamicTableSource implements Flink's ScanTableSource, which in turn extends DynamicTableSource
SocketDynamicTableSink implements Flink's DynamicTableSink
SocketSourceFunction extends Flink's RichSourceFunction
SocketSinkFunction extends RichSinkFunction
Code
SocketDynamicTableFactory
Defines the connector identifier and the required/optional options of the Socket table source/sink, such as hostname, port, etc.
@Override
public String factoryIdentifier() {
return "socket"; // used for matching to `connector = '...'`
}
// define all options statically
public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
.stringType()
.noDefaultValue();
public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
.intType()
.noDefaultValue();
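The snippets below also reference MAX_RETRY, RETRY_INTERVAL and BYTE_DELIMITER, and the factory has to tell Flink which options are required and which are optional. A minimal sketch of how those declarations could look — the option keys match the SQL used in the test further down, but the defaults are my assumptions rather than the original code:
// options used by the sink retry logic and the source delimiter (defaults are assumed)
public static final ConfigOption<Integer> MAX_RETRY = ConfigOptions.key("max.retry")
    .intType()
    .defaultValue(3);
public static final ConfigOption<Integer> RETRY_INTERVAL = ConfigOptions.key("retry.interval")
    .intType()
    .defaultValue(1); // seconds; multiplied by 1000 in the factory
public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
    .intType()
    .defaultValue(10); // '\n'
@Override
public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOSTNAME);
    options.add(PORT);
    options.add(FactoryUtil.FORMAT); // a format ('json', 'csv', ...) is mandatory
    return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(MAX_RETRY);
    options.add(RETRY_INTERVAL);
    options.add(BYTE_DELIMITER);
    return options;
}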
It implements the createDynamicTableSource and createDynamicTableSink methods, which parse the options from the catalog table, choose a suitable serializer or deserializer based on the format, and finally create the SocketDynamicTableSource and SocketDynamicTableSink.
/**
* create socket table source
* @param context
* @return
*/
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// discover a suitable decoding format
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
// validate all options
helper.validate();
// get the validated options
final ReadableConfig options = helper.getOptions();
final String hostname = options.get(HOSTNAME);
final int port = options.get(PORT);
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
// create and return dynamic table source
return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}
/**
* create socket table sink
* @param context
* @return
*/
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
SerializationFormatFactory.class,
FactoryUtil.FORMAT);
// validate all options
helper.validate();
// get the validated options
final ReadableConfig options = helper.getOptions();
final String hostname = options.get(HOSTNAME);
final int port = options.get(PORT);
final int maxRetry = options.get(MAX_RETRY);
final long retryInterval = options.get(RETRY_INTERVAL) * 1000;
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
Preconditions.checkState(maxRetry > 0, "max retry must be greater than 0, current: %s", maxRetry);
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
return new SocketDynamicTableSink(hostname, port, maxRetry, retryInterval, encodingFormat, byteDelimiter, producedDataType);
}
SocketDynamicTableSource
Uses the DecodingFormat to create the matching deserializer and then creates the SourceFunction.
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
// create runtime classes that are shipped to the cluster
// create deserializer
final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
runtimeProviderContext,
producedDataType);
final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(hostname, port, byteDelimiter, deserializer);
return SourceFunctionProvider.of(sourceFunction, false);
}
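Besides getScanRuntimeProvider, a ScanTableSource also has to implement getChangelogMode, copy and asSummaryString. A minimal sketch of what these can look like for this source (delegating the changelog mode to the format, as the official example does; not necessarily the exact project code):
@Override
public ChangelogMode getChangelogMode() {
    // the source supports whatever change types the decoding format produces
    return decodingFormat.getChangelogMode();
}
@Override
public DynamicTableSource copy() {
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}
@Override
public String asSummaryString() {
    return "Socket Table Source";
}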
SocketSourceFunction
Reads the input stream from the socket, deserializes the data and emits it downstream.
@Override
public void run(SourceContext<RowData> ctx) throws Exception {
while (isRunning) {
// open and consume from socket
try (final Socket socket = new Socket()) {
currentSocket = socket;
socket.connect(new InetSocketAddress(hostname, port), 0);
try (InputStream stream = socket.getInputStream()) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int b;
while ((b = stream.read()) >= 0) {
// buffer until delimiter
if (b != byteDelimiter) {
buffer.write(b);
}
// decode and emit record
else {
ctx.collect(deserializer.deserialize(buffer.toByteArray()));
buffer.reset();
}
}
}
} catch (Throwable t) {
t.printStackTrace(); // print and continue
}
Thread.sleep(1000);
}
}
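run() relies on an isRunning flag and the currentSocket reference; cancel() flips the flag and closes the socket so the blocking read returns. A sketch of those pieces, following the official socket-source example (details may differ from the project code):
private volatile boolean isRunning = true;
private Socket currentSocket;
@Override
public void cancel() {
    isRunning = false;
    try {
        currentSocket.close();
    } catch (Throwable t) {
        // ignore: best-effort close just to unblock the read loop
    }
}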
SocketDynamicTableSink
Similar to SocketDynamicTableSource: uses the EncodingFormat to create the serializer and then creates the SocketSinkFunction.
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final SerializationSchema<RowData> serializer = encodingFormat.createRuntimeEncoder(context, producedDataType);
SocketSinkFunction<RowData> sink = new SocketSinkFunction<>(hostname, port, serializer, maxRetry, retryInterval);
return SinkFunctionProvider.of(sink);
}
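A DynamicTableSink additionally requires getChangelogMode(requestedMode), copy and asSummaryString. A minimal sketch (accepting insert-only data is my assumption; the project code may support more change types):
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // simplest case: the socket sink only consumes INSERT rows
    return ChangelogMode.insertOnly();
}
@Override
public DynamicTableSink copy() {
    return new SocketDynamicTableSink(hostname, port, maxRetry, retryInterval, encodingFormat, byteDelimiter, producedDataType);
}
@Override
public String asSummaryString() {
    return "Socket Table Sink";
}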
SocketSinkFunction
Serializes each record and writes it to the socket's output stream.
@Override
public void invoke(RowData element, Context context) throws Exception {
int retryTime = 0;
byte[] message = serializer.serialize(element);
while (retryTime <= maxRetry) {
try {
os.write(message);
os.flush();
return;
} catch (Exception e) {
Thread.sleep(retryInterval);
++retryTime;
reconnect();
// LOG.warn("send data error retry: {}", retryTime);
}
}
LOG.warn("failed to send data after {} retries, dropping it: {}", maxRetry, new String(message));
}
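invoke() writes to an OutputStream os that is opened in open() and re-created by reconnect() after a failure. A sketch of those supporting methods (field names and error handling here are my assumptions):
private transient Socket client;
private transient OutputStream os;
@Override
public void open(Configuration parameters) throws Exception {
    client = new Socket(hostname, port);
    os = client.getOutputStream();
}
private void reconnect() {
    try {
        if (client != null) {
            client.close();
        }
        client = new Socket(hostname, port);
        os = client.getOutputStream();
    } catch (Exception e) {
        // the target may still be down; invoke() will retry after retryInterval
    }
}
@Override
public void close() throws Exception {
    if (os != null) {
        os.close();
    }
    if (client != null) {
        client.close();
    }
}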
Testing
SQL that reads from one socket and writes to another:
-- socket source
drop table if exists user_log;
CREATE TABLE user_log (
user_id VARCHAR
,item_id VARCHAR
,category_id VARCHAR
,behavior VARCHAR
,ts TIMESTAMP(3)
,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket'
,'hostname' = 'thinkpad'
,'port' = '12345'
,'format' = 'json'
-- ,'format' = 'csv'
);
-- set table.sql-dialect=hive;
-- socket sink
drop table if exists socket_sink;
CREATE TABLE socket_sink (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,ts timestamp(3)
) WITH (
'connector' = 'socket'
,'hostname' = 'localhost'
,'max.retry' = '2'
-- ,'retry.interval' = '2'
,'port' = '12346'
,'format' = 'csv'
);
-- streaming sql, insert into the socket sink table
insert into socket_sink
SELECT user_id, item_id, category_id, behavior, ts
FROM user_log;
First open listeners on the two ports:
nc -lk localhost 12345
nc -lk localhost 12346
Input data:
Output:
- Note: open the input and output ports first; otherwise the connections fail and both the source and the sink report errors.
For the complete example, see: github sqlSubmit
Welcome to follow the Flink菜鳥 WeChat official account, which posts occasional updates on Flink development topics.