一、背景
每天上百億條日誌數據的實時查詢是個挑戰。在架構設計上,我們採用了 Kafka + Flink + ClickHouse + Redash,實現海量數據的實時分析。在計算層,我們開發了基於 Flink 計算引擎的實時數據平台,以簡化開發流程:數據通過配置化實現動態 Schema 生成,底層數據解析統一,無需重複造輪子;整個數據鏈路,從數據的採集、轉換、存儲到可視化,無需寫一行代碼,全部通過配置完成。本文主要介紹實時日誌數據寫入 ClickHouse 的實踐。
Flink Clickhouse Sink
<!-- JDBC driver dependency used by the ClickHouse sink shown below -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.1.50</version>
</dependency>
public class ClickhouseSink extends RichSinkFunction<Row> implements Serializable { private String tablename; private String[] tableColums; private List<String> types; private String[] columns; private String username; private String password; private String[] ips; private String drivername = "ru.yandex.clickhouse.ClickHouseDriver"; private List<Row> list = new ArrayList<>(); private List<PreparedStatement> preparedStatementList = new ArrayList<>(); private List<Connection> connectionList = new ArrayList<>(); private List<Statement> statementList = new ArrayList<>(); private long lastInsertTime = 0L; private long insertCkTimenterval = 4000L; // 插入的批次 private int insertCkBatchSize = 10000; public ClickhouseSink(String tablename, String username, String password, String[] ips, String[] tableColums, List<String> types, String[] columns) { this.tablename = tablename; this.username = username; this.password = password; this.ips = ips; this.tableColums = tableColums; this.types = types; this.columns = columns; // 新增字段 } // 插入數據 public void insertData(List<Row> rows, PreparedStatement preparedStatement, Connection connection) throws SQLException { for (int i = 0; i < rows.size(); ++i) { Row row = rows.get(i); for (int j = 0; j < this.tableColums.length; ++j) { if (null != row.getField(j)) { preparedStatement.setObject(j + 1, row.getField(j)); } else { preparedStatement.setObject(j + 1, "null"); } } preparedStatement.addBatch(); } preparedStatement.executeBatch(); connection.commit(); preparedStatement.clearBatch(); } /** * 新增字段修改表添加列 * * @param statement * @throws Exception */ public void tableAddColumn(Statement statement) { try { if (null != this.columns && this.columns.length > 0) { /** * table 增加字段 */ // 獲取原表字段名 String querySql = "select * from " + this.tablename + " limit 1"; ResultSet rs = statement.executeQuery(querySql); ResultSetMetaData rss = rs.getMetaData(); int columnCount = rss.getColumnCount(); List<String> orgTabCols = new ArrayList<>(); for (int i = 1; i 
<= columnCount; ++i) { orgTabCols.add(rss.getColumnName(i)); } // 對比兩個數組,判斷新增字段是否在原來的表中 Collection collection = new ArrayList<String>(orgTabCols); boolean exists = collection.removeAll(Arrays.asList(this.columns)); // 新增字段不在原來的表中,執行添加列操作 if (!exists) { for (int i = 0; i < this.columns.length; ++i) { String str = ""; String str_all = ""; StringBuilder sb = null; StringBuilder sb_all = null; if (i == 0) { sb.append("alter table " ).append(this.tablename).append(" add column ").append(this.columns[i]).append(" String").append(" after ").append(orgTabCols.get(orgTabCols.size() - 1)); sb_all.append("alter table " ).append("_all").append(this.tablename).append(" add column ").append(this.columns[i]).append(" String").append(" after ").append(orgTabCols.get(orgTabCols.size() - 1)); } else { sb.append("alter table " ).append(this.tablename).append(" add column ").append(this.columns[i]).append(" String").append(" after ").append(this.columns[i - 1]); sb_all.append("alter table " ).append("_all").append(this.tablename).append(" add column ").append(this.columns[i]).append(" String").append(" after ").append(this.columns[i - 1]); } if (StringUtils.isNotEmpty(sb.toString())) { statement.executeUpdate(sb.toString()); } if (StringUtils.isNotEmpty(sb_all.toString())) { statement.executeUpdate(sb_all.toString()); } } } } } catch (Exception e) { e.printStackTrace(); } } // 根據IP創建連接 public void createConnection() throws Exception { // 插入語句 String insertStr = StrUtils.clickhouseInsertValue(this.tableColums, this.tablename); // 創建表 List<String> createtableStrList = StrUtils.clickhouseCreatTable(this.tableColums, this.tablename, Constant.CKCLUSTERNAME, this.tableColums[3], this.types); // 創建數據庫 String create_database_str = "create database if not exists " + this.tablename.split("\\.")[0]; for (String ip : this.ips) { String url = "jdbc:clickhouse://" + ip + ":8123"; Connection connection = DriverManager.getConnection(url, this.username, this.password); Statement statement = 
connection.createStatement(); // 執行創建數據庫 statement.executeUpdate(create_database_str); // 執行創建表 statement.executeUpdate(createtableStrList.get(0)); statement.executeUpdate(createtableStrList.get(1)); // 增加表字段 tableAddColumn(statement); this.statementList.add(statement); PreparedStatement preparedStatement = connection.prepareStatement(insertStr); connection.setAutoCommit(false); this.preparedStatementList.add(preparedStatement); this.connectionList.add(connection); } } @Override public void open(Configuration parameters) throws Exception { Class.forName(this.drivername); // 創建連接 createConnection(); } @Override public void invoke(Row row, Context context) throws Exception { // 輪詢寫入各個local表,避免單節點數據過多 if (null != row) { Random random = new Random(); int index = random.nextInt(this.ips.length); switch (index) { case 0: if(list.size() >= this.insertCkBatchSize || isTimeToDoInsert()) { insertData(list,preparedStatementList.get(0),connectionList.get(0)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; case 1: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(1),connectionList.get(1)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; case 2: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(2),connectionList.get(2)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; case 3: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(3),connectionList.get(3)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; case 4: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(4),connectionList.get(4)); list.clear(); this.lastInsertTime = 
System.currentTimeMillis(); } else { list.add(row); } break; case 5: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(5),connectionList.get(5)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; case 6: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(6),connectionList.get(6)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; case 7: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(7),connectionList.get(7)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; case 8: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(8),connectionList.get(8)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; case 9: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(9),connectionList.get(9)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; case 10: if(list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) { insertData(list,preparedStatementList.get(10),connectionList.get(10)); list.clear(); this.lastInsertTime = System.currentTimeMillis(); } else { list.add(row); } break; } } } @Override public void close() throws Exception { for (Statement statement : this.statementList) { if (null != statement) { statement.close(); } } for (PreparedStatement preparedStatement : this.preparedStatementList) { if (null != preparedStatement) { preparedStatement.close(); } } for (Connection connection : this.connectionList) { if (null != connection) { connection.close(); } } } /** * 根據時間判斷是否插入數據 * * @return */ private boolean isTimeToDoInsert() { 
long currTime = System.currentTimeMillis(); return currTime - this.lastInsertTime >= this.insertCkTimenterval; } }
通過自定義 Sink 的方式寫入 ClickHouse,底層仍然使用 JDBC。需要注意插入不要過於頻繁,否則會報錯(數據插入的頻率大於數據合併的速度);應採用批次插入,且批次最好設置得大一些。輪詢寫入每個節點的方式仍有待優化。