深入理解Akka Actor模型


Carl Hewitt 在1973年對Actor模型進行了如下定義:"Actor模型是一個把'Actor'作為並發計算的通用原語". Actor是異步驅動,可以並行和分布式部署及運行的最小顆粒。也就是說,它可以被分配,分布,調度到不同的CPU,不同的節點,乃至不同的時間片上運行,而不影響最終的結果。因此Actor在空間(分布式)和時間(異步驅動)上解耦的。而Akka是Lightbend(前身是Typesafe)公司在JVM上的Actor模型的實現。我們在了解actor模型之前,首先來了解actor模型主要是為了解決什么樣的問題。

Why modern systems need a new programming model

在akka系統的官網上主要介紹了現代並發編程模型所遇到的問題,里面主要提到了三個點

1) 在面向對象的語言中一個顯著的特點是封裝,然后通過對象提供的一些方法來操作其狀態,但是共享內存的模型下,多線程對共享對象的並發訪問會造成並發安全問題。一般會采用加鎖的方式去解決
image.png
加鎖會帶來一些問題

  • 加鎖的開銷很大,線程上下文切換的開銷大
  • 加鎖導致線程block,無法去執行其他的工作,被block無法執行的線程,其實也是占據了一種系統資源
  • 加鎖在編程語言層面無法防止隱藏的死鎖問題

2)我們知道Java中並發模型是通過共享內存來實現。而cpu中會利用局部cache來加速主存的訪問,為了解決多線程間緩存不一致的問題,在java中一般會通過使用volatile或者Atmoic來標記變量,通過Jmm的happens before機制來保障多線程間共享變量的可見性。因此從某種意義上來說是沒有共享內存的,而是通過cpu將cache line的數據刷新到主存的方式來實現可見。
因此與其去通過標記共享變量或者加鎖的方式,依賴cpu緩存更新,倒不如每個並發實例之間只保存local的變量,而在不同的實例之間通過message來傳遞。

3)call stack的問題
當我們編程模型異步化之后,還有一個比較大的問題是調用棧轉移的問題,如下圖中主線程提交了一個異步任務到隊列中,Worker thread 從隊列提取任務執行,調用棧就變成了workthread發起的,當任務出現異常時,處理和排查就變得困難。
image.png

How the Actor Model Meets the Needs of Modern Distributed Systems

那么akka 的actor的模型是怎樣處理這些問題的,actor模型中的抽象主體變為了actor,

  • actor之間可以互相發送message。
  • actor在收到message之后會將其存入其綁定的Mailbox中。
  • Actor從Mailbox中提取消息,執行內部方法,修改內部狀態。
  • 繼續給其他actor發送message。

可以看到下圖,actor內部的執行流程是順序的,同一時刻只有一個message在進行處理,也就是actor的內部邏輯可以實現無鎖化的編程。actor和線程數解耦,可以創建很多actor綁定一個線程池來進行處理,no lock,no block的方式能減少資源開銷,並提升並發的性能

image.png

actor編程樣例

下面簡單來看一個actor的樣例

依賴

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.11</artifactId>
    <version>2.4.20</version>
</dependency>

Main

    public static void main(String[] args) throws InterruptedException {
        final ActorSystem actorSystem = ActorSystem.create("actor-system");

        final ActorRef actorRef = actorSystem.actorOf(Props.create(BankActor.class), "bank-actor");

        CountDownLatch addCount = new CountDownLatch(20);
        CountDownLatch minusCount = new CountDownLatch(10);

        Thread addCountT = new Thread(new Runnable() {
            @Override
            public void run() {
                while (addCount.getCount() > 0) {
                    actorRef.tell(Command.ADD, null);
                    addCount.countDown();
                }
            }
        });

        Thread minusCountT = new Thread(new Runnable() {
            @Override
            public void run() {
                while (minusCount.getCount() > 0) {
                    actorRef.tell(Command.MINUS, null);
                    minusCount.countDown();
                }
            }
        });

        minusCountT.start();
        addCountT.start();
        minusCount.await();
        addCount.await();

        Future<Object> count = Patterns.ask(actorRef, Command.GET, 1000);
        count.onComplete(
                new OnComplete<Object>() {
                    @Override
                    public void onComplete(Throwable failure, Object success) throws Throwable {
                        if (failure != null) {
                            failure.printStackTrace();
                        } else {
                            log.info("Get result from " + success);
                        }
                    }
                },
                Executors.directExecutionContext());
        actorSystem.shutdown();
    }
  1. 創建actor
  2. 通過actorRef和actor並發交互
  3. 獲取actor最后的狀態

actor

public class BankActor extends UntypedActor {

    private static final Logger log = LoggerFactory.getLogger(BankActor.class);
    private int count;

    @Override
    public void preStart() throws Exception, Exception {
        super.preStart();
        count = 0;
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        // 可以使用枚舉或者動態代理類來實現方法調用
        if (message instanceof Command) {
            Command cmd = (Command) message;
            switch (cmd) {
                case ADD:
                    log.info("Add 1 from {} to {}", count, ++count);
                    break;
                case MINUS:
                    log.info("Minus 1 from {} to {}", count, --count);
                    break;
                case GET:
                    log.info("Return current count " + getSender());
                    getSender().tell(count, this.getSelf());
                    break;
                default:
                    log.warn("UnSupport cmd: " + cmd);
            }
        } else {
            log.warn("Discard unknown message: {}", message);
        }
    }
}
enum Command {
    ADD,
    MINUS,
    GET
}
15:36:46.376 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Add 1 from 0 to 1
15:36:46.385 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Minus 1 from 1 to 0
15:36:46.385 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Minus 1 from 0 to -1
15:36:46.385 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Minus 1 from -1 to -2
15:36:46.386 [actor-system-akka.actor.default-dispatcher-5] INFO akka.BankActor - Minus 1 from -2 to -3
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -3 to -4
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -4 to -5
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -5 to -4
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -4 to -5
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -5 to -4
15:36:46.386 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -4 to -5
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -5 to -6
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Minus 1 from -6 to -7
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -7 to -6
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -6 to -5
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -5 to -4
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -4 to -3
15:36:46.387 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -3 to -2
15:36:46.393 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -2 to -1
15:36:46.393 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from -1 to 0
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 0 to 1
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 1 to 2
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 2 to 3
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 3 to 4
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 4 to 5
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 5 to 6
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 6 to 7
15:36:46.394 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 7 to 8
15:36:46.395 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 8 to 9
15:36:46.395 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Add 1 from 9 to 10
15:36:46.402 [actor-system-akka.actor.default-dispatcher-2] INFO akka.BankActor - Return current count Actor[akka://actor-system/temp/$a]
15:36:46.403 [actor-system-akka.actor.default-dispatcher-2] INFO akka.ActorTest - Get result from 10

在這個例子中簡單模擬了並發加減操作,例子中最終是+20 , -10,最終結果為10。我們可以看到actor內部並不需要通過加鎖或設置volatile的方式來維護其並發安全性,使用起來非常的方便,貌似再也不用擔心並發安全的問題了,那么akka具體是幫我們怎么做的呢?

actor內部實現

首先當我們討論並發安全的時候我們實際上是在說在多線程運行的情況下,如何保證程序的原子性,有序性和可見性,akka也是基於java的應用,因此分析akka是怎么做的也逃不開java內存模型。JSR-133使用happens-before的概念來闡述操作之間的內存可見性,其中主要包括

  • 程序順序規則:一個線程中的每個操作,happens-before於該線程中的任意后續操作。
  • 監視器鎖規則:對一個鎖的解鎖,happens-before於隨后對這個鎖的加鎖。
  • volatile變量規則:對一個volatile域的寫,happens-before於任意后續對這個volatile域的讀。
  • 傳遞性:如果A happens-before B,且B happens-before C,那么A happens-before C。

基於此,akka也提出了他的happens-before原則
image.png
image.png
簡單的說就是akka能夠保證在處理下一個message時,對於上一個message對actor內部狀態的改動是可見的。

在Stack Overflow上也有小伙伴提出這個問題,可以參考一下
https://stackoverflow.com/questions/27484460/akka-what-is-the-reason-of-processing-messages-one-at-a-time-in-an-actor

那么akka具體是怎么實現的呢?

原子性,有序性

image.png
在akka模型中,每一個actor都有一個相應的郵箱用於存放接受到的message, 然后一個Dispatcher負責線程調度,處理mailbox中的message

Enqueue

image.png
在actor接受到郵件后, 首先會將message放入隊列中,並提交一個異步任務

registerForExecution

image.png
提交的異步任務是執行MailBox (MailBox是一個Runnable實現)

Mailbox#run

image.png
主體是先處理系統消息,然后處理MailBox中的用戶message

processMailBox

image.png
處理message的邏輯比較簡單,這里可以看到每一個時刻都是只有一個線程在執行mailbox中的message,一次處理多少是有throughput的參數來決定。因此這種模型下消息處理就是順序的(但是這也有壞處,message處理的主流程不能block,否則message處理都會被拖慢,因此在Flink工程中mater的actor內部大量采用了基於CompletableFuture異步編程的方式)。actor內部的變量,如上面例子中的count,是不會有並發訪問的,因此原子性和有序性都得到了保障。

但是,細心的同學可能發現,雖然每次執行的線程都只有一個,但是具體是哪個線程並不是綁定的,兩次執行的線程完全可能不相同,甚至可能調度在不同的cpu上。不同線程更改count變量之后,這個變量也沒有聲明成volatile,如何保證線程1 執行message1更新完后,線程2執行message2時能看到1的變更結果呢?

也有同學產生了類似的問題,可以參考
https://stackoverflow.com/questions/10165603/should-my-akka-actors-properties-be-marked-volatile/

Because an actor will only ever handle one message at any given time, we can guarantee that accessing the actor's local state is safe to access, even though the Actor itself may be switching Threads which it is executing on. Akka guarantees that the state written while handling message M1 are visible to the Actor once it handles M2, even though it may now be running on a different thread (normally guaranteeing this kind of safety comes at a huge cost, Akka handles this for you).

有人回答了Akka handles this for you. 但是我還是很奇怪,akka是怎么做的呢?

可見性

Java Memory Model

首先我們再來回顧一下Java內存模型,JMM內存模型中抽象了四種內存屏障用於處理cpu指令重排帶來的線程安全問題。

  • LoadLoad屏障:對於這樣的語句Load1; LoadLoad; Load2,在Load2及后續讀取操作要讀取的數據被訪問前,保證Load1要讀取的數據被讀取完畢。
  • StoreStore屏障:對於這樣的語句Store1; StoreStore; Store2,在Store2及后續寫入操作執行前,保證Store1的寫入操作對其它處理器可見。
  • LoadStore屏障:對於這樣的語句Load1; LoadStore; Store2,在Store2及后續寫入操作被刷出前,保證Load1要讀取的數據被讀取完畢。
  • StoreLoad屏障:對於這樣的語句Store1; StoreLoad; Load2,在Load2及后續所有讀取操作執行前,保證Store1的寫入對所有處理器可見。它的開銷是四種屏障中最大的。在大多數處理器的實現中,這個屏障是個萬能屏障,兼具其它三種內存屏障的功能

詳細解釋可以參考:https://github.com/openjdk/jdk/blob/6bab0f539fba8fb441697846347597b4a0ade428/src/jdk.internal.vm.ci/share/classes/jdk.vm.ci.code/src/jdk/vm/ci/code/MemoryBarriers.java

除了以上四種,HotSpot VM還定義了特殊的acquire和release內存屏障,acquire防止它后面的讀寫操作重排序到acquire的前面;release防止它前面的讀寫操作重排序到release后面。
acquire和release兩者放在一起就像一個柵欄,可禁止柵欄內的事務跑到柵欄外,但是它不阻止柵欄外的事務跑到柵欄內。
acquire可以由LoadLoad + LoadStore組成,release可以由StoreStore 和LoadStore組成,他們都沒有使用StoreLoad屏障,這意味着x86架構原生就具有acquire和release的語義。

因為x86架構下是強內存模型,只允許Store和Load順序重排,因此內存barrier實際也只有StoreLoad一種實現。但是這里我們不去過多的糾結不同cpu架構的細節。

-- 《深入解析Java虛擬機HotSpot》 第六章

image.png
volatile關鍵字會生成的memory barrier

Synchronizes-With

在回憶完Java內存模型后,我們再來看上面提到的MailBox#run方法的實現
image.png
上面代碼中Volatile read 和 Volatile write分別通過unsafe工具以Volatile的方式去讀取和修改Mailbox對象的內部變量。

// volatile read
Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)   
    
// volatile write
Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus)

根據上面JMM所定義的內容,實際上會在volatile read write 前后插入相應的memory barrier。形成如下的交互模式。第一個線程執行完mailbox內容后執行volatile write 插入release barrier,第二個線程啟動后執行volatile read 插入acquire barrier,這樣在這兩個線程之間就形成了happens before的關系,從而保障了可見性。
image.png

因此我們可以得出結論,通過對Mailbox內部volatile變量的讀寫,借助volatile的內存屏障語義,再加上單線程執行模型,實現了actor 內部狀態變量的可見性。

這種方式也被稱作synchronize-with,這是一種保障線程間變量可見的機制。在實現中一般會有兩種變量。

  • guard variable 門禁變量
  • payload 真正在兩個線程間需要傳遞/共享的變量

在akka這個實現中Mailbox的volatile status變量就是門禁,actor internal state 就是payload。

In Java version 5 onward, every store to a volatile variable is a write-release, while every load from a volatile variable is a read-acquire. Therefore, any volatile variable in Java can act as a guard variable, and can be used to propagate a payload of any size between threads.

另外synchronize with實現有很多種方式,可以通過volatile也可以通過原子變量,還可以通過鎖等等。通過volatile實現的也被稱做volatile-piggyback
image.png

通過volatile-piggyback這種方式實現有幾個好處

  1. 首先顯而易見的就是避免使用lock
  2. 用戶actor代碼可以不用使用volatile來標記變量,可以減少barrier的數量,提升性能。也降低了並發程序的復雜度
  3. 這種方式下不會產生較重的StoreLoad barrier,所以真實的性能開銷應該也比較低,特別對於x86架構來說LoadLoad,LoadStore等都是空操作。和基於lock實現的同步就更占優勢了。

關於x86強內存模型的補充說明:
可能有同學還有疑問(其實是我之前還有疑問),因為上面提到,在x86強內 存模型下,本身就帶有acquire 和 release語義,那加上這個volatile關鍵字有什么意義呢?當然我們開發java代碼的時候肯定要寫跨平台統一的代碼,但是除開這個之外,volatile關鍵字還會禁止編譯器的指令重排,從而從編譯層面保障不會破壞Happen-before的語義。

例如以下HotSpot Vm中,指令內存屏障實現的OrderAccess模塊,其中除了StoreLoad,其他三者都只執行了comiler_barrier方法,這里方法中volatile表示禁止編譯器優化匯編代碼。memory表示告知編譯器匯編代碼執行內存讀取和寫入的操作,編譯器可能需要在執行匯編前將一些指定的寄存器刷入內存。而StoreLoad方法則是使用指令加上lock前綴來用作內存屏障指令
image.png
《深入java虛擬機HotSpot》第六章

總結

從我個人理解角度說,akka的actor模型采用基於消息傳遞的機制實現並發編程,可以實現無鎖異步化的編程模型,並且通過親和性調度等方案,可以更好的利用cpu cache,對於高並發場景來說應該是一大利器。

參考

preshing大神的一系列並發內存模型的文章


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM