flink寫入clickhouse之分布式表寫入.md
簡介
之前基於clickhouse的官方jdbc包編寫了sink,用於寫入單表,見:https://www.cnblogs.com/sqhhh/p/15897275.html
clickhouse分布式表的寫入,目前有2種方法:
- 1.對着邏輯表寫入:此方法可以當作是單表,利用單表寫入的sink寫入數據
- 2.對着各個節點的物理表寫入:網絡上都推薦這種寫入方式。具體見百度。
關於方法2的實現方案是:基於單表寫入的方式,為每個節點的物理表各建立一個sink,然後把它們統籌起來封裝成一個sink,再用輪詢或者hash等方式把數據分流到各個單表sink中。
實現思路
觀察原jdbc實現邏輯
1.原jdbc底層的使用是
JdbcSink.sink();
這是一個靜態工廠方法(並非構造方法),本質上是構造並返回一個GenericJdbcSinkFunction的實例,該類的代碼如下:
/**
 * A generic SinkFunction for JDBC.
 *
 * <p>Holds exactly one {@code AbstractJdbcOutputFormat} and forwards every lifecycle call
 * (open, invoke, snapshotState, close) to it.
 */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
implements CheckpointedFunction {
// The single output format that all calls below are delegated to.
private final AbstractJdbcOutputFormat<T> outputFormat;
/** Stores the given output format after passing it through {@code Preconditions.checkNotNull}. */
public GenericJdbcSinkFunction(@Nonnull AbstractJdbcOutputFormat<T> outputFormat) {
this.outputFormat = Preconditions.checkNotNull(outputFormat);
}
/** Hands the runtime context to the output format and opens it for this subtask. */
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext ctx = getRuntimeContext();
outputFormat.setRuntimeContext(ctx);
// Arguments: index of this subtask, then the total number of parallel subtasks.
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
/** Forwards one record to the output format. */
@Override
public void invoke(T value, Context context) throws IOException {
outputFormat.writeRecord(value);
}
/** Intentionally empty: this function restores nothing. */
@Override
public void initializeState(FunctionInitializationContext context) {}
/** Flushes the output format whenever a state snapshot (checkpoint) is taken. */
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
outputFormat.flush();
}
/** Closes the output format. */
@Override
public void close() {
outputFormat.close();
}
}
注:其中進行 checkpoint 時會調用 snapshotState(),這正是之前提到的利用checkpoint來進行提交的實現。
我們對源碼進行如下改造:
/**
 * A JDBC sink that fans records out over several {@code AbstractJdbcOutputFormat}s.
 *
 * <p>Each record is routed to exactly one output format, chosen by
 * {@code Math.abs(shuntValueImp.shunt(value) % size)}. Every output format is opened in
 * {@link #open}, flushed in {@link #snapshotState} and closed in {@link #close}.
 *
 * @param <T> type of the records written by this sink
 */
public class ClickHouseSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {

    // One output format per target; the list order defines the routing index.
    private final List<AbstractJdbcOutputFormat<T>> outputFormatList;
    // Extracts the int that is reduced modulo `size` to pick an output format.
    private final ShuntValue<T> shuntValueImp;
    // Cached outputFormatList.size(); guaranteed > 0 by the constructor.
    private final int size;

    /**
     * @param outputFormatList output formats to route to; must contain at least one element
     * @param shuntValueImp strategy that maps a record to an int used for routing
     * @throws IllegalArgumentException if {@code outputFormatList} is null or empty
     * @throws NullPointerException if {@code shuntValueImp} is null
     */
    public ClickHouseSinkFunction(List<AbstractJdbcOutputFormat<T>> outputFormatList, ShuntValue<T> shuntValueImp) {
        // An empty list would make `% size` in invoke() throw ArithmeticException on the first record.
        if (outputFormatList == null || outputFormatList.isEmpty()) {
            throw new IllegalArgumentException("outputFormatList must contain at least one output format");
        }
        if (shuntValueImp == null) {
            throw new NullPointerException("shuntValueImp must not be null");
        }
        this.outputFormatList = outputFormatList;
        this.shuntValueImp = shuntValueImp;
        this.size = outputFormatList.size();
    }

    /** Hands the runtime context to every output format and opens each one for this subtask. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        for (AbstractJdbcOutputFormat<T> outputFormat : outputFormatList) {
            outputFormat.setRuntimeContext(ctx);
            outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }
    }

    /** Routes one record to the output format selected by the shunt value. */
    @Override
    public void invoke(T value, Context context) throws Exception {
        // `%` yields a result in (-size, size), so Math.abs cannot overflow here
        // and the index always lands in [0, size).
        int x = Math.abs(shuntValueImp.shunt(value) % size);
        outputFormatList.get(x).writeRecord(value);
    }

    /** Flushes every output format whenever a state snapshot (checkpoint) is taken. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        for (AbstractJdbcOutputFormat<T> outputFormat : outputFormatList) {
            outputFormat.flush();
        }
    }

    /** Intentionally empty: this function restores nothing. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    /**
     * Closes every output format.
     *
     * <p>All formats are attempted even if one fails; the first failure is rethrown with any
     * later ones attached as suppressed exceptions.
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        for (AbstractJdbcOutputFormat<T> outputFormat : outputFormatList) {
            try {
                outputFormat.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
將單個AbstractJdbcOutputFormat拓展成了一個list,並利用int x = Math.abs(shuntValueImp.shunt(value) % size) 來進行分流。
其中,shuntValueImp的類型ShuntValue是個接口,定義如下:
/**
 * Strategy that decides how a record is partitioned across cluster nodes.
 *
 * <p>Serializable so that it can be shipped as part of a sink function; usable as a lambda
 * or method reference.
 *
 * @param <T> type of the records being partitioned
 */
@FunctionalInterface
public interface ShuntValue<T> extends Serializable {
    /**
     * Derives the partitioning value of a single record.
     *
     * @param value a single record
     * @return an int derived from the record; it is taken modulo the number of cluster nodes
     *     to route the record to one of them
     */
    int shunt(T value);
}
這個接口的作用是定義如何從單條數據中提取一個int數值,用於分流。
包裝下常用設置
/**
 * Fluent builder for a {@link ClickHouseSinkFunction} that writes to several ClickHouse hosts.
 *
 * <p>One JDBC output format is created per host; records are distributed among them by the
 * configured {@link ShuntValue}. Defaults: port {@code "8123"}, user {@code "default"},
 * password {@code "default"}, database {@code "default"}, default execution options, and
 * {@code Object::hashCode} as the shunt value.
 *
 * @param <T> type of the records written by the sink
 */
public class ClickHouseSinkBuilder<T> {

    private final String sql;
    private final JdbcStatementBuilder<T> statementBuilder;
    private JdbcExecutionOptions executionOptions = JdbcExecutionOptions.defaults();
    private List<String> clickHouseHosts;
    private String clickHousePort = "8123";
    private String clickHouseUser = "default";
    private String clickHousePassword = "default";
    private String clickHouseDatabase = "default";
    private ShuntValue<T> shuntValue = Object::hashCode;

    /**
     * Validates the settings and assembles the sink: one connection and one batching output
     * format per host, wrapped in a single {@link ClickHouseSinkFunction}.
     *
     * @throws NullPointerException if a required setting is null
     * @throws IllegalArgumentException if {@code clickHouseHosts} is null or empty
     */
    private SinkFunction<T> buildAll(String sql,
                                     JdbcStatementBuilder<T> statementBuilder,
                                     JdbcExecutionOptions executionOptions,
                                     List<String> clickHouseHosts,
                                     String clickHousePort,
                                     String clickHouseUser,
                                     String clickHousePassword,
                                     String clickHouseDatabase,
                                     ShuntValue<T> shuntValue) {
        // Fail at build time instead of on the first record (or deep inside Flink).
        Preconditions.checkNotNull(sql, "sql must not be null");
        Preconditions.checkNotNull(statementBuilder, "statementBuilder must not be null");
        Preconditions.checkNotNull(executionOptions, "executionOptions must not be null");
        Preconditions.checkNotNull(shuntValue, "shuntValue must not be null");
        Preconditions.checkArgument(
                clickHouseHosts != null && !clickHouseHosts.isEmpty(),
                "at least one ClickHouse host is required");

        // Snapshot the hosts so a concurrent change to the caller's list cannot affect the build.
        List<String> hosts = new ArrayList<>(clickHouseHosts);
        List<AbstractJdbcOutputFormat<T>> outputFormatList = new ArrayList<>(hosts.size());
        for (String clickHouseHost : hosts) {
            JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:clickhouse://" + clickHouseHost + ":" + clickHousePort + "/" + clickHouseDatabase)
                    .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                    .withUsername(clickHouseUser)
                    .withPassword(clickHousePassword)
                    .build();
            AbstractJdbcOutputFormat<T> format = new JdbcBatchingOutputFormat<>(
                    new SimpleJdbcConnectionProvider(connectionOptions),
                    executionOptions,
                    context -> {
                        Preconditions.checkState(
                                !context.getExecutionConfig().isObjectReuseEnabled(),
                                "objects can not be reused with JDBC sink function");
                        return JdbcBatchStatementExecutor.simple(
                                sql, statementBuilder, Function.identity());
                    },
                    JdbcBatchingOutputFormat.RecordExtractor.identity());
            outputFormatList.add(format);
        }
        return new ClickHouseSinkFunction<T>(outputFormatList, shuntValue);
    }

    /**
     * Static factory, equivalent to calling the constructor.
     *
     * @param sql parameterized INSERT statement executed for every record
     * @param statementBuilder binds a record's fields to the statement parameters
     * @param clickHouseHosts host names of the nodes to write to
     */
    public static <T> ClickHouseSinkBuilder<T> builder(String sql, JdbcStatementBuilder<T> statementBuilder, List<String> clickHouseHosts) {
        return new ClickHouseSinkBuilder<T>(sql, statementBuilder, clickHouseHosts);
    }

    /**
     * @param sql parameterized INSERT statement executed for every record
     * @param statementBuilder binds a record's fields to the statement parameters
     * @param clickHouseHosts host names of the nodes to write to
     */
    public ClickHouseSinkBuilder(String sql, JdbcStatementBuilder<T> statementBuilder, List<String> clickHouseHosts) {
        this.sql = sql;
        this.statementBuilder = statementBuilder;
        this.clickHouseHosts = clickHouseHosts;
    }

    /** Sets the execution options passed to every per-host output format. */
    public ClickHouseSinkBuilder<T> setExecutionOptions(JdbcExecutionOptions executionOptions) {
        this.executionOptions = executionOptions;
        return this;
    }

    /** Replaces the list of hosts given at construction time. */
    public ClickHouseSinkBuilder<T> setClickHouseHosts(List<String> clickHouseHosts) {
        this.clickHouseHosts = clickHouseHosts;
        return this;
    }

    /** Sets the port used in the JDBC URL of every host. */
    public ClickHouseSinkBuilder<T> setClickHousePort(String clickHousePort) {
        this.clickHousePort = clickHousePort;
        return this;
    }

    /** Sets the user name used for every connection. */
    public ClickHouseSinkBuilder<T> setClickHouseUser(String clickHouseUser) {
        this.clickHouseUser = clickHouseUser;
        return this;
    }

    /** Sets the password used for every connection. */
    public ClickHouseSinkBuilder<T> setClickHousePassword(String clickHousePassword) {
        this.clickHousePassword = clickHousePassword;
        return this;
    }

    /** Sets the database name used in the JDBC URL of every host. */
    public ClickHouseSinkBuilder<T> setClickHouseDatabase(String clickHouseDatabase) {
        this.clickHouseDatabase = clickHouseDatabase;
        return this;
    }

    /** Sets the strategy that decides which host a record is written to. */
    public ClickHouseSinkBuilder<T> setShuntValue(ShuntValue<T> shuntValue) {
        this.shuntValue = shuntValue;
        return this;
    }

    /**
     * Builds the sink from the current settings.
     *
     * @throws NullPointerException if a required setting is null
     * @throws IllegalArgumentException if no host has been configured
     */
    public SinkFunction<T> build() {
        return buildAll(sql, statementBuilder, executionOptions, clickHouseHosts, clickHousePort, clickHouseUser, clickHousePassword, clickHouseDatabase, shuntValue);
    }
}
使用
// Hosts of the nodes whose local (physical) tables are written to.
ArrayList<String> hosts = new ArrayList<>(2);
hosts.add("host1");
hosts.add("host2");
SinkFunction<DwdOrderBean> sinkClusterClickHouse = ClickHouseSinkBuilder.builder(
        "insert into tableName (id,name) values (?,?)",
        new JdbcStatementBuilder<DwdOrderBean>() {
            @Override
            public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                try {
                    SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                } catch (IllegalAccessException e) {
                    // Do not swallow: a statement with unbound parameters must not reach the batch.
                    throw new SQLException("Failed to bind fields of " + dwdOrderBean.getClass().getName(), e);
                }
            }
        },
        hosts)
        // Records with the same tenant code always go to the same node.
        .setShuntValue(value -> value.getTenant_code().hashCode())
        .setClickHouseHosts(hosts)
        .setClickHousePassword("pas")
        .setClickHousePort("222")
        .setClickHouseUser("user")
        .setClickHouseDatabase("default")
        .setExecutionOptions(new JdbcExecutionOptions.Builder()
                .withBatchIntervalMs(5000L)
                .withBatchSize(50000)
                .withMaxRetries(3)
                .build())
        .build();
