Java並發57:Akka Actors並發框架淺談及入門示例


本章對Actors並發框架進行初步的介紹和入門示例的演示,關於其更深層次的內容,以后會系統性的進行學習。

1.Actors並發模型簡介

Actors並發模型是計算機科學領域中的一個並行計算模型,它把actors當做通用的並行計算原語。
一個actor對接收到的消息做出響應,進行本地決策,可以創建更多的actor,或者發送更多的消息;同時准備接收下一條消息。
在Actor理論中,一切都被認為是actor,這和面向對象語言里一切都被看成對象很類似。
但包括面向對象語言在內的軟件通常是順序執行的,而Actor模型本質上則是並發的。

Actors模型對並發模型進行了更高的抽象,它是一種異步、非阻塞、高性能的輕量級事件驅動編程模型。所謂輕量級事件指的是這些acotr內存消耗極小,1GB內存可容納百萬級別個Actor。Actors模型適用於可以用於高並發、分布式場景。

JDK本身並沒有提供對Actors並發模型的實現,不過其他的開發庫中已經實現並且被廣泛應用,例如本章的學習對象:Akka

2.Akka Actors模型簡介

Akka Actors官方文檔地址:https://doc.akka.io/docs/akka/current/index-actors.html

Akka Actors的模型圖如下:

 

 


總結:

  • Actor與Actor之前只能用消息進行通信。
  • 每個Actor都有一個郵箱。
  • Actor與Actor之間並不是直接通過消息通信,而是將消息發送至此Actor的郵箱MailBox
  • 消息在郵箱MailBox是有序的。
  • 每個Actor在處理單個消息時都是串行的。
  • Actor中的消息是不可變的。
  • 消息的傳遞不保證絕對可靠

Akka Actors模型的優勢:

  • 輕量級事件比線程的粒度小得多,意味着可以在程序中使用大量的Actor。
  • Akka提供了一套容錯機制,允許在出錯時進行一些恢復或者重置操作。
  • Akka依靠其透明的遠程Actor定位服務,保證了其分布式並發環境中的可用性

3.Akka Actors重要概念

下面介紹Akka Actors模型的幾個重要概念:

1.Actor

Actor即角色,前面已經說過,Akka Actors模型將actors當做通用的並行計算原語,所以Actor是必不可少的。

Actor總結:

  • Akka Actor的組織結構是一種樹形結。
  • 因為Actor是樹形組織,所以Actor的路徑類似於文件的路徑。
  • 每個Actor都有父級,有可能有子級當然也可能沒有。
  • 父級Actor給其子級Actor分配資源,任務,並管理其的生命狀態(監管和監控)。
  • 如果我們知道一個遠程Actor的具體位置,那么我們就可以向他發送消息。
  • 一個本地Actor的路徑:akka://search-system/user/master
  • 一個遠程Actor的路徑:akka.tcp://search-system@host.example.com:5678/user/master

2.ActorSystem

ActorSystem即角色系統,為了統一的調度和管理系統中的眾多actors,我們需要首先定義一個ActorSystem。

ActorSystem的主要功能有三個:

  1. 統一管理和調度actors,如:任務的拆分、處理等等。
  2. 配置Actor系統參數,如:攔截器、優先、路由策略等等。
  3. 日志功能:為了保證Akka的高容錯機制,編程中盡量需要完善的日志記錄,以便出錯處理。

3.ActorRef

ActorRef即角色引用,每個Actor有唯一的ActorRef,Actor引用可以看成是Actor的代理,與Actor打交道都需要通過Actor引用。

4.Akka Actors入門編程流程

下面簡述一下Akka Actors的入門級的編程流程:

  1. 編寫Message類,用於存放消息。
  2. 編寫Actor類,用於定義接收到消息之后所做的操作,注意日志處理。
  3. 定義一個ActorSystem,用於管理和調度程序中的Actor。
  4. 定義ActorRef,用於引用Actor類。
  5. 調用actorRef.tell(message),用於向此Actor發送消息。
  6. 關閉ActorSystem。

5.編程實例

下面以兩個簡單的例子來展示Actor並發編程的基本流程:

  • Hello World
  • 最快搜索:通過多個搜索引擎查詢某個條件,返回最快的查詢結果。

5.1.Hello World

場景說明:

  • 定義一個消息類,用於存放被歡迎的內容。
  • 定義一個角色類,用於歡迎接收到的消息。

實例代碼:

消息類:

/**
 * <p>Hello的消息類</p>
 * @author hanchao 2018/4/16 21:34
 **/
public class HelloMessage{
    /** 歡迎的對象 */
    private String name;

    //getter setter constructor toString
}

角色類:

/**
 * <p>Actor框架入門示例-HelloWorld的角色</p>
 *
 * @author hanchao 2018/4/16 21:07
 **/
public class HelloActor extends UntypedAbstractActor {

    //定義日志,很重要
    LoggingAdapter log = Logging.getLogger(getContext().getSystem(),this);
    /**
     * <p>重寫接收方法</p>
     * @author hanchao 2018/4/16 21:31
     **/
    @Override
    public void onReceive(Object message){
        log.info("HelloActor receive message : " + message);
        //如果消息類型是HelloMessage,則進行處理
        if (message instanceof HelloMessage){
            log.info("Hello " + ((HelloMessage) message).getName() + "!");
        }
    }
}

測試類:

/**
 * <p>Actor入門示例</p>
 * @author hanchao 2018/4/16 21:37
 **/
public static void main(String[] args) {
    //創建actor系統
    ActorSystem system = ActorSystem.create("hello-system");
    //定義Actor引用
    ActorRef helloActor = system.actorOf(Props.create(HelloActor.class),"hello-actor");

    //向HelloActor發送消息
    helloActor.tell(new HelloMessage("World"),null);
    helloActor.tell(new HelloMessage("Akka Actor"),null);

    //終止Actor系統
    system.terminate();
}

運行結果:

[INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] HelloActor receive message : HelloMessage{name='World'}
[INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] Hello World!
[INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] HelloActor receive message : HelloMessage{name='Akka Actor'}
[INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] Hello Akka Actor!

5.2.最快搜索

場景說明:

通過多個搜索引擎查詢某個條件,返回最快的查詢結果。

  • 定義一個消息類,用於存放搜索條件。
  • 定義一個消息類,用於存放搜索結果。
  • 定義一個角色類,用於接收搜索條件、搜索和返回搜索結果。
  • 定義一個角色類,用於派發搜索任務和接收搜索結果。

實例代碼:

搜索引擎工具類EngineUtils :模擬搜索引擎列表和搜索過程。

/**
 * 搜索引擎工具類
 * Created by 韓超 on 2018/3/6.
 */
public class EngineUtils {
    private final static Logger LOGGER = Logger.getLogger(EngineUtils.class);

    //搜索引擎列表
    private static List<String> engineList;

    static {
        engineList = new ArrayList<>();
        engineList.add("百度");
        engineList.add("Google");
        engineList.add("必應");
        engineList.add("搜狗");
        engineList.add("Redis");
        engineList.add("Solr");
    }

    /**
     * <p>Title: 模擬一個搜索引擎進行一次問題查詢</p>
     * @author 韓超 2018/3/6 11:20
     */
    public static String searchByEngine(String question,String engine) throws InterruptedException {
        //獲取隨機的時間間隔
        int interval = RandomUtils.nextInt(1,5000);
//        LOGGER.info("搜索引擎[" + engine + "]正在查詢,預計用時" + interval + "毫秒...");
        //當前線程休眠指定時間,模擬搜索引擎用時
        Thread.sleep(interval);
        return "通過搜索引擎[" + engine + "],首先查到關於(" + question + ")問題的結果,用時 = " + interval + "毫秒!";
    }

    public static List<String> getEngineList() {
        return engineList;
    }

    public static void setEngineList(List<String> engineList) {
        EngineUtils.engineList = engineList;
    }
}

搜索條件消息類QueryTerms ,用於存儲搜索條件:

/**
* <p>Title: 定義查詢條件類,用於傳遞消息</p>
 *
 * @author 韓超 2018/3/6 16:16
 */
static class QueryTerms {
    /**
     * 問題
     */
    private String question;
    /**
     * 搜索引擎
     */
    private String engine;

    //getter setter toString constructor
}

搜索結果消息類QueryResult ,用於存放搜索結果:

/**
* <p>Title: 定義查詢結果類,用於消息傳遞</p>
 *
 * @author 韓超 2018/3/6 16:17
 */
static class QueryResult {
    /**
     * 查詢結果
     */
    private String result;

    //getter setter toString constructor
}

搜索角色類SearchEngineAcotr ,用於接收搜索條件,調用搜索引擎進行搜索:

/**
 * <p>Title:搜索引擎Actor </br>
 * 繼承UntypedAbstractActor成為一個Actor</p>
 *
 * @author 韓超 2018/3/6 14:42
 */
static class SearchEngineAcotr extends UntypedAbstractActor {
    //定義Actor日志
    private LoggingAdapter log = Logging.getLogger(getContext().getSystem(),this);
    /**
     * <p>Title: Actor都需要重寫消息接收處理方法</p>
     *
     * @author 韓超 2018/3/6 14:42
     */
    @Override
    public void onReceive(Object message) throws Throwable {
        //如果消息是指定的類型Message,則進行處理,否則不處理
        if (message instanceof QueryTerms) {
            log.info("接收到搜索條件:" + ((QueryTerms) message).getQuestion());
            //通過工具類進行一次搜索引擎查詢
            String result = EngineUtils.searchByEngine(((QueryTerms) message).getQuestion(), ((QueryTerms) message).getEngine());
            //通過getSender().tell(result,actor)將actor的 處理結果[result] 發送消息的發送者[getSender()]
            //通過getSender獲取消息的發送方
            //通過getSelf()獲取當前Actor
            getSender().tell(new QueryResult(result), getSelf());
        } else {
            unhandled(message);
        }
    }
}

主角色類QuestionQuerier ,用於分發搜索任務和接收搜索結果:

/**
* <p>Title: 問題查詢器Actor</br>
 * 繼承自UntypedAbstractActor</p>
 *
 * @author 韓超 2018/3/6 16:31
 */
static class QuestionQuerier extends UntypedAbstractActor {
    //定義Actor日志
    private LoggingAdapter log = Logging.getLogger(getContext().getSystem(),this);
    /**
     * 搜索引擎列表
     */
    private List<String> engines;
    /**
     * 搜索結果
     */
    private AtomicReference<String> result;
    /**
     * 問題
     */
    private String question;

    public QuestionQuerier(String question, List<String> engines, AtomicReference<String> result) {
        this.question = question;
        this.engines = engines;
        this.result = result;
    }

    /**
     * <p>Title: Actor都需要重寫消息接收處理方法</p>
     *
     * @author 韓超 2018/3/6 16:35
     */
    @Override
    public void onReceive(Object message) throws Throwable {
        //如果收到查詢結果,則對查詢結果進行處理
        if (message instanceof QueryResult) {//如果消息是指定的類型Result,則進行處理,否則不處理
            log.info("接收到搜索結果:" + ((QueryResult) message).getResult());
            //通過CAS設置原子引用的值
            result.compareAndSet(null, ((QueryResult) message).getResult());
            //如果已經查詢到了結果,則停止Actor
            //通過getContext()獲取ActorSystem的上下文環境
            //通過getContext().stop(self())停止當前Actor
            getContext().stop(self());
        } else {//如果沒有收到處理結果,則創建搜索引擎Actor進行查詢
            log.info("開始創建搜索引擎進行查詢");

            //使用原子變量去測試Actor的創建是否有序
            AtomicInteger count = new AtomicInteger(1);

            //針對每一個搜索引擎,都創建一個Actor
            for (String engine : engines) {
                log.info("為" + engine + "創建第" + count + "個搜索引擎Actor....");
                count.getAndIncrement();

                //通過actorOf(Props,name)創建Actor
                //通過Props.create(Actor.class)創建Props
                ActorRef fetcher = this.getContext().actorOf(Props.create(SearchEngineAcotr.class), "fetcher-" + engine.hashCode());
                //創建查詢條件
                QueryTerms msg = new QueryTerms(question, engine);
                //將查詢條件告訴Actor
                fetcher.tell(msg, self());
            }
        }
    }
}

測試代碼:

/**
 * <p>Title:通過多個搜索引擎查詢多個條件,並返回第一條查詢結果 </p>
 *
 * @author 韓超 2018/3/6 14:15
 */
public static void main(String[] args) {
    //通過工具類獲取搜索引擎列表
    List<String> engines = EngineUtils.getEngineList();
    //通過 Actor 進行並發查詢,獲取最先查到的答案
    String result = new FlavorActorDemo().getFirstResult("今天你吃了嗎?", engines);
    //打印結果
}

/**
 * 通過多個搜索引擎查詢,並返回第一條查詢結果
 *
 * @param question 查詢問題
 * @param engines  查詢條件數組
 * @return 最先查出的結果
 * @author 韓超 2018/3/6 16:44
 */
@Override
public String getFirstResult(String question, List<String> engines) {
    //創建一個Actor系統
    ActorSystem system = ActorSystem.create("search-system");
    //創建一個原子引用用於保存查詢結果
    AtomicReference<String> result = new AtomicReference<>();
    //通過靜態方法,調用Props的構造器,創建Props對象
    Props props = Props.create(QuestionQuerier.class, question, engines, result);
    //通過system.actorOf(props,name)創建一個 問題查詢器Actor
    final ActorRef querier = system.actorOf(props, "master");
    //告訴問題查詢器開始查詢
    querier.tell(new Object(), ActorRef.noSender());

    //通過while無限循環 等待actor進行查詢,知道產生結果
    while (null == result.get()) ;
    //關閉 Actor系統
    system.terminate();
    //返回結果
    return result.get();
}

運行結果(某一次):

[INFO] [04/18/2018 23:10:11.949] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 開始創建搜索引擎進行查詢
[INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為百度創建第1個搜索引擎Actor....
[INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為Google創建第2個搜索引擎Actor....
[INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為必應創建第3個搜索引擎Actor....
[INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-6] [akka://search-system/user/master/fetcher-964584] 接收到搜索條件:今天你吃了嗎?
[INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為搜狗創建第4個搜索引擎Actor....
[INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為Redis創建第5個搜索引擎Actor....
[INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為Solr創建第6個搜索引擎Actor....
[INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master/fetcher-2138589785] 接收到搜索條件:今天你吃了嗎?
[INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-2] [akka://search-system/user/master/fetcher-2582786] 接收到搜索條件:今天你吃了嗎?
[INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master/fetcher-784239] 接收到搜索條件:今天你吃了嗎?
[INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-3] [akka://search-system/user/master/fetcher-78837083] 接收到搜索條件:今天你吃了嗎?
[INFO] [04/18/2018 23:10:11.975] [search-system-akka.actor.default-dispatcher-4] [akka://search-system/user/master/fetcher-823867] 接收到搜索條件:今天你吃了嗎?
[INFO] [04/18/2018 23:10:13.255] [search-system-akka.actor.default-dispatcher-11] [akka://search-system/user/master] 接收到搜索結果:通過搜索引擎[搜狗],首先查到關於(今天你吃了嗎?)問題的結果,用時 = 1279毫秒!
[INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-2582786#-286481483] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-964584#166806804] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-2138589785#376820931] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-78837083#-224726121] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-784239#1622650486] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

參考文獻

[1] Akka系列(一):Akka簡介與Actor模型
[2] 漫談並發編程:Actor模型
[3] Java並發的四種風味:Thread、Executor、ForkJoin和Actor




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM