最近的工作事情比較少,於是就開是瞎折騰了
負載均衡
負載均衡大家一定不陌生了,一句話就是,人人有飯吃,還吃得飽,它的核心關鍵字就在於均衡,關於負載均衡大家基本可以脫口而出常見的幾種,輪詢,隨機,哈希,帶權值的輪詢,客戶端請求數
等等
輪詢
作為最簡單的一種負載均衡策略,輪詢的優點顯而易見,簡單,並且在多數的情況是,基本適用(一般部署的線上集群機器,大部分的配置都比較相近,差距不會那么大,因此使用輪詢是一種可以接受的方案)
實現
輪詢的實現簡單來說就是從一個“循環列表”中不斷的獲取,這里的列表可以是數組,也可以是鏈表,也可以是map的key集合,簡而言之,就是一維數組類型。
這里我簡單的做了三種輪詢的實現,分別是基於 Atomic包的實現,synchronized同步,以及blockingQueue
Atomic
atomic包內的類是基於cas來實現值的同步,因此可以利用這一點來做輪詢,測試代碼如下
private List<Integer> list = Lists.newArrayList(1, 2, 3, 4, 5, 6);
@Test
public void test() {
AtomicInteger atomicInteger = new AtomicInteger(0);
// 線程數,來模擬並發的激烈程度
int threadNum = 4;
int total = 10_0000;
long now = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(new Task(latch, atomicInteger, total));
}
try {
latch.await(60L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("costs = " + (System.currentTimeMillis() - now));
}
private class Task implements Runnable {
private CountDownLatch latch;
AtomicInteger atomicInteger;
private int total;
Task(CountDownLatch latch, AtomicInteger atomicInteger, int total) {
this.latch = latch;
this.atomicInteger = atomicInteger;
this.total = total;
}
@Override
public void run() {
long tid = Thread.currentThread().getId();
try {
for (int i = 0; i < this.total; i++) {
int idx = atomicInteger.getAndIncrement() % 6;
idx = list.get(idx < 0 ? -idx : idx);
// System.out.printf("【Thread - %d】 get=%d\n", tid, idx);
}
} finally {
this.latch.countDown();
}
}
}
synchronized
同步是我們最容易想到的方式了
private LinkedList<Integer> linkedList = new LinkedList<>();
private final Object MUTEX = new Object();
@Test
public void testSync(){
linkedList.add(1);
linkedList.add(2);
linkedList.add(3);
linkedList.add(4);
linkedList.add(5);
linkedList.add(6);
long now = System.currentTimeMillis();
int threadNum = 64;
int total = 10_0000;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(new TaskSync(latch, total));
}
try {
latch.await(120L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("costs = " + (System.currentTimeMillis() - now));
}
private class TaskSync implements Runnable{
private CountDownLatch latch;
private int total;
TaskSync(CountDownLatch latch, int total) {
this.latch = latch;
this.total = total;
}
public void run() {
long tid = Thread.currentThread().getId();
try {
for (int i = 0; i < this.total; i++) {
synchronized (MUTEX){
// 從頭取出,並放回到隊尾
Integer idx = linkedList.removeFirst();
// System.out.printf("【Thread - %d】 get=%d\n", tid, idx);
linkedList.add(idx);
}
}
} finally {
this.latch.countDown();
}
}
}
阻塞隊列
concurrent包中有很多為我們封裝了底層細節的包,可以直接進行使用,其中就包含了阻塞隊列,阻塞隊列許多的操作都是線程安全的。
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16);
@Test
public void testBlocking() throws InterruptedException {
queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
queue.add(5);
queue.add(6);
long now = System.currentTimeMillis();
int threadNum = 8;
int total = 1000_0000;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(new TaskBlocking(latch, total));
}
try {
latch.await(120L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("costs = " + (System.currentTimeMillis() - now));
}
private class TaskBlocking implements Runnable {
private CountDownLatch latch;
private int total;
TaskBlocking(CountDownLatch latch, int total) {
this.latch = latch;
this.total = total;
}
@Override
public void run() {
long tid = Thread.currentThread().getId();
try {
for (int i = 0; i < this.total; i++) {
Integer idx = queue.poll();
// poll可能會得到null,因此如果得到null,那么本次不算,重新獲取
if (idx == null) {
i--;
continue;
}
//System.out.printf("【Thread - %d】 get=%d\n", tid, idx);
queue.add(idx);
}
} finally {
this.latch.countDown();
}
}
}
測試結果
本人機器,win10系統,CPU4核
atomic
並發數 | 循環次數(萬次) | 耗時(s) |
---|---|---|
4 | 10 | 0.08 |
8 | 10 | 0.12 |
16 | 10 | 0.135 |
32 | 10 | 0.16 |
4 | 1000 | 1.1 |
8 | 1000 | 2.2 |
16 | 1000 | 4.4 |
32 | 1000 | 9.5 |
synchronized
並發數 | 循環次數(萬次) | 耗時(s) |
---|---|---|
4 | 10 | 0.203 |
8 | 10 | 0.243 |
16 | 10 | 0.339 |
32 | 10 | 0.996 |
4 | 1000 | 3.7 |
8 | 1000 | 7.1 |
16 | 1000 | 14.4 |
32 | 1000 | 26.4 |
blocking
並發數 | 循環次數(萬次) | 耗時(s) |
---|---|---|
4 | 10 | 0.138 |
8 | 10 | 0.2 |
16 | 10 | 0.381 |
32 | 10 | 0.769 |
4 | 1000 | 4 |
8 | 1000 | 7.9 |
16 | 1000 | 20.3 |
32 | 1000 | 74.8 |
結果比較
從耗時的結果上來看,atomic是最快的一種實現,blocking最慢(blocking的取和存,在源代碼中都有使用到ReentrantLock,因此一次Run()需要2次鎖的獲取),而synchronized會比阻塞隊列的方式稍微好點
好了,以上是我對輪詢的一點小探索,如果您覺得有哪里不正確或有其他建議的地方,歡迎拍磚