Flink Socket table source/sink


I recently recalled the architecture of the project from when I first worked on a real-time data warehouse, as follows:

As the architecture diagram shows, the warehouse is built on the real-time capabilities of Flink and Kafka.

  1. The data sources are the business systems' databases and the tracking logs from web pages

  2. A binlog collection tool captures the database change logs and writes them to Kafka; Flume receives the tracking HTTP messages and writes that data to Kafka

  3. Flink reads the data from Kafka, cleanses it, and writes it to the ODS layer (Kafka); meanwhile a separate Flink job copies the ODS data to HBase

  4. Flink reads the ODS data, fills in dimensions, joins multiple tables, and writes the result to the DWD layer (Kafka); meanwhile a separate Flink job copies the DWD data to HBase

  5. Flink reads the DWD data, computes business metrics, and writes them to the ADS layer (Kafka); meanwhile a separate Flink job copies the ADS data to HBase

  6. Flink reads the ADS data and writes it to the metrics stores (MySQL, ES)

  • Writing a copy of each layer's data to HBase is a distinctive feature of this project; it is mainly used for stream-table joins, as a data source for offline analysis, and for data backup

Back then I wanted to build a Table Sink that supports multi-way output, but never got around to it. For now, let's warm up with a simple SocketTableSink.

When I worked on Table Sources earlier, I had already implemented a Socket Table Source (copied from the official docs), so this rounds things out into a matching Table Source/Sink pair.

First, an architecture diagram:

Class diagram:

SocketDynamicTableFactory implements Flink's DynamicTableSourceFactory and DynamicTableSinkFactory

SocketDynamicTableSource implements Flink's ScanTableSource, which in turn extends DynamicTableSource

SocketDynamicTableSink implements Flink's DynamicTableSink

SocketSourceFunction extends Flink's RichSourceFunction

SocketSinkFunction extends RichSinkFunction

Code

SocketDynamicTableFactory

Defines the connector identifier and the required and optional options of the socket table source/sink, such as hostname, port, etc.

@Override
public String factoryIdentifier() {
    return "socket"; // used for matching to `connector = '...'`
}

// define all options statically
public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
        .stringType()
        .noDefaultValue();

public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
        .intType()
        .noDefaultValue();
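
The sink-side options used below (max.retry, retry.interval) and the byte delimiter are declared the same way; here is a minimal sketch (only the option keys are confirmed by the SQL example at the end, the default values are my assumptions). The factory also reports its options through requiredOptions()/optionalOptions():

public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
        .intType()
        .defaultValue(10); // assumed default: '\n'

public static final ConfigOption<Integer> MAX_RETRY = ConfigOptions.key("max.retry")
        .intType()
        .defaultValue(3); // assumed default

public static final ConfigOption<Integer> RETRY_INTERVAL = ConfigOptions.key("retry.interval")
        .intType()
        .defaultValue(1); // seconds, converted to ms in createDynamicTableSink

@Override
public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOSTNAME);
    options.add(PORT);
    options.add(FactoryUtil.FORMAT); // a format is required to (de)serialize rows
    return options;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(BYTE_DELIMITER);
    options.add(MAX_RETRY);
    options.add(RETRY_INTERVAL);
    return options;
}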

Implements the createDynamicTableSource and createDynamicTableSink methods: they parse the options from the catalog table, discover a suitable serializer or deserializer based on the format type, and finally create the SocketDynamicTableSource / SocketDynamicTableSink.

/**
 * create socket table source
 * @param context
 * @return
 */
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    // either implement your custom validation logic here ...
    // or use the provided helper utility
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    // discover a suitable decoding format
    final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
            DeserializationFormatFactory.class,
            FactoryUtil.FORMAT);

    // validate all options
    helper.validate();

    // get the validated options
    final ReadableConfig options = helper.getOptions();
    final String hostname = options.get(HOSTNAME);
    final int port = options.get(PORT);
    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

    // derive the produced data type (excluding computed columns) from the catalog table
    final DataType producedDataType =
            context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

    // create and return dynamic table source
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}


/**
 * create socket table sink
 * @param context
 * @return
 */
@Override
public DynamicTableSink createDynamicTableSink(Context context) {

    // either implement your custom validation logic here ...
    // or use the provided helper utility
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
            SerializationFormatFactory.class,
            FactoryUtil.FORMAT);

    // validate all options
    helper.validate();

    // get the validated options
    final ReadableConfig options = helper.getOptions();
    final String hostname = options.get(HOSTNAME);
    final int port = options.get(PORT);
    final int maxRetry = options.get(MAX_RETRY);
    final long retryInterval = options.get(RETRY_INTERVAL) * 1000;
    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

    Preconditions.checkState(maxRetry > 0, "max.retry must be greater than 0, current: %s", maxRetry);

    // derive the physical row data type (excluding computed columns) from the catalog table
    final DataType producedDataType =
            context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

    return new SocketDynamicTableSink(hostname, port, maxRetry, retryInterval, encodingFormat, byteDelimiter, producedDataType);
}

SocketDynamicTableSource

Uses the DecodingFormat to create the matching deserializer, then creates the source function.


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

	// create runtime classes that are shipped to the cluster
	// create deserializer
	final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
	  runtimeProviderContext,
	  producedDataType);

	final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(hostname, port, byteDelimiter, deserializer);

	return SourceFunctionProvider.of(sourceFunction, false);
}
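
ScanTableSource also requires getChangelogMode(), copy(), and asSummaryString(); a minimal sketch for this insert-only source, reusing the fields from the constructor shown above:

@Override
public ChangelogMode getChangelogMode() {
    // the socket source only ever emits new records
    return ChangelogMode.insertOnly();
}

@Override
public DynamicTableSource copy() {
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}

@Override
public String asSummaryString() {
    return "Socket Table Source";
}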

SocketSourceFunction

Reads the input stream from the socket, deserializes the data, and emits the records downstream.


@Override
  public void run(SourceContext<RowData> ctx) throws Exception {
    while (isRunning) {
      // open and consume from socket
      try (final Socket socket = new Socket()) {
        currentSocket = socket;
        socket.connect(new InetSocketAddress(hostname, port), 0);
        try (InputStream stream = socket.getInputStream()) {
          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
          int b;
          while ((b = stream.read()) >= 0) {
            // buffer until delimiter
            if (b != byteDelimiter) {
              buffer.write(b);
            }
            // decode and emit record
            else {
              ctx.collect(deserializer.deserialize(buffer.toByteArray()));
              buffer.reset();
            }
          }
        }
      } catch (Throwable t) {
        t.printStackTrace(); // print and continue
      }
      Thread.sleep(1000);
    }
  }
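
RichSourceFunction also needs a cancel() implementation; a sketch that stops the loop and closes the socket (matching the isRunning/currentSocket fields used above):

@Override
public void cancel() {
    isRunning = false;
    try {
        // closing the socket interrupts a blocking stream.read()
        if (currentSocket != null) {
            currentSocket.close();
        }
    } catch (Throwable t) {
        // ignore, we are shutting down anyway
    }
}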

SocketDynamicTableSink

Similar to SocketDynamicTableSource: uses the EncodingFormat to create the serializer, then creates the SocketSinkFunction.


@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {


    final SerializationSchema<RowData> serializer = encodingFormat.createRuntimeEncoder(context, producedDataType);

    SocketSinkFunction<RowData> sink = new SocketSinkFunction<>(hostname, port, serializer, maxRetry, retryInterval);
    return SinkFunctionProvider.of(sink);
}

SocketSinkFunction

Serializes the data and writes it to the socket's output stream.


@Override
public void invoke(RowData element, Context context) throws Exception {

    int retryTime = 0;
    byte[] message = serializer.serialize(element);
    while (retryTime <= maxRetry) {
        try {
            os.write(message);
            os.flush();
            return;
        } catch (Exception e) {
            LOG.warn("send data error, retry: {}", retryTime, e);
            Thread.sleep(retryInterval);
            ++retryTime;
            reconnect();
        }
    }
    LOG.warn("send error after retry {} times, ignore it: {}", maxRetry, new String(message));
}
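
The os stream and reconnect() used above are not shown in the snippet; a minimal sketch of what they could look like (field names beyond hostname/port are my assumptions):

private transient Socket client;
private transient OutputStream os;

@Override
public void open(Configuration parameters) throws Exception {
    client = new Socket(hostname, port);
    os = client.getOutputStream();
}

private void reconnect() {
    try {
        if (client != null) {
            client.close();
        }
        client = new Socket(hostname, port);
        os = client.getOutputStream();
    } catch (Exception e) {
        LOG.warn("reconnect to {}:{} failed", hostname, port, e);
    }
}

@Override
public void close() throws Exception {
    if (os != null) {
        os.close();
    }
    if (client != null) {
        client.close();
    }
}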

Test

SQL that reads from a socket and writes to another socket:

-- socket source
drop table if exists user_log;
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
  ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
   'connector' = 'socket'
  ,'hostname' = 'thinkpad'
  ,'port' = '12345'
  ,'format' = 'json'
--   ,'format' = 'csv'
);

-- socket sink
drop table if exists socket_sink;
CREATE TABLE socket_sink (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts timestamp(3)
) WITH (
  'connector' = 'socket'
  ,'hostname' = 'localhost'
  ,'max.retry' = '2'
--   ,'retry.interval' = '2'
  ,'port' = '12346'
  ,'format' = 'csv'
);


-- streaming sql, insert into socket table
insert into socket_sink
SELECT user_id, item_id, category_id, behavior, ts
FROM user_log;

First, open the two ports:

nc -lk localhost 12345
nc -lk localhost 12346

Input data:
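
The original screenshot is not reproduced here; a JSON record matching the user_log schema (my own sample) can be typed into the nc session on port 12345:

{"user_id":"1001","item_id":"2001","category_id":"3001","behavior":"pv","ts":"2021-01-01 12:00:00"}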

Output:
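
The sink writes CSV to port 12346, so the nc session there should show a line roughly like the following (the exact timestamp rendering depends on the csv format defaults):

1001,2001,3001,pv,"2021-01-01 12:00:00"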

  • Note: open the input and output ports first; otherwise the connections fail and both the source and the sink will throw errors

For the complete example, see: github sqlSubmit

Feel free to follow the Flink菜鸟 WeChat official account, which posts about Flink development from time to time.

