Akka 和 μJavaActors入門


     AkkaμJavaActorsμJavaActors均是java的Actor庫,其中Akka提供了叫為完整的Actor開發框架,較為龐大,學習成本很高,μJavaActors 是一個輕量級Actor庫,大約僅有1200行代碼,比較適合入門。

一.Akka Demo 

Akka是一個相當成熟、強大的庫,github上download下的是Akka的源碼,應該使用sbt構建的工程,如果沒有使用sbt經驗,想導出jar還挺不容易的,推薦Akka官網下載akka各個組件的jar去使用,簡單介紹一下helloworld 級別Akka的demo。

1.Akka的主要組件

akka-actor.jar : Actor核心組件了,定義了Acotr核心類

akka-slf4f.jar : SLF4F Logger的支持,一個打log的組件,不用太關注

akka-remote.jar : Actor做遠程調用的jar,類似RFC吧

akka-cluster : actor做集群管理組件

akka-camel.jar : 對Apache Camel 集成接口

scala-library-2.11.8.jar : akka核心應該是Scala寫的,這個組件就是對akka的核心支持

Akka還有很多組件,不過對於hello world級的程序簡單了解幾個就ok了。工程是基於eclipse的,需要包含下面幾個基礎的組件:

4YQ$GDJYW$F]Q~]K`XCL@YU

編寫兩個Actor:

package demo02;

import akka.actor.UntypedActor;
/*
 * UntypedAcotr是無類型Actor的一個抽象類,繼承與核心類Actor
 */
public class Greeter extends UntypedActor {

    public static enum Msg{
        GREET , DONE;
    }
    /**
     * 每個Actor必須實現OnReceive,當該Actor收到消息調用該方法
     */
    @Override
    public void onReceive(Object msg) throws Throwable {
        if(msg == Msg.GREET){
            System.out.println("Hello world");
            /**
             * 這里吐槽一下Akka對於發消息的設計,發送消息的設計竟然是:
             * receiver.tell(msg , sender)
             * 也許沒理解akka設計的理念,但是正常人設計不應該是:
             *  sender.tell(msg , receiver)
             *  汗……
             */
            getSender().tell(Msg.DONE, getSelf());
        }else{
            unhandled(msg);
        }
        
    }

}
package demo02;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart(){
        final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class));
        greeter.tell(Greeter.Msg.GREET, getSelf());
    }
    
    
    @Override
    public void onReceive(Object msg) throws Throwable {
    
        if(msg == Greeter.Msg.DONE){
            getContext().stop(getSelf());
        }else{
            unhandled(msg);
        }
        
    }

    
}

下面是Main方法:

package demo02;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class Main {

    public static void main(String[] args) {
        //ActorSystem 相當於ActorManager,管理各種Acor調度、線程管理等
        ActorSystem system = ActorSystem.create("hello");
        //創建一個HelloWorld 類型的Actor,在Actor啟動前,會調preStart(),此時會想Greeter發消息
        ActorRef actor = system.actorOf(Props.create(HelloWorld.class));
        //添加結束終結Actor,當ActorSystem調Stop時,會向每個Actor發送Terminated消息    
        system.actorOf(Props.create(Terminator.class, actor), "terminator");
        
    }
    public static class Terminator extends UntypedActor{

        private final LoggingAdapter log = Logging.getLogger(getContext().system(),this);
        private ActorRef actorRef = null;
        
        public Terminator(ActorRef ref){
            System.out.println("Terminator Init !!!");
            actorRef = ref;
            getContext().watch(actorRef);
        }
        
        @Override
        public void onReceive(Object msg) throws Throwable {
              if (msg instanceof Terminated) {
                  log.info("{} has terminated, shutting down system", actorRef.path());
                  getContext().system().terminate();
                } else {
                  unhandled(msg);
                }
            
        }
        
    }
    
}

上面代碼在akka的源碼中sample都可以找到的,從上面看Akka對消息的識別是根據類型處理的,在我這種菜鳥看來,並不是很合適,當我消息類型較多時,消息類豈不是要爆炸,當然也可以做分級Actor,再加一層轉發層解決這個問題哈

二.μJavaActors

    μJavaActors 是一個十分輕量級的Actor庫,實現核心的Actor調度,不涉及復雜的框架,簡單分析一下它的源碼吧

1.Actor核心接口

Actor:定義了一個標准的Actor應該具有行為

ActorManager:Actor管理器接口,提供線程管理,Actor調度等

Messager : Actor相互間傳遞傳遞的消息接口,當然附帶的接口還有MessageEvent和MessageListener

簡單引用作者對這個概念的描述:

Actor 是一個執行單元,一次處理一條消息。Actor 具有以下關鍵行為或特征:

  • 每個 actor 有一個 name,該名稱在每個 ActorManager 中必須是惟一的。
  • 每個 actor 屬於一個 category;類別是一種向一組 actor 中的一個成員發送消息的方式。一個 actor 一次只能屬於一個類別。
  • 只要 ActorManager 可以提供一個執行 actor 的線程,系統就會調用 receive()。為了保持最高效率,actor 應該迅速處理消息,而不要進入漫長的等待狀態(比如等待人為輸入)。
  • willReceive() 允許 actor 過濾潛在的消息主題。
  • peek() 允許該 actor 和其他 actor 查看是否存在掛起的消息(或許是為了選擇主題)。
  • remove() 允許該 actor 和其他 actor 刪除或取消任何尚未處理的消息。
  • getMessageCount() 允許該 actor 和其他 actor 獲取掛起的消息數量。
  • getMaxMessageCount() 允許 actor 限制支持的掛起消息數量;此方法可用於預防不受控制地發送。

大部分程序都有許多 actor,這些 actor 常常具有不同的類型。actor 可在程序啟動時創建或在程序執行時創建(和銷毀)。本文中的 actor 包 包含一個名為 AbstractActor 的抽象類,actor 實現基於該類。

 

ActorManager 是一個 actor 管理器。它負責向 actor 分配線程(進而分配處理器)來處理消息。ActorManager 擁有以下關鍵行為或特征:

  • createActor() 創建一個 actor 並將它與此管理器相關聯。
  • startActor() 啟動一個 actor。
  • detachActor() 停止一個 actor 並將它與此管理器斷開。
  • send()/broadcast() 將一條消息發送給一個 actor、一組 actor、一個類別中的任何 actor 或所有 actor。

在大部分程序中,只有一個 ActorManager,但如果您希望管理多個線程和/或 actor 池,也可以有多個 ActorManager。此接口的默認實現是 DefaultActorManager

 

消息 是在 actor 之間發送的消息。Message 是 3 個(可選的)值和一些行為的容器:

  • source 是發送 actor。
  • subject 是定義消息含義的字符串(也稱為命令)。
  • data 是消息的任何參數數據;通常是一個映射、列表或數組。參數可以是要處理和/或其他 actor 要與之交互的數據。
  • subjectMatches() 檢查消息主題是否與字符串或正則表達式匹配。

μJavaActors 包的默認消息類是 DefaultMessage

 

ActorManager其實只要簡單瀏覽一下μJavaActors源碼就可以理解Actor設計思路啦,主要分析一下ActorManager中的Actor調度源碼:

public class ActorRunnable implements Runnable {
        public boolean hasThread;
        public AbstractActor actor;

        public void run() {
            // logger.trace("procesNextActor starting");
            int delay = 1;
            while (running) {
                try {
                    if (!procesNextActor()) {
                        // logger.trace("procesNextActor waiting on actor");
                        // sleep(delay * 1000);
                        synchronized (actors) {
                            // TOOD: adjust this delay; possible parameter
                            // we want to minizmize overhead (make bigger);
                            // but it has a big impact on message processing
                            // rate (makesmaller)
                            // actors.wait(delay * 1000);
                            actors.wait(100);
                        }
                        delay = Math.max(5, delay + 1);
                    } else {
                        delay = 1;
                    }
                } catch (InterruptedException e) {
                } catch (Exception e) {
                    logger.error("procesNextActor exception", e);
                }
            }
            // logger.trace("procesNextActor ended");
        }

        protected boolean procesNextActor() {
            boolean run = false, wait = false, res = false;
            actor = null;
            synchronized (actors) {
                for (String key : runnables.keySet()) {
                    actor = runnables.remove(key);
                    break;
                }
            }
            if (actor != null) {
                // first run never started
                run = true;
                actor.setHasThread(true);
                hasThread = true;
                try {
                    actor.run();
                } finally {
                    actor.setHasThread(false);
                    hasThread = false;
                }
            } else {
                synchronized (actors) {
                    for (String key : waiters.keySet()) {
                        actor = waiters.remove(key);
                        break;
                    }
                }
                if (actor != null) {
                    // then waiting for responses
                    wait = true;
                    actor.setHasThread(true);
                    hasThread = true;
                    try {
                        res = actor.receive();
                        if (res) {
                            incDispatchCount();
                        }
                    } finally {
                        actor.setHasThread(false);
                        hasThread = false;
                    }
                }
            }
            // if (!(!run && wait && !res) && a != null) {
            // logger.trace("procesNextActor %b/%b/%b: %s", run, wait, res, a);
            // }
            return run || res;
        }
    }
ActorMgr中有一個線程隊列維護了一些ActorRunnable對象,每個ActorRunnable對象有都在無線循環調度Actor,這也就簡單使得每個Actor在不同的線程中執行。當然此時會有個問題,如果有一些Actor出現資源競爭會不會出現問題,答案肯定是會的。Actor僅僅是抽象了線程調度問題並給出了一下Actor的原則,並不能完全避免資源競爭現象的出現,只能說准守Actor模式規范,,當然也可以用redis去做公共內存塊,避免直接的全局資源讀寫。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM