負載均衡算法之輪詢


最近的工作事情比較少,於是就開是瞎折騰了

負載均衡

負載均衡大家一定不陌生了,一句話就是,人人有飯吃,還吃得飽,它的核心關鍵字就在於均衡,關於負載均衡大家基本可以脫口而出常見的幾種,輪詢,隨機,哈希,帶權值的輪詢,客戶端請求數等等

輪詢

作為最簡單的一種負載均衡策略,輪詢的優點顯而易見,簡單,並且在多數的情況是,基本適用(一般部署的線上集群機器,大部分的配置都比較相近,差距不會那么大,因此使用輪詢是一種可以接受的方案)

實現

輪詢的實現簡單來說就是從一個“循環列表”中不斷的獲取,這里的列表可以是數組,也可以是鏈表,也可以是map的key集合,簡而言之,就是一維數組類型。

這里我簡單的做了三種輪詢的實現,分別是基於 Atomic包的實現,synchronized同步,以及blockingQueue

Atomic

atomic包內的類是基於cas來實現值的同步,因此可以利用這一點來做輪詢,測試代碼如下

	private List<Integer> list = Lists.newArrayList(1, 2, 3, 4, 5, 6);
	
    @Test
    public void test() {
        AtomicInteger atomicInteger = new AtomicInteger(0);

		// 線程數,來模擬並發的激烈程度
        int threadNum = 4;
        int total = 10_0000;

        long now = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        CountDownLatch latch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            executorService.submit(new Task(latch, atomicInteger, total));
        }
        try {
            latch.await(60L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("costs = " + (System.currentTimeMillis() - now));
    }
	
	
    private class Task implements Runnable {

        private CountDownLatch latch;
        AtomicInteger atomicInteger;
        private int total;

        Task(CountDownLatch latch, AtomicInteger atomicInteger, int total) {
            this.latch = latch;
            this.atomicInteger = atomicInteger;
            this.total = total;
        }

        @Override
        public void run() {
            long tid = Thread.currentThread().getId();
            try {
                for (int i = 0; i < this.total; i++) {
                    int idx = atomicInteger.getAndIncrement() % 6;
                    idx = list.get(idx < 0 ? -idx : idx);
//                    System.out.printf("【Thread - %d】 get=%d\n", tid, idx);
                }
            } finally {
                this.latch.countDown();
            }
        }
    }

synchronized

同步是我們最容易想到的方式了


	private LinkedList<Integer> linkedList  = new LinkedList<>();
    private final Object MUTEX = new Object();
    
    @Test
    public void testSync(){
        linkedList.add(1);
        linkedList.add(2);
        linkedList.add(3);
        linkedList.add(4);
        linkedList.add(5);
        linkedList.add(6);

        long now = System.currentTimeMillis();
        int threadNum = 64;
        int total = 10_0000;

        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        CountDownLatch latch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            executorService.submit(new TaskSync(latch, total));
        }
        try {
            latch.await(120L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("costs = " + (System.currentTimeMillis() - now));

    }
    
    private class TaskSync implements Runnable{
        private CountDownLatch latch;

        private int total;

        TaskSync(CountDownLatch latch, int total) {
            this.latch = latch;
            this.total = total;
        }

        public void run() {
            long tid = Thread.currentThread().getId();
            try {
                for (int i = 0; i < this.total; i++) {
                    synchronized (MUTEX){
                        // 從頭取出,並放回到隊尾
                        Integer idx = linkedList.removeFirst();
//                        System.out.printf("【Thread - %d】 get=%d\n", tid, idx);
                        linkedList.add(idx);
                    }
                }
            } finally {
                this.latch.countDown();
            }
        }
    }

阻塞隊列

concurrent包中有很多為我們封裝了底層細節的包,可以直接進行使用,其中就包含了阻塞隊列,阻塞隊列許多的操作都是線程安全的。


private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16);


@Test
public void testBlocking() throws InterruptedException {
    queue.add(1);
    queue.add(2);
    queue.add(3);
    queue.add(4);
    queue.add(5);
    queue.add(6);


	long now = System.currentTimeMillis();

	int threadNum = 8;
	int total = 1000_0000;

	ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
	CountDownLatch latch = new CountDownLatch(threadNum);
	for (int i = 0; i < threadNum; i++) {
		executorService.submit(new TaskBlocking(latch, total));
	}
	try {
		latch.await(120L, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	System.out.println("costs = " + (System.currentTimeMillis() - now));
}


private class TaskBlocking implements Runnable {
	private CountDownLatch latch;

	private int total;

    TaskBlocking(CountDownLatch latch, int total) {
    	this.latch = latch;
    	this.total = total;
    }

    @Override
    public void run() {
        long tid = Thread.currentThread().getId();
        try {
            for (int i = 0; i < this.total; i++) {
                Integer idx = queue.poll();
                // poll可能會得到null,因此如果得到null,那么本次不算,重新獲取
                if (idx == null) {
                    i--;
                    continue;
                }
                //System.out.printf("【Thread - %d】 get=%d\n", tid, idx);
                queue.add(idx);
            }
        } finally {
            this.latch.countDown();
        }
    }
}

測試結果

本人機器,win10系統,CPU4核

atomic

並發數 循環次數(萬次) 耗時(s)
4 10 0.08
8 10 0.12
16 10 0.135
32 10 0.16
4 1000 1.1
8 1000 2.2
16 1000 4.4
32 1000 9.5

synchronized

並發數 循環次數(萬次) 耗時(s)
4 10 0.203
8 10 0.243
16 10 0.339
32 10 0.996
4 1000 3.7
8 1000 7.1
16 1000 14.4
32 1000 26.4

blocking

並發數 循環次數(萬次) 耗時(s)
4 10 0.138
8 10 0.2
16 10 0.381
32 10 0.769
4 1000 4
8 1000 7.9
16 1000 20.3
32 1000 74.8

結果比較

從耗時的結果上來看,atomic是最快的一種實現,blocking最慢(blocking的取和存,在源代碼中都有使用到ReentrantLock,因此一次Run()需要2次鎖的獲取),而synchronized會比阻塞隊列的方式稍微好點

好了,以上是我對輪詢的一點小探索,如果您覺得有哪里不正確或有其他建議的地方,歡迎拍磚


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM