Flink Custom HTTP Table Source


I wrote an HTTP Table Source for Flink.

Reference, from the official documentation: [User-defined Sources & Sinks](https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sourcessinks/)

[Figure: Flink Table connector structure]

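A custom connector needs the following pieces:

  • 1. The runtime part: a SourceFunction
  • 2. The planner part: a table source factory and a table source (here a DynamicTableSourceFactory and a ScanTableSource)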

First, here is the table schema of the finished connector:

create table cust_http_source(
    id string
    ,name string
    ,sex string
)WITH(
 'connector' = 'http'
 ,'http.url' = 'http://localhost:8888'
 ,'http.interval' = '1000'
 ,'format' = 'csv'
)

## 1. Define the SourceFunction

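I found a demo online for sending HTTP requests and changed it slightly: the url is now passed in as a parameter, and the method returns the data sent back by the HTTP server.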

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class HttpClientUtil {

    /** Sends a GET request and returns the response body, or null if the status is not 200. */
    public static String doGet(String httpurl) throws IOException {
        // Open a connection to the remote url and cast it to HttpURLConnection
        HttpURLConnection connection = (HttpURLConnection) new URL(httpurl).openConnection();
        try {
            // Request method: GET
            connection.setRequestMethod("GET");
            // Timeout for connecting to the server: 15000 ms
            connection.setConnectTimeout(15000);
            // Timeout for reading the response: 60000 ms
            connection.setReadTimeout(60000);
            // Send the request
            connection.connect();
            if (connection.getResponseCode() != 200) {
                return null;
            }
            // Wrap the input stream with an explicit charset;
            // try-with-resources closes the reader and the underlying stream
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                // Collect the response line by line
                StringBuilder sb = new StringBuilder();
                String line;
                while ((line = br.readLine()) != null) {
                    sb.append(line).append("\r\n");
                }
                return sb.toString();
            }
        } finally {
            // Release the connection; it is never null at this point
            connection.disconnect();
        }
    }
}

* Apologies: I no longer know whose blog this was copied from. It was a while ago and I cannot find the source.

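The SourceFunction itself is simple: extend RichSourceFunction and implement its methods. It receives the values of the table properties. The format is handled by Flink's existing formats, which is why the constructor also takes a deserializer.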

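import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

import java.nio.charset.StandardCharsets;

public class HttpSource extends RichSourceFunction<RowData> {

    private volatile boolean isRunning = true;
    private final String url;
    private final long requestInterval;
    private final DeserializationSchema<RowData> deserializer;
    // counts emitted records
    private transient Counter counter;

    public HttpSource(String url, long requestInterval, DeserializationSchema<RowData> deserializer) {
        this.url = url;
        this.requestInterval = requestInterval;
        this.deserializer = deserializer;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // register the counter with the operator's metric group
        this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("myCounter");
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        while (isRunning) {
            try {
                // receive http message, csv format
                String message = HttpClientUtil.doGet(url);
                if (message != null) {
                    // deserialize the csv message into RowData and emit it
                    ctx.collect(deserializer.deserialize(message.getBytes(StandardCharsets.UTF_8)));
                    this.counter.inc();
                }
            } catch (Exception e) {
                // keep polling if a single request or record fails
                e.printStackTrace();
            }
            // sleep outside the try block so a failed request cannot cause a busy loop
            Thread.sleep(requestInterval);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}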

The source receives data in the format named by the `format` table property, deserializes it into RowData, and emits it from the SourceFunction.
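To make the deserialization step concrete, here is a minimal sketch in plain Java of what turning one csv line into the three columns `id`, `name`, `sex` amounts to. The class `CsvLineSketch` and its `parse` method are hypothetical and only split on commas. Flink's real csv format also handles quoting, typed columns and error cases and produces `RowData`, none of which this sketch attempts.

```java
import java.util.Arrays;
import java.util.List;

public class CsvLineSketch {

    // Column names taken from the cust_http_source schema.
    static final List<String> COLUMNS = Arrays.asList("id", "name", "sex");

    /** Splits one csv line into exactly COLUMNS.size() string fields. */
    static String[] parse(String line) {
        // trim() drops the trailing "\r\n" that HttpClientUtil appends
        String[] fields = line.trim().split(",", -1);
        if (fields.length != COLUMNS.size()) {
            throw new IllegalArgumentException(
                    "expected " + COLUMNS.size() + " fields but got " + fields.length);
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] row = parse("1633921534798,name,ce1738aa-42e4-4cad-b29a-a011db7cd91a\r\n");
        for (int i = 0; i < row.length; i++) {
            System.out.println(COLUMNS.get(i) + " = " + row[i]);
        }
    }
}
```

A line with the wrong number of fields is rejected here with an exception. In the real connector, how malformed records are treated depends on the options of the chosen format.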

## 2. Define the TableSource

HttpDynamicTableSource implements ScanTableSource. It receives the values of the table properties, creates the matching deserializer from the format, and creates the HttpSource.

public class HttpDynamicTableSource implements ScanTableSource {

    private final String url;
    private final long interval;
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private final DataType producedDataType;

    public HttpDynamicTableSource(
            String url,
            long interval,
            DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
            DataType producedDataType) {
        this.url = url;
        this.interval = interval;
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // in our example the format decides about the changelog mode
        // but it could also be the source itself
        return decodingFormat.getChangelogMode();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

        // create runtime classes that are shipped to the cluster
        final DeserializationSchema<RowData> deserializer =
                decodingFormat.createRuntimeDecoder(runtimeProviderContext, producedDataType);
        final SourceFunction<RowData> sourceFunction = new HttpSource(url, interval, deserializer);

        return SourceFunctionProvider.of(sourceFunction, false);
    }

    @Override
    public DynamicTableSource copy() {
        return new HttpDynamicTableSource(url, interval, decodingFormat, producedDataType);
    }

    @Override
    public String asSummaryString() {
        return "Http Table Source";
    }
}

## 3. Define the TableSourceFactory

Implement the DynamicTableSourceFactory interface, add ConfigOptions for the required properties http.url and http.interval, and create the HttpDynamicTableSource.

public class HttpDynamicTableFactory implements DynamicTableSourceFactory {

    // define all options statically
    public static final ConfigOption<String> URL = ConfigOptions.key("http.url")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<Long> INTERVAL = ConfigOptions.key("http.interval")
            .longType()
            .noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "http"; // used for matching to `connector = '...'`
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        options.add(INTERVAL);
        options.add(FactoryUtil.FORMAT); // use pre-defined option for format
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        // no optional options
        return options;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // either implement your custom validation logic here ...
        // or use the provided helper utility
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        // discover a suitable decoding format
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
                helper.discoverDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT);

        // validate all options
        helper.validate();

        // get the validated options
        final ReadableConfig options = helper.getOptions();
        final String url = options.get(URL);
        final long interval = options.get(INTERVAL);

        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType producedDataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

        // create and return dynamic table source
        return new HttpDynamicTableSource(url, interval, decodingFormat, producedDataType);
    }
}
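As a rough illustration of what declaring `http.url` and `http.interval` as required options implies, the sketch below validates a plain `Map` of `WITH` properties by hand. `HttpOptionsSketch` and its methods are hypothetical names. This is not how Flink's `FactoryUtil` is implemented, only a model of the two checks involved: the key is present, and the value parses as the declared type.

```java
import java.util.HashMap;
import java.util.Map;

public class HttpOptionsSketch {

    static final String URL = "http.url";
    static final String INTERVAL = "http.interval";

    /** Returns the value for a required key or fails with a descriptive error. */
    static String required(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required option: " + key);
        }
        return value;
    }

    /** Reads http.interval as a long number of milliseconds. */
    static long interval(Map<String, String> props) {
        String raw = required(props, INTERVAL);
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Option " + INTERVAL + " must be a long: " + raw, e);
        }
    }

    public static void main(String[] args) {
        // Same values as the WITH clause of cust_http_source.
        Map<String, String> props = new HashMap<>();
        props.put(URL, "http://localhost:8888");
        props.put(INTERVAL, "1000");
        System.out.println(required(props, URL) + " every " + interval(props) + " ms");
    }
}
```

Failing at table creation time with a message that names the offending key is much easier to debug than a null url surfacing later inside the running source.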

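By default, Flink discovers factory instances through Java's Service Provider Interface (SPI), so the fully qualified class name of HttpDynamicTableFactory has to be added to META-INF/services/org.apache.flink.table.factories.Factory. The entry shown below is the one for the socket example's factory; the HTTP factory needs an equivalent line with its own package and class name.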

com.rookie.submit.cust.source.socket.SocketDynamicTableFactory
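Once the factories have been discovered, the one whose `factoryIdentifier()` equals the table's `connector` value is the one used. The sketch below models only that matching step in plain Java. The `Factory` interface here is a hypothetical stand-in for Flink's, and the list is built by hand where Flink would obtain it through `java.util.ServiceLoader`.

```java
import java.util.Arrays;
import java.util.List;

public class FactoryLookupSketch {

    /** Hypothetical stand-in for a connector factory: it only exposes an identifier. */
    interface Factory {
        String factoryIdentifier();
    }

    /** Finds the single factory whose identifier matches the 'connector' option. */
    static Factory find(List<Factory> discovered, String connector) {
        Factory match = null;
        for (Factory f : discovered) {
            if (f.factoryIdentifier().equals(connector)) {
                if (match != null) {
                    throw new IllegalStateException("Ambiguous factories for: " + connector);
                }
                match = f;
            }
        }
        if (match == null) {
            throw new IllegalStateException("No factory found for connector: " + connector);
        }
        return match;
    }

    public static void main(String[] args) {
        // In Flink this list would come from the META-INF/services entries.
        List<Factory> discovered = Arrays.asList(() -> "socket", () -> "http");
        System.out.println(find(discovered, "http").factoryIdentifier());
    }
}
```

If the services file lacks the line for the HTTP factory, the lookup for `'connector' = 'http'` finds nothing, which is why the registration step cannot be skipped.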

## 4. Test

The complete SQL is as follows:

create table cust_http_source(
    id string
    ,name string
    ,sex string
)WITH(
 'connector' = 'http'
 ,'http.url' = 'http://localhost:8888'
 ,'http.interval' = '1000'
 ,'format' = 'csv'
)
;

create table cust_http_sink(
id string
,name string
,sex string
)WITH(
    'connector' = 'print'
)
;

insert into cust_http_sink
select id,name,sex
from cust_http_source;

The HTTP server accepts HTTP requests and returns a concatenated string:

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Starts an HTTP server that listens for requests on port 8888.
 */
public class HttpServer {

    public static void main(String[] arg) throws Exception {

        com.sun.net.httpserver.HttpServer server =
                com.sun.net.httpserver.HttpServer.create(new InetSocketAddress(8888), 10);
        server.createContext("/", new TestHandler());
        server.start();
    }

    static class TestHandler implements HttpHandler {
        @Override
        public void handle(HttpExchange exchange) throws IOException {
            // one csv record per request: <timestamp>,name,<random uuid>
            String result = System.currentTimeMillis() + ",name," + UUID.randomUUID();
            byte[] body = result.getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            // try-with-resources closes the response stream and completes the exchange
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        }
    }

}

Start the job.

Data received:

+I[1633921534798, name, ce1738aa-42e4-4cad-b29a-a011db7cd91a]
+I[1633921535813, name, e3b9e51a-f6f4-410e-b2eb-5353b2c1b294]
+I[1633921536816, name, f0dd1f7d-d7c5-4520-a147-3db8c8d5d153]
+I[1633921537818, name, 4b5461be-b979-48cb-ae3e-375568bfbf06]
+I[1633921538820, name, 8c2a80e0-39f8-4f6b-b573-885d1109ac3a]
+I[1633921539823, name, 3b324fa9-d6a6-4156-ab0a-888ee3fe02ce]
+I[1633921540826, name, e6247826-8e54-40a4-8571-1d3b43419211]

Done.

* Note: the HTTP table source is modeled on the [socket table source](https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sourcessinks/#full-stack-example) example in the official documentation

* Note: the HTTP server must stay up while the job is running

For the complete example, see GitHub: https://github.com/springMoon/sqlSubmit


