java輕松實現無鎖隊列


1、什么是無鎖(Lock-Free)編程

       當談及 Lock-Free 編程時,我們常將其概念與 Mutex(互斥) 或 Lock(鎖) 聯系在一起,描述要在編程中盡量少使用這些鎖結構,降低線程間互相阻塞的機會,以提高應用程序的性能。類同的概念還有 "Lockless" 和 "Non-Blocking" 等。實際上,這樣的描述只涵蓋了 Lock-Free編程的一部分內容。本質上說,Lock-Free 編程僅描述了代碼所表述的性質,而沒有限定或要求代碼該如何編寫。

基本上,如果程序中的某一部分符合下面的條件判定描述,則我們稱這部分程序是符合 Lock-Free的。反過來說,如果某一部分程序不符合下面的條件描述,則稱這部分程序是不符合 Lock-Free 的。

       上面的英文翻譯成中文就是很簡單的:如果你的應用程序是多線程並且它們之間都有訪問共享內存但是訪問時並沒有相互阻塞,那它就是lock-free編程。注意lock-free只是強調了編程概念並沒指定其具體的實現形式,其強調的概念是「線程間訪問共享內存時不會相互阻塞」。那如果沒有lock或者Mutex就一定是lock-free編程了嗎,看下面的代碼片段:

     

        x = 0;

        while(x == 0){

             x = 1 - x;

        }

       假設有線程T1,T2同時調用這段代碼,T1,T2都判斷x == 0,進行到循環。T1先執行 x = 1 - 0,此時 x = 1后 T2 執行 x = 1 - 1。x = 0。T1,T2此時判斷x == 0,結果兩者又進入了循環。。。線程T1,T2相互影響,兩者都陷入了死循環,這種某種意義也算得上是相互阻塞使線程,所以這不算是lock-free編程。

 ok,了解了lock-free編程的相關概念那要怎么實現呢。在開始說無鎖隊列之前,我們需要知道一個很重要的技術就是CAS操作——Compare & Set,或是 Compare & Swap,現在幾乎所有的CPU指令都支持CAS的原子操作,X86下對應的是 CMPXCHG 匯編指令。有了這個原子操作,我們就可以用其來實現各種無鎖(lock free)的數據結構。

這個操作用C語言來描述就是下面這個樣子:意思就是說,看一看內存*reg里的值是不是oldval,如果是的話,則對其賦值newval。

 
1
2
3
4
5
6
7
int compare_and_swap ( int * reg, int oldval, int newval)
{
   int old_reg_val = *reg;
   if (old_reg_val == oldval)
      *reg = newval;
   return old_reg_val;
}

用JAVA語言則是:

 public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

  

了解了CAS操作之后實現lock-free數據結構思路是怎樣呢?這里就有篇論文講述了思路:http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.53.8674&rep=rep1&type=pdf其中里面就提到了如何用數組實現一個lock-free隊列。有興趣的朋友可以參考上面鏈接閱讀里面的第5章節。現在說一下我自己具體的實現思路:

  • 數組隊列是一個循環數組,隊列少用一個元素,當頭等於尾標示隊空,尾加1等於頭標示隊滿。
  • 數組的元素用EMPTY(無數據,標示可以入隊)和FULL(有數據,標示可以出隊)標記指示,數組一開始全部初始化成 EMPTY標示空隊列。
  • EnQue 操作:如果當前隊尾位置為EMPTY,標示線程可以在當前位置入隊,通過CAS原子操作把該位置設置為FULL,避免其它線程操作這個位置,操作完后修改隊尾位置。各個線程競爭新的隊尾位置。如下圖所示:

 


 

下面是貼上具體的代碼:

      

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;


/**
 * 用數組實現無鎖有界隊列
 */

public class LockFreeQueue {

    private AtomicReferenceArray atomicReferenceArray;
    //代表為空,沒有元素
    private static final  Integer EMPTY = null;
    //頭指針,尾指針
    AtomicInteger head,tail;


    public LockFreeQueue(int size){
        atomicReferenceArray = new AtomicReferenceArray(new Integer[size + 1]);
        head = new AtomicInteger(0);
        tail = new AtomicInteger(0);
    }

    /**
     * 入隊
     * @param element
     * @return
     */
    public boolean add(Integer element){
        int index = (tail.get() + 1) % atomicReferenceArray.length();
        if( index == head.get() % atomicReferenceArray.length()){
            System.out.println("當前隊列已滿,"+ element+"無法入隊!");
            return false;
        }
        while(!atomicReferenceArray.compareAndSet(index,EMPTY,element)){
            return add(element);
        }
        tail.incrementAndGet(); //移動尾指針
        System.out.println("入隊成功!" + element);
        return true;
    }

    /**
     * 出隊
     * @return
     */
    public Integer poll(){
        if(head.get() == tail.get()){
            System.out.println("當前隊列為空");
            return null;
        }
        int index = (head.get() + 1) % atomicReferenceArray.length();
        Integer ele = (Integer) atomicReferenceArray.get(index);
        if(ele == null){ //有可能其它線程也在出隊
            return poll();
        }
        while(!atomicReferenceArray.compareAndSet(index,ele,EMPTY)){
            return poll();
        }
        head.incrementAndGet();
        System.out.println("出隊成功!" + ele);
        return ele;
    }

    public void print(){
       StringBuffer buffer = new StringBuffer("[");
       for(int i = 0; i < atomicReferenceArray.length() ; i++){
           if(i == head.get() || atomicReferenceArray.get(i) == null){
               continue;
           }
           buffer.append(atomicReferenceArray.get(i) + ",");
       }
       buffer.deleteCharAt(buffer.length() - 1);
       buffer.append("]");
       System.out.println("隊列內容:"    +buffer.toString());

    }

}

  代碼很簡單,相應的注釋也寫上了,相信大家都應該看得懂~。

       這里說明一下JDK提供的CAS原子操作類都位於 java.util.concurrent.atomic下面。這里用到的是數組我用的是AtomicReferenceArray類,當然你也可以用AtomicIntegerArray。這里用到了兩個原子類的作為指針head,tail,利用mod隊列的長度來實現一個循環數組。

       下面測試我們的代碼: 

import java.util.stream.IntStream;

public class LockFreeDemo {
        public static void main(String[] args) {
            LockFreeQueue queue = new LockFreeQueue(5);
            IntStream.rangeClosed(1, 10).parallel().forEach(
                    i -> {
                        if (i % 2 == 0) {
                            queue.add(i);
                        } else {
                            queue.poll();
                        }
                    }
            );
            queue.print();
        }
}

       這里面用了JDK8的lambda並行流的特性,起了Ncpu線程去並發得入隊和出隊。運行結果如下:

入隊成功!2
當前隊列為空
當前隊列為空
入隊成功!6
當前隊列為空
入隊成功!8
出隊成功!2
入隊成功!10
出隊成功!6
入隊成功!4
隊列內容:[4,8,10]

       因為是並發打印,所以打出來的信息整體是無序的,但是對於同一個元素的操作,我們看到是相對有序的~  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM