實時日志數據寫入Clickhouse


一、背景

每天上百億條日志數據的實時查詢是個挑戰,因此在架構設計上采用了Kafka + Flink + Clickhouse + Redash,實現海量數據的實時分析。在計算層,我們開發了基於Flink計算引擎的實時數據平台,簡化開發流程:數據通過配置化實現動態Schema生成,底層數據解析統一,無需重復造輪子。整個數據鏈路,從數據的采集,轉換,存儲到可視化,無需寫一行代碼,通過配置即可完成。本文主要介紹實時日志數據寫入Clickhouse的實踐。

Flink Clickhouse Sink

<dependency>
	<groupId>ru.yandex.clickhouse</groupId>
	<artifactId>clickhouse-jdbc</artifactId>
	<version>0.1.50</version>
</dependency>
/**
 * Flink sink that buffers incoming {@link Row}s in memory and writes them to ClickHouse in
 * batches over JDBC.
 *
 * <p>One JDBC connection is opened per configured host in {@link #open(Configuration)}. A batch
 * is flushed when the buffer reaches {@code insertCkBatchSize} rows or when at least
 * {@code insertCkTimenterval} milliseconds have passed since the previous flush. Each flush is
 * sent to one randomly chosen host so that writes are spread over the nodes.
 *
 * <p>The time-based condition is only evaluated when a record arrives; an idle stream keeps its
 * buffered rows until the next record or until {@link #close()} flushes them.
 */
public class ClickhouseSink extends RichSinkFunction<Row> implements Serializable {
    private String tablename;
    private String[] tableColums;
    private List<String> types;
    // Optional columns to add to an already existing table; may be null or empty.
    private String[] columns;
    private String username;
    private String password;
    private String[] ips;
    private String drivername = "ru.yandex.clickhouse.ClickHouseDriver";
    // Rows buffered since the last flush.
    private List<Row> list = new ArrayList<>();
    // The three lists below are filled in createConnection(), one entry per host.
    private List<PreparedStatement> preparedStatementList = new ArrayList<>();
    private List<Connection> connectionList = new ArrayList<>();
    private List<Statement> statementList = new ArrayList<>();
    // Shared generator used to pick the target host of each flush.
    private final Random random = new Random();

    private long lastInsertTime = 0L;
    // Maximum time in milliseconds between two flushes (checked on record arrival only).
    private long insertCkTimenterval = 4000L;
    // Number of buffered rows that triggers a flush.
    private int insertCkBatchSize = 10000;

    /**
     * Creates the sink. No connection is opened until {@link #open(Configuration)} is called.
     *
     * @param tablename   target table; the part before the first '.' is used as the database name
     * @param username    JDBC user name
     * @param password    JDBC password
     * @param ips         ClickHouse hosts; one connection is opened per entry
     * @param tableColums column names of the target table, in the order the row fields are bound
     * @param types       column types handed to {@code StrUtils.clickhouseCreatTable}
     * @param columns     additional columns to add to an existing table; may be null or empty
     */
    public ClickhouseSink(String tablename, String username, String password, String[] ips, String[] tableColums, List<String> types, String[] columns) {
        this.tablename = tablename;
        this.username = username;
        this.password = password;
        this.ips = ips;
        this.tableColums = tableColums;
        this.types = types;
        this.columns = columns;
    }

    /**
     * Writes the given rows as one JDBC batch and commits it.
     *
     * @param rows              rows to insert; a null or empty list is a no-op
     * @param preparedStatement insert statement with one parameter per entry of {@code tableColums}
     * @param connection        connection the statement belongs to; committed after the batch
     * @throws SQLException if binding, executing or committing the batch fails
     */
    public void insertData(List<Row> rows, PreparedStatement preparedStatement, Connection connection) throws SQLException {
        if (null == rows || rows.isEmpty()) {
            return;
        }

        for (Row row : rows) {
            for (int j = 0; j < this.tableColums.length; ++j) {
                Object value = row.getField(j);
                // NOTE(review): a missing value is written as the literal string "null",
                // not SQL NULL. Kept as in the original; confirm this is intended.
                preparedStatement.setObject(j + 1, null != value ? value : "null");
            }
            preparedStatement.addBatch();
        }

        preparedStatement.executeBatch();
        connection.commit();
        preparedStatement.clearBatch();
    }

    /**
     * Adds the configured extra {@code columns} to the table when they are not present yet.
     *
     * <p>Every missing column is appended (type {@code String}) after the current last column,
     * so the configured order is kept at the end of the table. Columns that already exist are
     * skipped individually. The operation is best effort: any failure is printed and does not
     * abort the sink.
     *
     * @param statement statement used to read the table metadata and to run the ALTER statements
     */
    public void tableAddColumn(Statement statement) {
        if (null == this.columns || this.columns.length == 0) {
            return;
        }

        try {
            // Read the current column names from the result-set metadata.
            List<String> existingColumns = new ArrayList<>();
            String querySql = "select * from " + this.tablename + " limit 1";
            try (ResultSet rs = statement.executeQuery(querySql)) {
                ResultSetMetaData metaData = rs.getMetaData();
                int columnCount = metaData.getColumnCount();
                for (int i = 1; i <= columnCount; ++i) {
                    existingColumns.add(metaData.getColumnName(i));
                }
            }
            if (existingColumns.isEmpty()) {
                return;
            }

            // Each new column is placed after the previously added one, starting at the
            // table's current last column.
            String afterColumn = existingColumns.get(existingColumns.size() - 1);
            for (String column : this.columns) {
                if (existingColumns.contains(column)) {
                    continue;
                }
                statement.executeUpdate(buildAddColumnSql(this.tablename, column, afterColumn));
                // NOTE(review): "_all" is prepended to the full table name exactly as in the
                // original code; if the second table is named "<table>_all" this must become a
                // suffix. Confirm against StrUtils.clickhouseCreatTable.
                statement.executeUpdate(buildAddColumnSql("_all" + this.tablename, column, afterColumn));
                afterColumn = column;
            }
        } catch (Exception e) {
            // Deliberately best effort: a failed schema change must not stop the sink.
            e.printStackTrace();
        }
    }

    /**
     * Builds one "alter table ... add column ... String after ..." statement.
     * Identifiers cannot be bound as JDBC parameters, so they are concatenated; they come from
     * the sink configuration, not from the processed records.
     */
    private static String buildAddColumnSql(String table, String column, String afterColumn) {
        return "alter table " + table + " add column " + column + " String" + " after " + afterColumn;
    }

    /**
     * Opens one connection per configured host, creates the database and tables if necessary,
     * applies {@link #tableAddColumn(Statement)} and prepares the insert statement.
     *
     * @throws Exception if a connection cannot be opened or a DDL statement fails
     */
    public void createConnection() throws Exception {

        // Insert statement used for every batch.
        String insertStr = StrUtils.clickhouseInsertValue(this.tableColums, this.tablename);
        // Two CREATE TABLE statements; both are executed on every host below.
        // NOTE(review): the role of tableColums[3] is defined by StrUtils - TODO confirm.
        List<String> createtableStrList = StrUtils.clickhouseCreatTable(this.tableColums, this.tablename, Constant.CKCLUSTERNAME, this.tableColums[3], this.types);
        // The database name is the part of the table name before the first '.'.
        String createDatabaseSql = "create database if not exists " + this.tablename.split("\\.")[0];

        for (String ip : this.ips) {
            String url = "jdbc:clickhouse://" + ip + ":8123";
            Connection connection = DriverManager.getConnection(url, this.username, this.password);
            // Register resources immediately so close() releases them even when one of the
            // following statements throws.
            this.connectionList.add(connection);
            Statement statement = connection.createStatement();
            this.statementList.add(statement);

            statement.executeUpdate(createDatabaseSql);
            statement.executeUpdate(createtableStrList.get(0));
            statement.executeUpdate(createtableStrList.get(1));

            tableAddColumn(statement);

            PreparedStatement preparedStatement = connection.prepareStatement(insertStr);
            connection.setAutoCommit(false);
            this.preparedStatementList.add(preparedStatement);
        }
    }

    /**
     * Loads the JDBC driver, opens the connections and starts the flush timer.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if the driver class is missing or a connection cannot be set up
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(this.drivername);
        createConnection();
        // Start the interval now so the first record does not trigger an immediate flush.
        this.lastInsertTime = System.currentTimeMillis();
    }

    /**
     * Buffers the record and flushes the buffer when the size or time threshold is reached.
     *
     * @param row     record to write; null records are ignored
     * @param context sink context (unused)
     * @throws Exception if the flush fails
     */
    @Override
    public void invoke(Row row, Context context) throws Exception {
        if (null == row) {
            return;
        }
        // Always buffer first so the record that triggers the flush is part of the batch.
        this.list.add(row);
        if (this.list.size() >= this.insertCkBatchSize || isTimeToDoInsert()) {
            flush();
        }
    }

    /**
     * Flushes any buffered rows, then closes all statements and connections. Every resource is
     * closed even if an earlier step fails; the first failure is rethrown with later ones
     * attached as suppressed exceptions.
     *
     * @throws Exception the first failure raised while flushing or closing
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            flush();
        } catch (Exception e) {
            failure = e;
        }

        failure = closeAll(this.statementList, failure);
        failure = closeAll(this.preparedStatementList, failure);
        failure = closeAll(this.connectionList, failure);
        this.statementList.clear();
        this.preparedStatementList.clear();
        this.connectionList.clear();

        if (null != failure) {
            throw failure;
        }
    }

    /** Writes the buffered rows to one randomly chosen host and resets the flush timer. */
    private void flush() throws SQLException {
        if (this.list.isEmpty()) {
            return;
        }
        if (this.preparedStatementList.isEmpty()) {
            throw new IllegalStateException("No ClickHouse connection available for table " + this.tablename);
        }
        int index = this.random.nextInt(this.preparedStatementList.size());
        insertData(this.list, this.preparedStatementList.get(index), this.connectionList.get(index));
        this.list.clear();
        this.lastInsertTime = System.currentTimeMillis();
    }

    /** Closes every non-null resource and returns the accumulated failure, if any. */
    private static Exception closeAll(List<? extends AutoCloseable> resources, Exception failure) {
        for (AutoCloseable resource : resources) {
            if (null == resource) {
                continue;
            }
            try {
                resource.close();
            } catch (Exception e) {
                if (null == failure) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        return failure;
    }

    /**
     * Tells whether the flush interval has elapsed since the last flush.
     *
     * @return true when at least {@code insertCkTimenterval} milliseconds have passed
     */
    private boolean isTimeToDoInsert() {
        long currTime = System.currentTimeMillis();
        return currTime - this.lastInsertTime >= this.insertCkTimenterval;
    }
}

通過自定義Sink方式寫入Clickhouse,底層還是使用JDBC的方式。要注意插入不要過於頻繁,否則會報錯(數據插入的頻率大於數據合並的速度)。建議采用批次插入,批次最好設置大點;輪詢寫入每個節點的方式有待優化。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM