flink寫入clickhouse之單表寫入
簡介
Flink 提供了標準的 JDBC sink,支持按批量大小和按時間間隔兩種方式提交數據。
參考flink文檔:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/
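同時,如果開啟了 checkpoint,sink 在做 checkpoint 時也會進行一次提交(flush)。
基於這點,我們可以將 JDBC sink 的提交時間間隔和批量大小設置得很大(即一次 checkpoint 間隔內都達不到的值;將 batchIntervalMs 設為 0 即可關閉定時提交),讓提交只發生在 checkpoint 時,從而盡量接近精確一次的效果。代價是一個 checkpoint 間隔內的數據都會緩存在內存中。需要注意的是,這在語義上仍然只是至少一次(at-least-once):如果 checkpoint 時數據已經寫入 ClickHouse,但該 checkpoint 最終失敗,作業會從上一個成功的 checkpoint 恢復並重放這批數據,導致重複寫入;如需去重,可配合 ReplacingMergeTree 等冪等寫入方式。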
寫入 ClickHouse 時,我們採用官方的 clickhouse-jdbc 驅動。它通過 ClickHouse 的 HTTP(S) 接口通信(因此連接端口應填 HTTP 端口,默認 8123,而不是原生 TCP 端口 9000),適合批量提交。
ClickHouse 的表分為本地表(單表)和分佈式表,這裡先介紹單表寫入,即直接對單個節點寫入。
寫入clickhouse單表
引入依賴
<!-- Flink JDBC connector: provides JdbcSink, JdbcStatementBuilder, JdbcExecutionOptions and
     JdbcConnectionOptions used below. The _2.11 suffix is the Scala binary version; presumably it
     must match the other Flink artifacts in this build (TODO confirm). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- ClickHouse JDBC driver: supplies ru.yandex.clickhouse.ClickHouseDriver, the driver class
     configured in the sink's JdbcConnectionOptions. -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.1</version>
</dependency>
JDBC sink 的一般使用方式
// Builds a JDBC sink that writes DwdOrderBean records into a single ClickHouse node.
JdbcSink.sink(
        // Column list must be explicit; each ? is filled by the statement builder below.
        "insert into tableName (id,name) values (?,?)",
        new JdbcStatementBuilder<DwdOrderBean>() {
            @Override
            public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                try {
                    SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                } catch (IllegalAccessException e) {
                    // Surface the failure instead of printing it and continuing with a
                    // partially bound statement; keep the original exception as the cause.
                    throw new SQLException(
                            "Cannot read fields of " + dwdOrderBean.getClass().getName(), e);
                }
            }
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                .withUsername(runEnv.getClickHouseUser())
                .withPassword(runEnv.getClickHousePassword())
                .build()
);
/**
 * Generic helper that binds a bean's instance fields to a ClickHouse {@link PreparedStatement}.
 *
 * <p>Fields are bound in array order to parameters 1..n. Static and compiler-generated
 * (synthetic) fields are skipped, because they are not table columns and would otherwise shift
 * every following parameter index. Each value is bound as its {@code toString()} text. A
 * {@code null} value, an empty string, or the text {@code NA} is bound as SQL NULL.
 *
 * <p>Note: the JavaDoc of {@code Class.getDeclaredFields()} does not guarantee any ordering. The
 * column order in the INSERT statement must match the order of the supplied array.
 *
 * @param ps     the PreparedStatement instance
 * @param fields obtained via "bean.getClass().getDeclaredFields()"
 * @param bean   the bean instance whose field values are bound
 * @throws IllegalAccessException thrown by field.get
 * @throws SQLException           thrown by ps.setXxx
 */
public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
    int paramIndex = 0;
    for (Field field : fields) {
        // e.g. serialVersionUID or an inner class's this$0 — not columns, must not consume a '?'.
        if (java.lang.reflect.Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
            continue;
        }
        paramIndex++;
        field.setAccessible(true);
        Object value = field.get(bean);
        String text = value == null ? null : value.toString();
        if (text == null || text.isEmpty() || NA.equals(text)) {
            ps.setNull(paramIndex, java.sql.Types.NULL);
        } else {
            ps.setObject(paramIndex, text);
        }
    }
}
注1:第一個參數是 SQL 語句,必須嚴格按照示例的格式書寫:列出列名,VALUES 中以 ? 作為佔位符。之後會調用 JdbcStatementBuilder 逐條讀取數據,來填充這些佔位符。
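注2:在 JdbcStatementBuilder 的實現中,DwdOrderBean 是按照 ClickHouse 表結構創建的實體類。之後通過反射遍歷實例對象的屬性,依次設置到 PreparedStatement 中,因此 SQL 中列的順序必須與類中屬性的順序一致。注意 getDeclaredFields() 的 JavaDoc 並不保證返回順序(常見 JVM 上通常與聲明順序一致),並且類中的 static 字段(如 serialVersionUID)不能當作列。如果列數很少,更穩妥的做法是手動逐個設置。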
封裝常用設置
/**
 * Wraps Flink's {@code JdbcSink} with the connection settings needed to write into a single
 * (non-distributed) ClickHouse table on one node.
 *
 * @param <T> the record type written by the sink
 */
public class SinkSingleClickHouse<T> {
    /** Field text that is written as SQL NULL instead of the literal string "null". */
    private static final String NA = "null";
    /** Driver class provided by ru.yandex.clickhouse:clickhouse-jdbc. */
    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

    private final SinkFunction<T> sink;

    /**
     * Creates a ClickHouse sinkFunction that uses Flink's default JdbcExecutionOptions.
     *
     * @param sql                  insert statement, which must have the form
     *                             {@code insert into table (a,b) values (?,?)}
     * @param jdbcStatementBuilder fills the statement's placeholders from a single record
     * @param runEnv               run environment supplying the ClickHouse host, port and credentials
     * @param database             database that contains the table
     */
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database) {
        sink = JdbcSink.sink(sql, jdbcStatementBuilder, connectionOptions(runEnv, database));
    }

    /**
     * Creates a ClickHouse sinkFunction with explicit batching and retry settings.
     *
     * @param sql                  insert statement, which must have the form
     *                             {@code insert into table (a,b) values (?,?)}
     * @param jdbcStatementBuilder fills the statement's placeholders from a single record
     * @param runEnv               run environment supplying the ClickHouse host, port and credentials
     * @param database             database that contains the table
     * @param batchIntervalMs      flush trigger: time interval in milliseconds
     * @param batchSize            flush trigger: number of buffered records
     * @param maxRetries           number of retries for a failed flush
     */
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database,
                                int batchIntervalMs, int batchSize, int maxRetries) {
        sink = JdbcSink.sink(
                sql,
                jdbcStatementBuilder,
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(batchIntervalMs)
                        .withBatchSize(batchSize)
                        .withMaxRetries(maxRetries)
                        .build(),
                connectionOptions(runEnv, database));
    }

    /** Returns the configured sink. */
    public SinkFunction<T> getSink() {
        return sink;
    }

    /** Builds the JDBC connection options for one ClickHouse node; shared by both constructors. */
    private static JdbcConnectionOptions connectionOptions(RunEnv runEnv, String database) {
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                .withDriverName(DRIVER_NAME)
                .withUsername(runEnv.getClickHouseUser())
                .withPassword(runEnv.getClickHousePassword())
                .build();
    }

    /**
     * Generic helper that binds a bean's instance fields to a ClickHouse {@link PreparedStatement}.
     *
     * <p>Fields are bound in array order to parameters 1..n. Static and compiler-generated
     * (synthetic) fields are skipped, because they are not table columns and would otherwise
     * shift every following parameter index. Each value is bound as its {@code toString()} text.
     * A {@code null} value, an empty string, or the text {@value #NA} is bound as SQL NULL.
     *
     * <p>Note: the JavaDoc of {@code Class.getDeclaredFields()} does not guarantee any ordering.
     * The column order in the INSERT statement must match the order of the supplied array.
     *
     * @param ps     the PreparedStatement instance
     * @param fields obtained via "bean.getClass().getDeclaredFields()"
     * @param bean   the bean instance whose field values are bound
     * @throws IllegalAccessException thrown by field.get
     * @throws SQLException           thrown by ps.setXxx
     */
    public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
        int paramIndex = 0;
        for (Field field : fields) {
            // e.g. serialVersionUID or an inner class's this$0 — not columns, must not consume a '?'.
            if (java.lang.reflect.Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            paramIndex++;
            field.setAccessible(true);
            Object value = field.get(bean);
            String text = value == null ? null : value.toString();
            if (text == null || text.isEmpty() || NA.equals(text)) {
                ps.setNull(paramIndex, java.sql.Types.NULL);
            } else {
                ps.setObject(paramIndex, text);
            }
        }
    }
}
注1:JdbcExecutionOptions 即是設置批量大小與定時提交的地方。如果不傳,會使用默認值(Flink 1.13 中 batchSize 為 5000,batchIntervalMs 為 0 即不定時提交,maxRetries 為 3)。
最終使用
// Sink writing DwdOrderBean records into a single ClickHouse table in database "dwd_cdp".
SinkFunction<DwdOrderBean> sinkClickhouse = new SinkSingleClickHouse<>("insert into tableName (id,name) values (?,?)",
        new JdbcStatementBuilder<DwdOrderBean>() {
            @Override
            public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                try {
                    SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                } catch (IllegalAccessException e) {
                    // Surface the failure instead of printing it and continuing with a
                    // partially bound statement; keep the original exception as the cause.
                    throw new SQLException(
                            "Cannot read fields of " + dwdOrderBean.getClass().getName(), e);
                }
            }
        },
        // NOTE(review): presumably the RunEnv for the UAT environment — confirm.
        uat,
        "dwd_cdp")
        .getSink();
如上,我們就得到了一個針對單表寫入的 sink。
