Storm常見模式——批處理


Storm對流數據進行實時處理時,一種常見場景是批量一起處理一定數量的tuple元組,而不是每接收一個tuple就立刻處理一個tuple,這樣可能是性能的考慮,或者是具體業務的需要。

例如,批量查詢或者更新數據庫,如果每一條tuple生成一條sql執行一次數據庫操作,數據量大的時候,效率會比批量處理的低很多,影響系統吞吐量。

當然,如果要使用Storm的可靠數據處理機制的話,應該使用容器將這些tuple的引用緩存到內存中,直到批量處理的時候,ack這些tuple。

下面給出一個簡單的代碼示例:

現在,假設我們已經有了一個DBManager數據庫操作接口類,它至少有兩個接口:

(1)getConnection(): 返回一個java.sql.Connection對象;

(2)getSQL(Tuple tuple): 根據tuple元組生成數據庫操作語句。

為了在Bolt中緩存一定數量的tuple,構造Bolt時傳遞int n參數賦給Bolt的成員變量int count,指定每個n條tuple批量處理一次。

同時,為了在內存中緩存緩存Tuple,使用java concurrent中的ConcurrentLinkedQueue來存儲tuple,每當攢夠count條tuple,就觸發批量處理。

另外,考慮到數據量小(如很長時間內都沒有攢夠count條tuple)或者count條數設置過大時,因此,Bolt中加入了一個定時器,保證最多每個1秒鍾進行一次批量處理tuple。

下面是Bolt的完整代碼(僅供參考):

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class BatchingBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    private Queue<Tuple> tupleQueue = new ConcurrentLinkedQueue<Tuple>();
    private int count;
    private long lastTime;
    private Connection conn;

    public BatchingBolt(int n) {
        count = n; //批量處理的Tuple記錄條數
        conn = DBManger.getConnection(); //通過DBManager獲取數據庫連接
        lastTime = System.currentTimeMillis(); //上次批量處理的時間戳
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        tupleQueue.add(tuple);
        long currentTime = System.currentTimeMillis();
        // 每count條tuple批量提交一次,或者每個1秒鍾提交一次
        if (tupleQueue.size() >= count || currentTime >= lastTime + 1000) {
            Statement stmt = conn.createStatement();
            conn.setAutoCommit(false);
            for (int i = 0; i < count; i++) {
                Tuple tup = (Tuple) tupleQueue.poll();
                String sql = DBManager.getSQL(tup); //生成sql語句
                stmt.addBatch(sql); //加入sql
                collector.ack(tup); //進行ack
            }
            stmt.executeBatch(); //批量提交sql
            conn.commit();
            conn.setAutoCommit(true);
            System.out.println("batch insert data into database, total records: " + count);
            lastTime = currentTime;
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM