【Zookeeper】源碼分析之請求處理鏈(三)之SyncRequestProcessor


一、前言

  在分析了PrepRequestProcessor處理器后,接着來分析SyncRequestProcessor,該處理器將請求存入磁盤,其將請求批量的存入磁盤以提高效率,請求在寫入磁盤之前是不會被轉發到下個處理器的。

二、SyncRequestProcessor源碼分析

  2.1 類的繼承關系   

public class SyncRequestProcessor extends Thread implements RequestProcessor {}

  說明:與PrepRequestProcessor一樣,SyncRequestProcessor也繼承了Thread類並實現了RequestProcessor接口,表示其可以作為線程使用。

  2.2 類的屬性  

public class SyncRequestProcessor extends Thread implements RequestProcessor {
    // 日志
    private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
    
    // Zookeeper服務器
    private final ZooKeeperServer zks;
    
    // 請求隊列
    private final LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();
        
    // 下個處理器
    private final RequestProcessor nextProcessor;
    
    // 快照處理線程
    private Thread snapInProcess = null;
    
    // 是否在運行中
    volatile private boolean running;

    /**
     * Transactions that have been written and are waiting to be flushed to
     * disk. Basically this is the list of SyncItems whose callbacks will be
     * invoked after flush returns successfully.
     */
    // 等待被刷新到磁盤的請求隊列
    private final LinkedList<Request> toFlush = new LinkedList<Request>();
    
    // 隨機數生成器
    private final Random r = new Random(System.nanoTime());
    /**
     * The number of log entries to log before starting a snapshot
     */
    // 快照個數
    private static int snapCount = ZooKeeperServer.getSnapCount();
    
    /**
     * The number of log entries before rolling the log, number
     * is chosen randomly
     */
    // 日志滾動之前記錄的日志號,編號是隨機選擇的
    private static int randRoll;

    // 結束請求標識
    private final Request requestOfDeath = Request.requestOfDeath;
}

  說明:其中,SyncRequestProcessor維護了ZooKeeperServer實例,其用於獲取ZooKeeper的數據庫和其他信息;維護了一個處理請求的隊列,其用於存放請求;維護了一個處理快照的線程,用於處理快照;維護了一個running標識,標識SyncRequestProcessor是否在運行;同時還維護了一個等待被刷新到磁盤的請求隊列。

  2.3 類的構造函數  

    public SyncRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor)
    {
        // 調用父類構造函數
        super("SyncThread:" + zks.getServerId());
        // 給字段賦值
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        running = true;
    }

  說明:構造函數首先會調用Thread類的構造函數,然后根據構造函數參數給類的屬性賦值,其中會確定下個處理器,並會設置該處理器正在運行的標識。

  2.4 類的核心函數分析

  1. run函數

    public void run() {
        try {
            // 寫日志數量初始化為0
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            // 確保所有的服務器在同一時間不是使用的同一個快照
            setRandRoll(r.nextInt(snapCount/2));
            while (true) { // 
                // 初始請求為null
                Request si = null;
                if (toFlush.isEmpty()) { // 沒有需要刷新到磁盤的請求
                    // 從請求隊列中取出一個請求,若隊列為空會阻塞
                    si = queuedRequests.take();
                } else { // 隊列不為空,即有需要刷新到磁盤的請求
                    // 從請求隊列中取出一個請求,若隊列為空,則返回空,不會阻塞
                    si = queuedRequests.poll();
                    if (si == null) { // 取出的請求為空
                        // 刷新到磁盤
                        flush(toFlush);
                        // 跳過后面的處理
                        continue;
                    }
                }
                if (si == requestOfDeath) { // 在關閉處理器之后,會添加requestOfDeath,表示關閉后不再處理請求
                    break;
                }
                if (si != null) { // 請求不為空
                    // track the number of records written to the log
                    if (zks.getZKDatabase().append(si)) { // 將請求添加至日志文件,只有事務性請求才會返回true
                        // 寫入一條日志,logCount加1
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) { // 滿足roll the log的條件
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) { // 正在進行快照
                                LOG.warn("Too busy to snap, skipping");
                            } else { // 未被處理
                                snapInProcess = new Thread("Snapshot Thread") { // 創建線程來處理快照
                                        public void run() {
                                            try {
                                                // 進行快照
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                // 開始處理
                                snapInProcess.start();
                            }
                            // 重置為0
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) { // 等待被刷新到磁盤的請求隊列為空
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        // 查看此時toFlush是否為空,如果為空,說明近段時間讀多寫少,直接響應
                        if (nextProcessor != null) { // 下個處理器不為空
                            // 下個處理器開始處理請求
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) { // 處理器是Flushable的
                                // 刷新到磁盤
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        // 跳過后續處理
                        continue;
                    }
                    // 將請求添加至被刷新至磁盤隊列
                    toFlush.add(si);
                    if (toFlush.size() > 1000) { // 隊列大小大於1000,直接刷新到磁盤
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) { // 出現異常
            LOG.error("Severe unrecoverable error, exiting", t);
            // 設置運行標識為false,表示該處理器不再運行
            running = false;
            // 退出程序
            System.exit(11);
        }
        LOG.info("SyncRequestProcessor exited!");
    }

  說明:該函數是整個處理器的核心,其邏輯大致如下

  (1) 設置randRoll大小,確保所有的服務器在同一時間不是使用的同一個快照。

  (2) 判斷toFlush隊列是否為空,若是,則表示沒有需要刷新到磁盤的請求,進入(3),若否,進入(4)。

  (3) 從queuedRequests中取出一個請求,進入(6)。

  (4) 從queuedRequests中取出一個請求,判斷該請求是否為null,若是,則進入(5),若否,則進入(6)。

  (5) 調用flush函數,將toFlush中的請求刷新到磁盤,跳過之后的處理部分。

  (6) 判斷請求是否是結束請求(在調用shutdown之后,會在隊列中添加一個requestOfDeath)。若是,則退出,否則,進入(7)。

  (7) 判斷請求是否為null,若否,則進入(8),否則進入(2)。

  (8) 若寫入日志成功,返回true(表示為事務性請求),進入(9),否則進入(18)。

  (9) logCount加1,並判斷是否大於了閾值,若是,則進入(10),否則進入(18)。

  (10) 調用rollLog函數翻轉日志文件。

  (11) 判斷snapInProcess是否為空並且是否存活,若是,則輸出日志,否則,進入(12)。

  (12) 創建snapInProcess線程並啟動。

  (13) 重置logCount為0。

  (14) 判斷toFlush隊列是否為空,若是,進入(15)。

  (15) 判斷nextProcessor是否為空,若否,則使用nextProcessor處理請求,否則進入(16)。

  (16) 判斷nextProcessor是否是Flushable的,若是,則調用flush函數刷新請求至磁盤,否則進入(17)

  (17) 跳過之后的處理步驟。

  (18) 將請求添加至toFlush隊列。

  (19) 若toFlush隊列大小大於1000,則刷新至磁盤,進入(2)。

  其中會調用flush函數,其源碼如下 

    // 刷新到磁盤
    private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException
    {
        if (toFlush.isEmpty()) // 隊列為空,返回
            return;

        // 提交至ZK數據庫
        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) { // 隊列不為空
            // 從隊列移除請求
            Request i = toFlush.remove();
            if (nextProcessor != null) { // 下個處理器不為空
                // 下個處理器開始處理請求
                nextProcessor.processRequest(i);
            }
        }
        if (nextProcessor != null && nextProcessor instanceof Flushable) { // 下個處理器不為空並且是Flushable的
            // 刷新到磁盤
            ((Flushable)nextProcessor).flush();
        }
    }

  說明:該函數主要用於將toFlush隊列中的請求刷新到磁盤中。

  2. shutdown函數 

    public void shutdown() {
        LOG.info("Shutting down");
        // 添加結束請求請求至隊列
        queuedRequests.add(requestOfDeath);
        try {
            if(running){ // 還在運行
                // 等待該線程終止
                this.join();
            }
            if (!toFlush.isEmpty()) { // 隊列不為空
                // 刷新到磁盤
                flush(toFlush);
            }
        } catch(InterruptedException e) {
            LOG.warn("Interrupted while wating for " + this + " to finish");
        } catch (IOException e) {
            LOG.warn("Got IO exception during shutdown");
        } catch (RequestProcessorException e) {
            LOG.warn("Got request processor exception during shutdown");
        }
        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }

  說明:該函數用於關閉SyncRequestProcessor處理器,其首先會在queuedRequests隊列中添加一個結束請求,然后再判斷SyncRequestProcessor是否還在運行,若是,則會等待其結束;之后判斷toFlush隊列是否為空,若不為空,則刷新到磁盤中。

三、總結

  本篇講解了SyncRequestProcessor的工作原理,其主要作用包含將事務性請求刷新到磁盤,並且對請求進行快照處理。也謝謝各位園友的觀看~

 

參考鏈接:http://blog.csdn.net/pwlazy/article/details/8137121  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM