Clojure STM 筆記-下篇


   繼續完成"Software Transactional Memory"筆記的下篇,這部分內容基本上就是Clojure STM源碼閱讀指南,從事務實現的各種概念作為切入點,逐步跟進抽絲剝繭.在本文梳理即將結束的時候我找到"Software Transactional Memory"一文對應的Slide,看這個脈絡更清晰一些,早點找到這個Slide會省一些力氣.

 

源碼閱讀指南


Clojure STM 實現基本上都是Java代碼實現的,依賴Java處理並發的基礎設施:


Clojure STM的主要實現是在clojure.lang包的LockingTransaction類和 Ref類,路徑:


  下面我們就從概念切入代碼,把一個個概念落到實處,一圖勝千言,閱讀過程中可以不斷回顧下面這張UML圖.

 

創建事務

    創建一個Clojure的事務很簡單就把一系列表達式放在dosync宏的內部執行即可.這比顯示用鎖可簡單太多了.雖然這種方式不用顯示指明哪些Ref可能在事務中改變,但開發者還是要好好考慮哪些邏輯要放在dosync里面執行,哪些Ref要保持一致的快照.

(def account1 (ref 100))
(def account2 (ref 0))

(defn transfer [amount from to]
(dosync
(alter from - amount) ; alter from => (- @from amount)
(alter to + amount))) ; alter to => (+ @to amount)


    dosync 將多個表達式構成的事務體(transaction body)傳遞給sync宏,在sync調用LockingTransaction的靜態方法runInTransaction. sync將事務體封裝在一個匿名方法傳遞給runInTransaction.每個線程都有一個LockingTransaction類型的ThreadLocal變量,如果LockingTransaction對象尚未創建就先創建.如果當前線程已經在事務中,匿名方法直接會在事務中執行,否則就會啟動一個新的事務再執行事務體.

  看代碼:

; dosync & sync 

(defmacro dosync
"Runs the exprs (in an implicit do) in a transaction that encompasses
exprs and any nested calls. Starts a transaction if none is already
running on this thread. Any uncaught exception will abort the
transaction and flow out of dosync. The exprs may be run more than
once, but any effects on Refs will be atomic."
{:added "1.0"}
[& exprs]
`(sync nil ~@exprs))


(defmacro sync
"transaction-flags => TBD, pass nil for now
Runs the exprs (in an implicit do) in a transaction that encompasses
exprs and any nested calls. Starts a transaction if none is already
running on this thread. Any uncaught exception will abort the
transaction and flow out of sync. The exprs may be run more than
once, but any effects on Refs will be atomic."
{:added "1.0"}
[flags-ignored-for-now & body]
`(. clojure.lang.LockingTransaction
(runInTransaction (fn [] ~@body))))

 //https://github.com/clojure/clojure/blob/master/src/jvm/clojure/lang/LockingTransaction.java

final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>();

static public Object runInTransaction(Callable fn) throws Exception{
LockingTransaction t = transaction.get();
if(t == null)
transaction.set(t = new LockingTransaction());

if(t.info != null)
return fn.call();

return t.run(fn);  
}

 

 

事務狀態



事務狀態有下面五種:

* RUNNING
* COMMITTING
* RETRY
* KILLED
* COMMITTED

  RETRY和KILLED要特別說明一下:

RETRY狀態是指:事務開始嘗試一次RETRY但是還沒有進入Try的一個中間狀態.Try開始之后狀態是RUNNING.

private Object blockAndBail(Info refinfo){
//stop prior to blocking
stop(RETRY);
try
{
refinfo.latch.await(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS);
}
catch(InterruptedException e)
{
//ignore
}
throw retryex;
}


KILLED 有兩種情況會導致狀態被設置成KILLED,

1. 事務調用abort方法(目前的Clojure版本還沒有代碼調用abort).
2. 第二種情況就是搶先(barge)提交的時候


導致狀態變為KILLED的兩種情況,代碼片段:

void abort() throws AbortException{
 stop(KILLED);
 throw new AbortException();
}


private boolean barge(Info refinfo){
boolean barged = false;
//if this transaction is older
// try to abort the other
if(bargeTimeElapsed() &&∓ startPoint < refinfo.startPoint)
{
 barged = refinfo.status.compareAndSet(RUNNING, KILLED);
if(barged)
 refinfo.latch.countDown();
}
return barged;
}

 

 

Committed Values



每個Ref object 都在其Tval字段維護了一串已經提交的值(變更歷史,committed value chain). committed value chain的長度由字段minHistory(默認值0)和maxHistory(默認值10)決定,這個可以在創建Ref的時候自定義新值,或者后續使用ref-min-history和ref-max-history修改.看后面Faults的部分你就能了解這個長度控制的重要性了.

; pre-allocate history and limit max-history
;設置最大歷史長度,如果長度已經大於ref-max-history值 並不執行收縮.
(def myref (ref 1 :min-history 5 :max-history: 10))

; supply a function to validate the ref before commit.
(def myvalidref (ref 1 :validator pos?))

 

 

Locks



     每一個Ref對象都有一個ReentrantReadWriteLock,多個並發事務可以獲取其讀鎖,只有一個事務可以獲取其寫鎖.有ensure的時候,會在事務執行全過程持有鎖,ensure會一直持有讀鎖直到事務提交或者Ref在事務進行了修改(Ref上調用了ref-set或者alter,由於讀鎖無法直接升級為寫鎖,所以會先釋放然后再獲取寫鎖).沒有什么情況是需要在整個事務過程中都要持有寫鎖的,寫鎖只是在需要的時候獲取用完快速釋放掉;在事務提交的時候會再次獲取寫鎖提交完成后釋放.ReentrantReadWriteLock的細節可以在Ref類的lock字段使用看到.

final HashSet<Ref> ensures = new HashSet<Ref>();   //all hold readLock

void doEnsure(Ref ref){
     if(!info.running())
          throw retryex;
     if(ensures.contains(ref))
          return;
     ref.lock.readLock().lock();

     //someone completed a write after our snapshot
     if(ref.tvals != null && ref.tvals.point > readPoint) {
        ref.lock.readLock().unlock();
        throw retryex;
    }

     Info refinfo = ref.tinfo;

     //writer exists
     if(refinfo != null && refinfo.running())
          {
          ref.lock.readLock().unlock();

          if(refinfo != info) //not us, ensure is doomed
               {
               blockAndBail(refinfo);
               }
          }
     else
          ensures.add(ref);
}


     在Ref上調用ref-set或者alter會產生in-transaction值,這個值在提交前不會被其它事務看到.這些調用還會觸發tinfo字段被賦值; Ref中的LockingTransaction.Info tinfo;這個字段用來描述修改了Ref值的事務的相關信息,包括事務啟動的相對順序以及當前的狀態.借助這個字段當前事務知道還有哪些事務修改了Ref還沒有提交.這個字段的使用可以參看LockingTransaction類的lock方法.

 

//下面是tinfo的結構

https://github.com/clojure/clojure/blob/master/src/jvm/clojure/lang/LockingTransaction.java

public static class Info{
     final AtomicInteger status;
     final long startPoint;
     final CountDownLatch latch;


     public Info(int status, long startPoint){
          this.status = new AtomicInteger(status);
          this.startPoint = startPoint;
          this.latch = new CountDownLatch(1);
     }

     public boolean running(){
          int s = status.get();
          return s == RUNNING || s == COMMITTING;
     }
}


//returns the most recent val
Object lock(Ref ref){
     //can't upgrade readLock, so release it
     releaseIfEnsured(ref);

     boolean unlocked = true;
     try
          {
          tryWriteLock(ref);
          unlocked = false;

          if(ref.tvals != null && ref.tvals.point > readPoint)
               throw retryex;
          Info refinfo = ref.tinfo;

          //write lock conflict
          if(refinfo != null && refinfo != info && refinfo.running())
               {
               if(!barge(refinfo))
                    {
                    ref.lock.writeLock().unlock();
                    unlocked = true;
                    return blockAndBail(refinfo);
                    }
               }
          ref.tinfo = info;
          return ref.tvals == null ? null : ref.tvals.val;
          }
     finally
          {
          if(!unlocked)
               ref.lock.writeLock().unlock();
          }
}

 

 

In-transaction Values



    每個LockingTransaction對象都在vals:HashMap<Ref, Object>字段維護了事務中修改過的Ref的in-transaction值.如果Ref是在事務中只是讀操作,那么它的值總是從tvals的committed value chain中獲取,沿着committed value chain找到當前事務啟動前最新的值.如果事務進行多次Ref的deref操作那么,這個尋找合適值的過程就要執行多次,但這種開銷除非是Ref值真的改動了才是必要的.Ref值第一次在事務中修改其新值就會放在vals Map中,在后續的事務過程中,值就只需要在vals map中獲取即可.

 

 

Faults



    下面的情況會導致"Fault"出現:在事務內部讀取Ref的值,但是Ref並沒有in-transaction值而且當前事務開始之后Comitted value chain才有值.當前事務Try開始之后別的事務才提交行為就可能導致這種情況.這段邏輯看下面的代碼:

Object doGet(Ref ref){
     if(!info.running())
          throw retryex;
     if(vals.containsKey(ref))
          return vals.get(ref);
     try
          {
          ref.lock.readLock().lock();
          if(ref.tvals == null)
               throw new IllegalStateException(ref.toString() + " is unbound.");
          Ref.TVal ver = ref.tvals;
          do
               {
               if(ver.point <= readPoint)
                    return ver.val;
               } while((ver = ver.prior) != ref.tvals);
          }
     finally
          {
          ref.lock.readLock().unlock();
          }
     //no version of val precedes the read point
     ref.faults.incrementAndGet();
     throw retryex;

}

 

   Fault出現,事務就會進行retry.如果事務提交變動的Ref在別的事務中已經經歷了一次Fault而且Committed Value Chain的長度小於maxHsitory,就會在Ref的Committed添加一個值節點.chain長度小於minhistory的時候也會觸發值節點添加. 度量chain的長度減小了在這個Ref上再出現fault的可能性.如果Ref上沒有發生fault,就不是添加一個新的node,chain的最后一個node變成第一個"最新提交的值".

每一個Ref的history chain長度可以不同,Chain長度隨着fault的發生自動適應.如果一個Ref上從不出現fault那么它的minhistory值是0(默認值) chain的長度不會超過1. Chain從來不會收縮,falut會導致chain變長,它要么保持長度要么隨着fault出現增長.

   //at this point, all values calced, all refs to be written locked
                    //no more client code to be called
                    long msecs = System.currentTimeMillis();
                    long commitPoint = getCommitPoint();
                    for(Map.Entry<Ref, Object> e : vals.entrySet())
                         {
                         Ref ref = e.getKey();
                         Object oldval = ref.tvals == null ? null : ref.tvals.val;
                         Object newval = e.getValue();
                         int hcount = ref.histCount();

                         if(ref.tvals == null)
                              {
                              ref.tvals = new Ref.TVal(newval, commitPoint, msecs);
                              }
                         else if((ref.faults.get() > 0 && hcount < ref.maxHistory)
                                   || hcount < ref.minHistory)
                              {
                              ref.tvals = new Ref.TVal(newval, commitPoint, msecs, ref.tvals);
                              ref.faults.set(0);
                              }
                         else
                              {
                              ref.tvals = ref.tvals.next;
                              ref.tvals.val = newval;
                              ref.tvals.point = commitPoint;
                              ref.tvals.msecs = msecs;
                              }
                         if(ref.getWatches().count() > 0)
                              notify.add(new Notify(ref, oldval, newval));
                         }



Barging



事務B正在做重試,判斷事務A是否可以繼續繼續執行的決策過程稱為"搶先"(barge).一個事務想搶先於另一個事務需要滿足三個條件:

* 事務A 必須運行了至少1/100秒 (BARGE_WAIT_NANOS)
* 事務A 比事務B啟動要早 (偏愛older事務)
* 事務B 狀態是Running可以轉換到KILLED的狀態,不會打斷正在提交的事務

如果不滿足上述條件,事務A重試,事務B繼續執行.

public static final long BARGE_WAIT_NANOS = 10 * 1000000;

private boolean bargeTimeElapsed(){
     return System.nanoTime() - startTime > BARGE_WAIT_NANOS;
}

private boolean barge(Info refinfo){
     boolean barged = false;
     //if this transaction is older
     //  try to abort the other
     if(bargeTimeElapsed() && startPoint < refinfo.startPoint)
          {
        barged = refinfo.status.compareAndSet(RUNNING, KILLED);
        if(barged)
            refinfo.latch.countDown();
          }
     return barged;
}

 

Retries

 事務進行重試時,之前Ref的所有in-transaction變動都會被舍棄,並跳轉回事務體的起始位置.有很多原因導致重試:

1. 在Ref上調用ref-set或alter會"鎖住"Ref,但出現下面的情況:
(a) 其它線程獲取了Ref的讀鎖或寫鎖,導致無法獲取Ref的寫鎖.
(b) 在當前事務開始之后,已經有Ref的變動完成了提交.
(c) 另外一個事務已經做了in-transaction的變動還沒有提交,嘗試搶先(barge)於其它事務提交失敗


2. 嘗試在事務中讀取Ref的值,但出現了下面的情況:
(a) 別的事務搶先(barge)當前事務提交(當前事務的狀態不是RUNNING)
(b) 出現Fault.


3. 在Ref上調用 ref-set , alter , commute , ensure,但當前事務(當前事務狀態不是Running)被其它事務搶先(barge)提交.
4. 當前事務正在提交變動(committing),但另一個正在運行(running)的事務有事務中修改,嘗試搶先於該進程提交失敗.


事務不會無休止的進行重試,LockingTransaction定義了重試的上限:public static final int RETRY_LIMIT = 10000;超過限制會拋出異常.
重試的觸發是通過拋出RetryEx實現的.注意它是java.lang.Error的子類而不是Exception,這樣就不會被開發者吞掉異常.retry循環中有RetryEX異常捕獲處理邏輯.catch之后邏輯回到loop的起始位置,由於並沒有捕獲其它異常,所以一旦有其它異常產生就會導致退出重試循環.

static class RetryEx extends Error{
}

  // 看一下簡單的代碼結構:
Object run(Callable fn) throws Exception{
     boolean done = false;
     Object ret = null;
     ArrayList<Ref> locked = new ArrayList<Ref>();
     ArrayList<Notify> notify = new ArrayList<Notify>();

     for(int i = 0; !done && i < RETRY_LIMIT; i++)
          {
          try
               {
                 //. . . .
               }
          catch(RetryEx retry)
               {
               //eat this so we retry rather than fall out
               }
          finally
               {
               //. . .∓ .
               }
          }
     if(!done)
          throw Util.runtimeException("Transaction failed after reaching retry limit");
     return ret;
}

 

IllegalStateException



Clojure的很多方法都會拋出IllegalStateException.如果在事務內部拋出這個異常,事務不會進行Retry.STM中會拋出這個異常的情況有:


* 嘗試在當前線程獲取LockingTransaction對象,但是該對象還沒有創建或事務並沒有運行. 在事務外調用下面的方法就會看到這個異常:ref-set ,alter , commute or ensure .(后面有測試代碼)
* 嘗試獲取Ref的值 但是還沒有綁定值
* 嘗試使用ref-set或alter修改Ref的in-transaction值 但commute函數已經在當前Ref上調用.
* Ref的validation方法返回false或者拋出異常

 
Clojure 1.4.0
user=> (def a (ref 101))
#'user/a
user=> (ref-set a 1024)
IllegalStateException No transaction running  clojure.lang.LockingTransaction.getEx (LockingTransaction.java:208)
user=>
user=>  (def b (ref 23))
#'user/b
user=> @b
23
user=>  (dotimes [i 5] (dosync  (alter b + (commute b + 1000) )))
IllegalStateException Can't set after commute  clojure.lang.LockingTransaction.d
oSet (LockingTransaction.java:420)
user=>

 

Deadlocks, Livelocks and Race Conditions



   Clojure STM的實現保證了deadlocks, livelocks 和 race conditions不會出現.

   在多線程應用的語境中,上面這些術語可以這樣描述:deadlock 發生在並發運行的線程互相等待其它進程獨占的資源. livelock 發生在總是無法獲得完成任務需要的資源永遠等待的情況.race condition 多線程環修改數據,最后結果取決於運行時的精確時序.

   Clojure STM中Ref的修改是在事務中執行的.假設事務A B依次啟動並發執行.如果二者都想使用ref-set alter修改同一個Ref(寫沖突),這種情況會優先讓事務A執行,理由是它已經運行了較長的時間(假設它的運行時間已經超過了BARGE_WAIT_NANOS).如果是事務修改Ref的順序決定了重試的執行順序就可能帶來活鎖(livelock).事務 B會進行重試,可能重試多次,可能在事務A提交之后就看完成對Ref的修改了.事務B最終運行完成都是使用事務A提交的一致的狀態視圖.

如果事務使用commute修改Ref,不會有寫沖突. 獲取Refs的寫鎖是按照Ref Objects的創建順序依次獲取的.

STM Overhead


    使用STM比顯示用鎖還是增加一些開銷的,但這並不表示STM更慢.STM是樂觀鎖,顯示鎖是悲觀鎖,整體上STM能夠提供更好的性能和更大程度上的並發.當一個Ref在事務內還沒有被修改的時候會從其committed value chain中選擇合適的值.只要從當前事務開始起沒有事務提交新的值,這個選擇過程還是非常快的,因為值就是committed value chain的第一個節點.否則就要進行遍歷.

Ref的值只能在事務內部修改,這就有一個檢驗事務是否正在運行的額外成本.還要檢查Ref上沒有調用commute方法. 這玩意沒有意思 因為commute函數會在提交階段再次調用 在事務完成后 其它對Ref的賦值不會有持久的影響.如果這是Ref在事務中首次被修改,還要添加Ref到當前事務修改過的Ref集合中,標記Ref被當前事務修改(后面章節提到的lock會有詳細描述). 最后,新值被添加到當前事務的in-transaction values的map中.后續的讀操作就會從map中讀而非遍歷committed values chain.看下面的代碼:

 

Object doSet(Ref ref, Object val){
     if(!info.running())
          throw retryex;
     if(commutes.containsKey(ref))
          throw new IllegalStateException("Can't set after commute");
     if(!sets.contains(ref))
          {
          sets.add(ref);
          lock(ref);
          }
     vals.put(ref, val);
     return val;
}

 

LockingTransaction類的lock方法會引入一些開銷,總結其操作步驟:

1. 如果在當前transaction try較早時機在Ref上調用了ensure那么就要釋放Ref的讀鎖
2. 嘗試獲取Ref寫鎖 如果獲取失敗就觸發當前事務的retry
3. 如果當前transaction try開始之后另外的事務已經提交了對Ref的修改觸發retry
4. 如果另外的事務對Ref有一個尚未提交的更改,企圖搶先barge.如果失敗就觸發當前事務的retry
5. ref.tinfo = info; 把當前的Ref標記為被當前事務鎖定
6. 無論如何都釋放Ref的寫鎖

貼出來代碼對照着看:

//returns the most recent val
Object lock(Ref ref){
     //can't upgrade readLock, so release it
     releaseIfEnsured(ref);

     boolean unlocked = true;
     try
          {
          tryWriteLock(ref);
          unlocked = false;
          //第三條說的開銷
          if(ref.tvals != null && ref.tvals.point > readPoint)
               throw retryex;
          Info refinfo = ref.tinfo;

          //write lock conflict
          if(refinfo != null && refinfo != info && refinfo.running())
               {
               if(!barge(refinfo))
                    {
                    ref.lock.writeLock().unlock();
                    unlocked = true;
                    return blockAndBail(refinfo);
                    }
               }
          ref.tinfo = info;
          return ref.tvals == null ? null : ref.tvals.val;
          }
     finally
          {
          if(!unlocked)
               ref.lock.writeLock().unlock();
          }
}
// 這就是第一條所說的
private void releaseIfEnsured(Ref ref){
     if(ensures.contains(ref))
          {
          ensures.remove(ref);
          ref.lock.readLock().unlock();
          }
}
//這就是第二條說的  嘗試獲取寫鎖 如果失敗就拋出retryex
void tryWriteLock(Ref ref){
     try
          {
          if(!ref.lock.writeLock().tryLock(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS))
               throw retryex;
          }
     catch(InterruptedException e)
          {
          throw retryex;
          }
}

 


事務結束的時候要,提交Ref的變動還有一些開銷,提交之后新的變動外部可見.需要的步驟如下:

1. 修改當前事務的狀態RUNNING -> COMMITTING
2. 再次運行事務中的commute方法.這需要在每一個commuted Ref上獲取寫鎖
3. 獲取在事務中修改過Ref的寫鎖
4. 在所有被修改過的Ref上調用Validator函數,如果出現驗證失敗的情況就觸發當前事務的Retry.
5. 對於改動過的Ref添加node到committed value chain或修改現有node,如何操作取決於falut minhistory maxhistory
6. 對於被改動過且至少有一個watcher的Ref創建Notify對象
7. 修改事務狀態COMMITTING -> COMMITTED
8. 釋放之前獲取的所有寫鎖
9. 釋放所有因為ensure調用而持有的讀鎖
10. 清理現場 locked (local), ensures , vals , sets and commutes為下一次Retry或者當前線程的新事務做准備
11. 如果提交成功,通知所有注冊了變動watcher使用Notify對象 並且將事務中的所有所有的action分發到Agents
12. 清理notify (local) 和actions collections 為下一次Retry或當前線程的下一個事務做准備
13. 返回事務體(transaction body)的最后一個表達式的值


看代碼片段:

    //make sure no one has killed us before this point, and can't from now on
               if(info.status.compareAndSet(RUNNING, COMMITTING))
                    {
                    for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet())
                         {
                         Ref ref = e.getKey();
                         if(sets.contains(ref)) continue;

                         boolean wasEnsured = ensures.contains(ref);
                         //can't upgrade readLock, so release it
                         releaseIfEnsured(ref);
                         tryWriteLock(ref);
                         locked.add(ref);
                         if(wasEnsured && ref.tvals != null && ref.tvals.point > readPoint)
                              throw retryex;

                         Info refinfo = ref.tinfo;
                         if(refinfo != null && refinfo != info && refinfo.running())
                              {
                              if(!barge(refinfo))
                                   throw retryex;
                              }
                         Object val = ref.tvals == null ? null : ref.tvals.val;
                         vals.put(ref, val);
                         for(CFn f : e.getValue())
                              {
                              vals.put(ref, f.fn.applyTo(RT.cons(vals.get(ref), f.args)));
                              }
                         }
                    for(Ref ref : sets)
                         {
                         tryWriteLock(ref);
                         locked.add(ref);
                         }

                    //validate and enqueue notifications
                    for(Map.Entry<Ref, Object> e : vals.entrySet())
                         {
                         Ref ref = e.getKey();
                         ref.validate(ref.getValidator(), e.getValue());
                         }

                    //at this point, all values calced, all refs to be written locked
                    //no more client code to be called
                    long msecs = System.currentTimeMillis();
                    long commitPoint = getCommitPoint();
                    for(Map.Entry<Ref, Object> e : vals.entrySet())
                         {
                         Ref ref = e.getKey();
                         Object oldval = ref.tvals == null ? null : ref.tvals.val;
                         Object newval = e.getValue();
                         int hcount = ref.histCount();

                         if(ref.tvals == null)
                              {
                              ref.tvals = new Ref.TVal(newval, commitPoint, msecs);
                              }
                         else if((ref.faults.get() > 0 && hcount < ref.maxHistory)
                                   || hcount < ref.minHistory)
                              {
                              ref.tvals = new Ref.TVal(newval, commitPoint, msecs, ref.tvals);
                              ref.faults.set(0);
                              }
                         else
                              {
                              ref.tvals = ref.tvals.next;
                              ref.tvals.val = newval;
                              ref.tvals.point = commitPoint;
                              ref.tvals.msecs = msecs;
                              }
                         if(ref.getWatches().count() > 0)
                              notify.add(new Notify(ref, oldval, newval));
                         }

                    done = true;
                    info.status.set(COMMITTED);
                    }
                  }
          catch(RetryEx retry)
               {
               //eat this so we retry rather than fall out
               }
          finally
               {
               for(int k = locked.size() - 1; k >= 0; --k)
                    {
                    locked.get(k).lock.writeLock().unlock();
                    }
               locked.clear();
               for(Ref r : ensures)
                    {
                    r.lock.readLock().unlock();
                    }
               ensures.clear();
               stop(done ? COMMITTED : RETRY);
               try
                    {
                    if(done) //re-dispatch out of transaction
                         {
                         for(Notify n : notify)
                              {
                              n.ref.notifyWatches(n.oldval, n.newval);
                              }
                         for(Agent.Action action : actions)
                              {
                              Agent.dispatchAction(action);
                              }
                         }
                    }
               finally
                    {
                    notify.clear();
                    actions.clear();
                    }
               }
          }
     if(!done)
          throw Util.runtimeException("Transaction failed after reaching retry limit");
     return ret;
}


https://github.com/clojure/clojure/blob/master/src/jvm/clojure/lang/ARef.java

void validate(IFn vf, Object val){
     try
          {
          if(vf != null && !RT.booleanCast(vf.invoke(val)))
               throw new IllegalStateException("Invalid reference state");
          }
     catch(RuntimeException re)
          {
          throw re;
          }
     catch(Exception e)
          {
          throw new IllegalStateException("Invalid reference state", e);
          }
}

 

 明確了上面的概念代碼實現,之前的疑惑已經能解答了,對Clojure STM也有了一點認識,好奇心遠遠未滿足,會繼續關注.



附原文引用:

 

 最后,小圖一張  祝大家新年 嘉欣

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM