Flink Socket table source/sink


Recently I was reminded of the architecture of the project where I first built a real-time data warehouse:

As the architecture diagram shows, the warehouse is built on the real-time capabilities of Flink and Kafka.

  1. The data sources are the business systems' databases and the tracking logs from web pages

  2. A binlog collection tool captures the database change logs and writes them to Kafka; Flume receives the tracking events over HTTP and writes them to Kafka

  3. Flink reads the data from Kafka, cleans it, and writes it to the ODS layer (Kafka); a separate Flink job copies the ODS data to HBase

  4. Flink reads the ODS data, fills in dimensions, joins multiple tables, and writes the result to the DWD layer (Kafka); a separate Flink job copies the DWD data to HBase

  5. Flink reads the DWD data, computes business metrics, and writes them to the ADS layer (Kafka); a separate Flink job copies the ADS data to HBase

  6. Flink reads the ADS data and writes it to the metrics stores (MySQL, ES)

  • Copying every layer to HBase is what sets this project apart; the copies are used for stream-table joins, as an offline analysis source, and as a data backup

Back then I wanted to build a Table Sink that supports multiple outputs, but I never got around to it. For now, let's warm up with a simple SocketTableSink.

When I worked on Table Sources earlier, I already implemented a Socket Table Source (adapted from the official docs), so this rounds it out into a matching Table Source/Sink pair.

First, the architecture diagram:

And the class diagram:

SocketDynamicTableFactory implements Flink's DynamicTableSourceFactory and DynamicTableSinkFactory

SocketDynamicTableSource implements Flink's ScanTableSource, which in turn extends DynamicTableSource

SocketDynamicTableSink implements Flink's DynamicTableSink

SocketSourceFunction extends Flink's RichSourceFunction

SocketSinkFunction extends RichSinkFunction
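
The same relationships in code form (signatures only; the implementations follow below):

public class SocketDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { /* ... */ }

public class SocketDynamicTableSource implements ScanTableSource { /* ... */ }

public class SocketDynamicTableSink implements DynamicTableSink { /* ... */ }

public class SocketSourceFunction extends RichSourceFunction<RowData> { /* ... */ }

public class SocketSinkFunction<T> extends RichSinkFunction<T> { /* ... */ }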

Code

SocketDynamicTableFactory

It defines the connector identifier plus the required and optional options of the socket table source/sink, such as hostname and port:

@Override
public String factoryIdentifier() {
    return "socket"; // used for matching to `connector = '...'`
}

// define all options statically
public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
        .stringType()
        .noDefaultValue();

public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
        .intType()
        .noDefaultValue();
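
The factory methods below also read MAX_RETRY, RETRY_INTERVAL, and BYTE_DELIMITER, which are not shown above. A sketch of how they could be declared, together with the required/optional option sets; the keys max.retry and retry.interval match the SQL test at the end, while the default values are assumptions:

public static final ConfigOption<Integer> MAX_RETRY = ConfigOptions.key("max.retry")
        .intType()
        .defaultValue(3); // assumed default

public static final ConfigOption<Integer> RETRY_INTERVAL = ConfigOptions.key("retry.interval")
        .intType()
        .defaultValue(1); // seconds; multiplied by 1000 in the factory

public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
        .intType()
        .defaultValue(10); // '\n'

@Override
public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOSTNAME);
    options.add(PORT);
    options.add(FactoryUtil.FORMAT); // the format itself is validated by the discovered format factory
    return options;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(MAX_RETRY);
    options.add(RETRY_INTERVAL);
    options.add(BYTE_DELIMITER);
    return options;
}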

It also implements the createDynamicTableSource and createDynamicTableSink methods, which parse the options from the catalog table, pick a suitable serializer or deserializer based on the format type, and finally create the SocketDynamicTableSource / SocketDynamicTableSink:

/**
 * create socket table source
 * @param context
 * @return
 */
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    // either implement your custom validation logic here ...
    // or use the provided helper utility
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    // discover a suitable decoding format
    final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
            DeserializationFormatFactory.class,
            FactoryUtil.FORMAT);

    // validate all options
    helper.validate();

    // get the validated options
    final ReadableConfig options = helper.getOptions();
    final String hostname = options.get(HOSTNAME);
    final int port = options.get(PORT);
    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

    // derive the produced data type (excluding computed columns) from the catalog table
    final DataType producedDataType =
            context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

    // create and return dynamic table source
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}


/**
 * create socket table sink
 * @param context
 * @return
 */
@Override
public DynamicTableSink createDynamicTableSink(Context context) {

    // either implement your custom validation logic here ...
    // or use the provided helper utility
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
            SerializationFormatFactory.class,
            FactoryUtil.FORMAT);

    // validate all options
    helper.validate();

    // get the validated options
    final ReadableConfig options = helper.getOptions();
    final String hostname = options.get(HOSTNAME);
    final int port = options.get(PORT);
    final int maxRetry = options.get(MAX_RETRY);
    final long retryInterval = options.get(RETRY_INTERVAL) * 1000;
    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

    Preconditions.checkState(maxRetry > 0, "max retry time must be greater than 0, current: %s", maxRetry);

    // derive the produced data type (excluding computed columns) from the catalog table
    final DataType producedDataType =
            context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

    return new SocketDynamicTableSink(hostname, port, maxRetry, retryInterval, encodingFormat, byteDelimiter, producedDataType);
}
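
One step the snippets do not show: for connector = 'socket' to resolve to this factory at runtime, the class must be registered via Java SPI in a service file (the package name here is a placeholder):

# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
com.example.flink.connector.socket.SocketDynamicTableFactory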

SocketDynamicTableSource

It uses the DecodingFormat to create the matching deserializer and builds the source function from it:


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

	// create runtime classes that are shipped to the cluster
	// create deserializer
	final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
	  runtimeProviderContext,
	  producedDataType);

	final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(hostname, port, byteDelimiter, deserializer);

	return SourceFunctionProvider.of(sourceFunction, false); // false: this source is unbounded
}
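
The rest of SocketDynamicTableSource is boilerplate; a sketch of the remaining ScanTableSource methods, reusing the constructor fields shown above:

@Override
public ChangelogMode getChangelogMode() {
    // pass through what the decoding format supports; for json/csv this is insert-only
    return decodingFormat.getChangelogMode();
}

@Override
public DynamicTableSource copy() {
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}

@Override
public String asSummaryString() {
    return "Socket Table Source";
}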

SocketSourceFunction

Reads the input stream from the socket, deserializes the records, and emits them downstream:


@Override
  public void run(SourceContext<RowData> ctx) throws Exception {
    while (isRunning) {
      // open and consume from socket
      try (final Socket socket = new Socket()) {
        currentSocket = socket;
        socket.connect(new InetSocketAddress(hostname, port), 0);
        try (InputStream stream = socket.getInputStream()) {
          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
          int b;
          while ((b = stream.read()) >= 0) {
            // buffer until delimiter
            if (b != byteDelimiter) {
              buffer.write(b);
            }
            // decode and emit record
            else {
              ctx.collect(deserializer.deserialize(buffer.toByteArray()));
              buffer.reset();
            }
          }
        }
      } catch (Throwable t) {
        t.printStackTrace(); // print and continue
      }
      Thread.sleep(1000);
    }
  }
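
Cancellation has to unblock the stream.read() call in run(); a sketch of cancel() that does this by closing the current connection (currentSocket and isRunning are the fields used in run()):

@Override
public void cancel() {
    isRunning = false;
    try {
        // closing the socket makes the blocking read() in run() return
        currentSocket.close();
    } catch (Throwable t) {
        // ignore: the connection may already be gone
    }
}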

SocketDynamicTableSink

Analogous to SocketDynamicTableSource: it uses the EncodingFormat to create the serializer and builds a SocketSinkFunction:


@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {

    final SerializationSchema<RowData> serializer = encodingFormat.createRuntimeEncoder(context, producedDataType);

    SocketSinkFunction<RowData> sink = new SocketSinkFunction<>(hostname, port, serializer, maxRetry, retryInterval);
    return SinkFunctionProvider.of(sink);
}
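
The remaining DynamicTableSink methods are boilerplate again; a minimal sketch, assuming the sink only accepts append-only data:

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // a plain socket cannot represent retractions, so accept inserts only
    return ChangelogMode.insertOnly();
}

@Override
public DynamicTableSink copy() {
    return new SocketDynamicTableSink(hostname, port, maxRetry, retryInterval, encodingFormat, byteDelimiter, producedDataType);
}

@Override
public String asSummaryString() {
    return "Socket Table Sink";
}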

SocketSinkFunction

Serializes each record and writes it to the socket's output stream:


@Override
public void invoke(RowData element, Context context) throws Exception {

    int retryTime = 0;
    byte[] message = serializer.serialize(element);
    while (retryTime <= maxRetry) {
        try {
            os.write(message);
            os.flush();
            return;
        } catch (Exception e) {
            LOG.warn("send data error, retry: {}", retryTime, e);
            Thread.sleep(retryInterval);
            ++retryTime;
            reconnect();
        }
    }
    LOG.warn("send error after retry {} times, ignore it: {}", maxRetry, new String(message));
}
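
invoke() relies on an open connection and a reconnect() helper that the snippet does not show; a sketch of that connection handling, with field names matching the usage above (the details are assumptions, not the original implementation):

private transient Socket client;
private transient OutputStream os;

@Override
public void open(Configuration parameters) throws Exception {
    client = new Socket(hostname, port);
    os = client.getOutputStream();
}

private void reconnect() {
    try {
        if (client != null) {
            client.close();
        }
        client = new Socket(hostname, port);
        os = client.getOutputStream();
    } catch (Exception e) {
        LOG.warn("reconnect to {}:{} failed, will retry on next write", hostname, port, e);
    }
}

@Override
public void close() throws Exception {
    if (os != null) {
        os.close();
    }
    if (client != null) {
        client.close();
    }
}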

Test

A SQL job that reads from one socket and writes to another:

-- socket source
drop table if exists user_log;
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
  ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
   'connector' = 'socket'
  ,'hostname' = 'thinkpad'
  ,'port' = '12345'
  ,'format' = 'json'
--   ,'format' = 'csv'
);

-- socket sink
drop table if exists socket_sink;
CREATE TABLE socket_sink (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts timestamp(3)
) WITH (
  'connector' = 'socket'
  ,'hostname' = 'localhost'
  ,'max.retry' = '2'
--   ,'retry.interval' = '2'
  ,'port' = '12346'
  ,'format' = 'csv'
);


-- streaming sql: read from the socket source and write to the socket sink
insert into socket_sink
SELECT user_id, item_id, category_id, behavior, ts
FROM user_log;

First open the two ports:

nc -lk localhost 12345
nc -lk localhost 12346

Input data:

Output:

  • Note: open both the input and output ports before starting the job; otherwise neither the source nor the sink can connect, and both will fail

For the complete example, see: github sqlSubmit

Feel free to follow the Flink菜鳥 WeChat official account for occasional posts about Flink development.

