CountDownLatch和CyclicBarrier模擬同時並發請求


  有時候要測試一下某個功能的並發能力,又不要想借助於其他測試工具,索性就自己寫簡單的demo模擬一個並發請求就最方便了。如果熟悉jemter的測試某接口的並發能力其實更專業,此處只是自己折騰着玩。

CountDownLatch和CyclicBarrier是jdk concurrent包下非常有用的兩個並發工具類,它們提供了一種控制並發流程的手段。其實查看源碼它們都是在內部維護了一個計數器控制流程的

  • CountDownLatch:一個或者多個線程,等待其他多個線程完成某件事情之后才能執行;
  • CyclicBarrier:多個線程互相等待,直到到達同一個同步點,再繼續一起執行。   

CountDownLatch和CyclicBarrier的區別

  • CountDownLatch的計數器,線程完成一個記錄一個,計數器是遞減  計數器,只能使用一次
  • CyclicBarrier的計數器 更像是一個閥門,需要所有線程都到達,閥門才能打開,然后繼續執行,計數器是遞增  計數器提供reset功能,可以多次使用

   另外Semaphore可以控同時訪問的線程個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。

 

  通常我們模擬並發請求,一般都是多開幾個線程,發起請求就好了。但是方式,一般會存在啟動的先后順序了,算不得真正的同時並發!怎么樣才能做到真正的同時並發呢?是本文想說的點,java中提供了閉鎖 CountDownLatch, CyclicBarrier 剛好就用來做這種事就最合適了。

  下面分別使用CountDownLatch和CyclicBarrier來模擬並發的請求

CountDownLatch模擬

package com.test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.CountDownLatch;

public class LatchTest {

    public static void main(String[] args) throws InterruptedException {
        Runnable taskTemp = new Runnable() {

       // 注意,此處是非線程安全的,留坑
            private int iCounter;

            @Override
            public void run() {
                for(int i = 0; i < 10; i++) {
                    // 發起請求
//                    HttpClientOp.doGet("https://www.baidu.com/");
                    iCounter++;
                    System.out.println(System.nanoTime() + " [" + Thread.currentThread().getName() + "] iCounter = " + iCounter);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        LatchTest latchTest = new LatchTest();
        latchTest.startTaskAllInOnce(5, taskTemp);
    }

    public long startTaskAllInOnce(int threadNums, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(threadNums);
        for(int i = 0; i < threadNums; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        // 使線程在此等待,當開始門打開時,一起涌入門中
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            // 將結束門減1,減到0時,就可以開啟結束門了
                            endGate.countDown();
                        }
                    } catch (InterruptedException ie) {
                        ie.printStackTrace();
                    }
                }
            };
            t.start();
        }
        long startTime = System.nanoTime();
        System.out.println(startTime + " [" + Thread.currentThread() + "] All thread is ready, concurrent going...");
        // 因開啟門只需一個開關,所以立馬就開啟開始門
        startGate.countDown();
        // 等等結束門開啟
        endGate.await();
        long endTime = System.nanoTime();
        System.out.println(endTime + " [" + Thread.currentThread() + "] All thread is completed.");
        return endTime - startTime;
    }
}

執行結果

 

 CyclicBarrier模擬

// 與 閉鎖 結構一致
public class LatchTest {

    public static void main(String[] args) throws InterruptedException {
    
        Runnable taskTemp = new Runnable() {

            private int iCounter;

            @Override
            public void run() {
                // 發起請求
//              HttpClientOp.doGet("https://www.baidu.com/");
                iCounter++;
                System.out.println(System.nanoTime() + " [" + Thread.currentThread().getName() + "] iCounter = " + iCounter);
            }
        };

        LatchTest latchTest = new LatchTest();
//        latchTest.startTaskAllInOnce(5, taskTemp);
        latchTest.startNThreadsByBarrier(5, taskTemp);
    }

    public void startNThreadsByBarrier(int threadNums, Runnable finishTask) throws InterruptedException {
        // 設置柵欄解除時的動作,比如初始化某些值
        CyclicBarrier barrier = new CyclicBarrier(threadNums, finishTask);
        // 啟動 n 個線程,與柵欄閥值一致,即當線程准備數達到要求時,柵欄剛好開啟,從而達到統一控制效果
        for (int i = 0; i < threadNums; i++) {
            Thread.sleep(100);
            new Thread(new CounterTask(barrier)).start();
        }
        System.out.println(Thread.currentThread().getName() + " out over...");
    }
}


class CounterTask implements Runnable {

    // 傳入柵欄,一般考慮更優雅方式
    private CyclicBarrier barrier;

    public CounterTask(final CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    public void run() {
        System.out.println(Thread.currentThread().getName() + " - " + System.currentTimeMillis() + " is ready...");
        try {
            // 設置柵欄,使在此等待,到達位置的線程達到要求即可開啟大門
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " - " + System.currentTimeMillis() + " started...");
    }
}

執行結果

 

並發請求操作流程示意圖如下:

 

  此處設置了一道門,以保證所有線程可以同時生效。但是,此處的同時啟動,也只是語言層面的東西,也並非絕對的同時並發。具體的調用還要依賴於CPU個數,線程數及操作系統的線程調度功能等,不過咱們也無需糾結於這些了,重點在於理解原理!

  畢竟測試並發 還得用專業的工具 jmeter 還是很方便的.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM