flink寫入clickhouse之單表寫入


flink寫入clickhouse之單表寫入

簡介

flink有一個標准的jdbc sink,提供批量,定時的提交方法。

參考flink文檔:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/

同時,如果設置了checkpoint,在做checkpoint時候會進行一次提交。

基於這點,我們可以將jdbc sink的提交時間和數量設置的很大(即一次checkpoint間隔內達不到的標准),然后通過checkpoint時候進行的提交,來達到精確一次的效果。

關於寫clickhouse,我們采用官方的包,是基於https的,適用於批量提交。

clickhouse的表有單表和分布式表之分,我們先進行單表的寫入,即對着一個節點寫入。

寫入clickhouse單表

引入依賴

    <!--   jdbc sink-->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-jdbc_2.11</artifactId>
		<version>${flink.version}</version>
	</dependency>
    <!-- jdbc  clickhouse -->
	<dependency>
		<groupId>ru.yandex.clickhouse</groupId>
		<artifactId>clickhouse-jdbc</artifactId>
		<version>0.3.1</version>
	</dependency>

jdbc sink一般使用方式

JdbcSink.sink(
                "insert into tableName (id,name) values (?,?)",
                new JdbcStatementBuilder<DwdOrderBean>() {
                    @Override
                    public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                        Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                        try {
                            SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        }
                    }
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUsername(runEnv.getClickHouseUser())
                        .withPassword(runEnv.getClickHousePassword())
                        .build()
        );
    /**
     * 用於設置clickhouse PreparedStatement的通用方法
     *
     * @param ps     PreparedStatement實例
     * @param fields 通過”實例對象.getClass().getDeclaredFields()“獲得
     * @param bean   實例對象
     * @throws IllegalAccessException field.get拋出的錯誤
     * @throws SQLException           ps.set拋出的錯誤
     */
    public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
        for (int i = 1; i <= fields.length; i++) {
            Field field = fields[i - 1];
            field.setAccessible(true);
            Object o = field.get(bean);
            if (o == null) {
                ps.setNull(i, 0);
                continue;
            }
            String fieldValue = o.toString();
            if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {
                ps.setObject(i, fieldValue);
            } else {
                ps.setNull(i, 0);
            }
        }
    }

注1:其中第一個參數是sql語句,格式必須嚴格按照例子中的格式,列舉出列名,后面以 ? 填充。因為后面會調用 JdbcStatementBuilder 讀取每條數據來對該sql進行補全。
注2:其中JdbcStatementBuilder的實現,DwdOrderBean是對應clickhouse表結構創建的實例對象,后續通過對實例對象的屬性循環傳遞,來設置到PreparedStatement中,如果列數很少,可以手動填寫。

包裝下常用設置

public class SinkSingleClickHouse<T> {
    private final static String NA = "null";
    private final SinkFunction<T> sink;

    /**
     * 獲取clickhouse sinkFunction
     *
     * @param sql                  插入語句,格式必須為  inert into table  a,b values (?,?)
     * @param jdbcStatementBuilder 如何用單條信息填充sql
     * @param runEnv               執行環境
     * @param database             表所在的數據庫
     */
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database) {
        sink = JdbcSink.sink(
                sql,
                jdbcStatementBuilder,
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUsername(runEnv.getClickHouseUser())
                        .withPassword(runEnv.getClickHousePassword())
                        .build()
        );
    }

    /**
     * 獲取clickhouse sinkFunction
     *
     * @param sql                  插入語句,格式必須為  inert into table  a,b values (?,?)
     * @param jdbcStatementBuilder 如何用單條信息填充sql
     * @param runEnv               執行環境
     * @param database             表所在的數據庫
     * @param batchIntervalMs      提交條件之:間隔
     * @param batchSize            提交條件之:數據量
     * @param maxRetries           提交重試次數
     */
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database,
                                int batchIntervalMs, int batchSize, int maxRetries) {
        sink = JdbcSink.sink(
                sql,
                jdbcStatementBuilder,
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(batchIntervalMs)
                        .withBatchSize(batchSize)
                        .withMaxRetries(maxRetries)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUsername(runEnv.getClickHouseUser())
                        .withPassword(runEnv.getClickHousePassword())
                        .build()
        );
    }

    public SinkFunction<T> getSink() {
        return sink;
    }

    /**
     * 用於設置clickhouse PreparedStatement的通用方法
     *
     * @param ps     PreparedStatement實例
     * @param fields 通過”實例對象.getClass().getDeclaredFields()“獲得
     * @param bean   實例對象
     * @throws IllegalAccessException field.get拋出的錯誤
     * @throws SQLException           ps.set拋出的錯誤
     */
    public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
        for (int i = 1; i <= fields.length; i++) {
            Field field = fields[i - 1];
            field.setAccessible(true);
            Object o = field.get(bean);
            if (o == null) {
                ps.setNull(i, 0);
                continue;
            }
            String fieldValue = o.toString();
            if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {
                ps.setObject(i, fieldValue);
            } else {
                ps.setNull(i, 0);
            }
        }
    }
}

注1:其中JdbcExecutionOptions,即使我們設置批量和定時的地方,如果不傳,會有默認值。

最終使用

SinkFunction<DwdOrderBean> sinkClickhouse = new SinkSingleClickHouse<>("insert into tableName (id,name) values (?,?)",
                new JdbcStatementBuilder<DwdOrderBean>() {
                    @Override
                    public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                        Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                        try {
                            SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        }
                    }
                },
                uat,
                "dwd_cdp")
                .getSink();

最終如上,我們即可得到一個對着單表寫入的sink


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM