本章對Actors並發框架進行初步的介紹和入門示例的演示,關於其更深層次的內容,以后會系統性的進行學習。
1.Actors並發模型簡介
Actors並發模型是計算機科學領域中的一個並行計算模型,它把actors當做通用的並行計算原語。
一個actor對接收到的消息做出響應,進行本地決策,可以創建更多的actor,或者發送更多的消息;同時准備接收下一條消息。
在Actor理論中,一切都被認為是actor,這和面向對象語言里一切都被看成對象很類似。
但包括面向對象語言在內的軟件通常是順序執行的,而Actor模型本質上則是並發的。
Actors模型對並發模型進行了更高的抽象,它是一種異步、非阻塞、高性能的輕量級事件驅動編程模型。所謂輕量級事件指的是這些acotr內存消耗極小,1GB內存可容納百萬級別個Actor。Actors模型適用於可以用於高並發、分布式場景。
JDK本身並沒有提供對Actors並發模型的實現,不過其他的開發庫中已經實現並且被廣泛應用,例如本章的學習對象:Akka。
2.Akka Actors模型簡介
Akka Actors官方文檔地址:https://doc.akka.io/docs/akka/current/index-actors.html
Akka Actors的模型圖如下:

總結:
- Actor與Actor之前只能用消息進行通信。
- 每個Actor都有一個郵箱。
- Actor與Actor之間並不是直接通過消息通信,而是將消息發送至此Actor的郵箱MailBox。
- 消息在郵箱MailBox是有序的。
- 每個Actor在處理單個消息時都是串行的。
- Actor中的消息是不可變的。
- 消息的傳遞不保證絕對可靠。
Akka Actors模型的優勢:
- 輕量級事件比線程的粒度小得多,意味着可以在程序中使用大量的Actor。
- Akka提供了一套容錯機制,允許在出錯時進行一些恢復或者重置操作。
- Akka依靠其透明的遠程Actor定位服務,保證了其分布式並發環境中的可用性。
3.Akka Actors重要概念
下面介紹Akka Actors模型的幾個重要概念:
1.Actor
Actor即角色,前面已經說過,Akka Actors模型將actors當做通用的並行計算原語,所以Actor是必不可少的。
Actor總結:
- Akka Actor的組織結構是一種樹形結。
- 因為Actor是樹形組織,所以Actor的路徑類似於文件的路徑。
- 每個Actor都有父級,有可能有子級當然也可能沒有。
- 父級Actor給其子級Actor分配資源,任務,並管理其的生命狀態(監管和監控)。
- 如果我們知道一個遠程Actor的具體位置,那么我們就可以向他發送消息。
- 一個本地Actor的路徑:akka://search-system/user/master
- 一個遠程Actor的路徑:akka.tcp://search-system@host.example.com:5678/user/master
2.ActorSystem
ActorSystem即角色系統,為了統一的調度和管理系統中的眾多actors,我們需要首先定義一個ActorSystem。
ActorSystem的主要功能有三個:
- 統一管理和調度actors,如:任務的拆分、處理等等。
- 配置Actor系統參數,如:攔截器、優先、路由策略等等。
- 日志功能:為了保證Akka的高容錯機制,編程中盡量需要完善的日志記錄,以便出錯處理。
3.ActorRef
ActorRef即角色引用,每個Actor有唯一的ActorRef,Actor引用可以看成是Actor的代理,與Actor打交道都需要通過Actor引用。
4.Akka Actors入門編程流程
下面簡述一下Akka Actors的入門級的編程流程:
- 編寫Message類,用於存放消息。
- 編寫Actor類,用於定義接收到消息之后所做的操作,注意日志處理。
- 定義一個ActorSystem,用於管理和調度程序中的Actor。
- 定義ActorRef,用於引用Actor類。
- 調用actorRef.tell(message),用於向此Actor發送消息。
- 關閉ActorSystem。
5.編程實例
下面以兩個簡單的例子來展示Actor並發編程的基本流程:
- Hello World
- 最快搜索:通過多個搜索引擎查詢某個條件,返回最快的查詢結果。
5.1.Hello World
場景說明:
- 定義一個消息類,用於存放被歡迎的內容。
- 定義一個角色類,用於歡迎接收到的消息。
實例代碼:
消息類:
/** * <p>Hello的消息類</p> * @author hanchao 2018/4/16 21:34 **/ public class HelloMessage{ /** 歡迎的對象 */ private String name; //getter setter constructor toString }
角色類:
/** * <p>Actor框架入門示例-HelloWorld的角色</p> * * @author hanchao 2018/4/16 21:07 **/ public class HelloActor extends UntypedAbstractActor { //定義日志,很重要 LoggingAdapter log = Logging.getLogger(getContext().getSystem(),this); /** * <p>重寫接收方法</p> * @author hanchao 2018/4/16 21:31 **/ @Override public void onReceive(Object message){ log.info("HelloActor receive message : " + message); //如果消息類型是HelloMessage,則進行處理 if (message instanceof HelloMessage){ log.info("Hello " + ((HelloMessage) message).getName() + "!"); } } }
測試類:
/** * <p>Actor入門示例</p> * @author hanchao 2018/4/16 21:37 **/ public static void main(String[] args) { //創建actor系統 ActorSystem system = ActorSystem.create("hello-system"); //定義Actor引用 ActorRef helloActor = system.actorOf(Props.create(HelloActor.class),"hello-actor"); //向HelloActor發送消息 helloActor.tell(new HelloMessage("World"),null); helloActor.tell(new HelloMessage("Akka Actor"),null); //終止Actor系統 system.terminate(); }
運行結果:
[INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] HelloActor receive message : HelloMessage{name='World'} [INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] Hello World! [INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] HelloActor receive message : HelloMessage{name='Akka Actor'} [INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] Hello Akka Actor!
5.2.最快搜索
場景說明:
通過多個搜索引擎查詢某個條件,返回最快的查詢結果。
- 定義一個消息類,用於存放搜索條件。
- 定義一個消息類,用於存放搜索結果。
- 定義一個角色類,用於接收搜索條件、搜索和返回搜索結果。
- 定義一個角色類,用於派發搜索任務和接收搜索結果。
實例代碼:
搜索引擎工具類EngineUtils :模擬搜索引擎列表和搜索過程。
/** * 搜索引擎工具類 * Created by 韓超 on 2018/3/6. */ public class EngineUtils { private final static Logger LOGGER = Logger.getLogger(EngineUtils.class); //搜索引擎列表 private static List<String> engineList; static { engineList = new ArrayList<>(); engineList.add("百度"); engineList.add("Google"); engineList.add("必應"); engineList.add("搜狗"); engineList.add("Redis"); engineList.add("Solr"); } /** * <p>Title: 模擬一個搜索引擎進行一次問題查詢</p> * @author 韓超 2018/3/6 11:20 */ public static String searchByEngine(String question,String engine) throws InterruptedException { //獲取隨機的時間間隔 int interval = RandomUtils.nextInt(1,5000); // LOGGER.info("搜索引擎[" + engine + "]正在查詢,預計用時" + interval + "毫秒..."); //當前線程休眠指定時間,模擬搜索引擎用時 Thread.sleep(interval); return "通過搜索引擎[" + engine + "],首先查到關於(" + question + ")問題的結果,用時 = " + interval + "毫秒!"; } public static List<String> getEngineList() { return engineList; } public static void setEngineList(List<String> engineList) { EngineUtils.engineList = engineList; } }
搜索條件消息類QueryTerms ,用於存儲搜索條件:
/** * <p>Title: 定義查詢條件類,用於傳遞消息</p> * * @author 韓超 2018/3/6 16:16 */ static class QueryTerms { /** * 問題 */ private String question; /** * 搜索引擎 */ private String engine; //getter setter toString constructor }
搜索結果消息類QueryResult ,用於存放搜索結果:
/** * <p>Title: 定義查詢結果類,用於消息傳遞</p> * * @author 韓超 2018/3/6 16:17 */ static class QueryResult { /** * 查詢結果 */ private String result; //getter setter toString constructor }
搜索角色類SearchEngineAcotr ,用於接收搜索條件,調用搜索引擎進行搜索:
/** * <p>Title:搜索引擎Actor </br> * 繼承UntypedAbstractActor成為一個Actor</p> * * @author 韓超 2018/3/6 14:42 */ static class SearchEngineAcotr extends UntypedAbstractActor { //定義Actor日志 private LoggingAdapter log = Logging.getLogger(getContext().getSystem(),this); /** * <p>Title: Actor都需要重寫消息接收處理方法</p> * * @author 韓超 2018/3/6 14:42 */ @Override public void onReceive(Object message) throws Throwable { //如果消息是指定的類型Message,則進行處理,否則不處理 if (message instanceof QueryTerms) { log.info("接收到搜索條件:" + ((QueryTerms) message).getQuestion()); //通過工具類進行一次搜索引擎查詢 String result = EngineUtils.searchByEngine(((QueryTerms) message).getQuestion(), ((QueryTerms) message).getEngine()); //通過getSender().tell(result,actor)將actor的 處理結果[result] 發送消息的發送者[getSender()] //通過getSender獲取消息的發送方 //通過getSelf()獲取當前Actor getSender().tell(new QueryResult(result), getSelf()); } else { unhandled(message); } } }
主角色類QuestionQuerier ,用於分發搜索任務和接收搜索結果:
/** * <p>Title: 問題查詢器Actor</br> * 繼承自UntypedAbstractActor</p> * * @author 韓超 2018/3/6 16:31 */ static class QuestionQuerier extends UntypedAbstractActor { //定義Actor日志 private LoggingAdapter log = Logging.getLogger(getContext().getSystem(),this); /** * 搜索引擎列表 */ private List<String> engines; /** * 搜索結果 */ private AtomicReference<String> result; /** * 問題 */ private String question; public QuestionQuerier(String question, List<String> engines, AtomicReference<String> result) { this.question = question; this.engines = engines; this.result = result; } /** * <p>Title: Actor都需要重寫消息接收處理方法</p> * * @author 韓超 2018/3/6 16:35 */ @Override public void onReceive(Object message) throws Throwable { //如果收到查詢結果,則對查詢結果進行處理 if (message instanceof QueryResult) {//如果消息是指定的類型Result,則進行處理,否則不處理 log.info("接收到搜索結果:" + ((QueryResult) message).getResult()); //通過CAS設置原子引用的值 result.compareAndSet(null, ((QueryResult) message).getResult()); //如果已經查詢到了結果,則停止Actor //通過getContext()獲取ActorSystem的上下文環境 //通過getContext().stop(self())停止當前Actor getContext().stop(self()); } else {//如果沒有收到處理結果,則創建搜索引擎Actor進行查詢 log.info("開始創建搜索引擎進行查詢"); //使用原子變量去測試Actor的創建是否有序 AtomicInteger count = new AtomicInteger(1); //針對每一個搜索引擎,都創建一個Actor for (String engine : engines) { log.info("為" + engine + "創建第" + count + "個搜索引擎Actor...."); count.getAndIncrement(); //通過actorOf(Props,name)創建Actor //通過Props.create(Actor.class)創建Props ActorRef fetcher = this.getContext().actorOf(Props.create(SearchEngineAcotr.class), "fetcher-" + engine.hashCode()); //創建查詢條件 QueryTerms msg = new QueryTerms(question, engine); //將查詢條件告訴Actor fetcher.tell(msg, self()); } } } }
測試代碼:
/** * <p>Title:通過多個搜索引擎查詢多個條件,並返回第一條查詢結果 </p> * * @author 韓超 2018/3/6 14:15 */ public static void main(String[] args) { //通過工具類獲取搜索引擎列表 List<String> engines = EngineUtils.getEngineList(); //通過 Actor 進行並發查詢,獲取最先查到的答案 String result = new FlavorActorDemo().getFirstResult("今天你吃了嗎?", engines); //打印結果 } /** * 通過多個搜索引擎查詢,並返回第一條查詢結果 * * @param question 查詢問題 * @param engines 查詢條件數組 * @return 最先查出的結果 * @author 韓超 2018/3/6 16:44 */ @Override public String getFirstResult(String question, List<String> engines) { //創建一個Actor系統 ActorSystem system = ActorSystem.create("search-system"); //創建一個原子引用用於保存查詢結果 AtomicReference<String> result = new AtomicReference<>(); //通過靜態方法,調用Props的構造器,創建Props對象 Props props = Props.create(QuestionQuerier.class, question, engines, result); //通過system.actorOf(props,name)創建一個 問題查詢器Actor final ActorRef querier = system.actorOf(props, "master"); //告訴問題查詢器開始查詢 querier.tell(new Object(), ActorRef.noSender()); //通過while無限循環 等待actor進行查詢,知道產生結果 while (null == result.get()) ; //關閉 Actor系統 system.terminate(); //返回結果 return result.get(); }
運行結果(某一次):
[INFO] [04/18/2018 23:10:11.949] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 開始創建搜索引擎進行查詢 [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為百度創建第1個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為Google創建第2個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為必應創建第3個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-6] [akka://search-system/user/master/fetcher-964584] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為搜狗創建第4個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為Redis創建第5個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為Solr創建第6個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master/fetcher-2138589785] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-2] [akka://search-system/user/master/fetcher-2582786] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master/fetcher-784239] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-3] [akka://search-system/user/master/fetcher-78837083] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.975] [search-system-akka.actor.default-dispatcher-4] [akka://search-system/user/master/fetcher-823867] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:13.255] [search-system-akka.actor.default-dispatcher-11] [akka://search-system/user/master] 接收到搜索結果:通過搜索引擎[搜狗],首先查到關於(今天你吃了嗎?)問題的結果,用時 = 1279毫秒! [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-2582786#-286481483] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-964584#166806804] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-2138589785#376820931] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-78837083#-224726121] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-784239#1622650486] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
參考文獻
[1] Akka系列(一):Akka簡介與Actor模型
[2] 漫談並發編程:Actor模型
[3] Java並發的四種風味:Thread、Executor、ForkJoin和Actor
