Spring與Akka的集成


概述

       近年來隨着Spark的火熱,Spark本身使用的開發語言Scala、用到的分布式內存文件系統Tachyon(現已更名為Alluxio)以及基於Actor並發編程模型的Akka都引起了大家的注意。了解過Akka或者Actor的人應該知道,這的確是一個很不錯的框架,按照Akka官網的描述——使用Akka使得構建強有力的並發與分布式應用將更加容易。由於歷史原因,很多Web系統在開發分布式服務時首先會選擇RMI(Remote Method Invoke ,遠程方法調用)、RPC(Remote Procedure Call Protocol,遠程過程調用)或者使用JMS(Java Messaging Service,Java消息服務)。

       但是使用RMI只能使用java語言,而且開發、執行效率都不高;RPC框架雖然可以通過匹配方法簽名的方式比RMI更靈活,但是其存在調用超時、調用丟失等缺點;JMS方式雖然可以通過At Least Delivery Once、消息持久化等機制保證消息不會丟失,但是只能作為一種跨服務的生產者、消費者編程模型使用。Akka不但處理了以上問題,而且還可以使用Actor作為並發編程模型,減少java多線程編程的阻塞、調度、上下文開銷甚至死鎖等問題。此外,Akka還提供了集群Sharding、流處理等功能的支持,更易於實現有限狀態自動機等功能。所以有心的開發者勢必會關心如何在最常見的Java系統中使用它,如何與Spring集成?

       本文參考Akka官方使用文檔,根據自身的經驗和理解,提供Akka與Spring集成的方案。本文不說明Spring框架的具體使用,並從Spring已經配置完備的情況開始敘述。

Actor系統——ActorSystem

       什么是ActorSystem?根據Akka官網的描述——ActorSystem是一個重量級的結構體,可以用於分配1到N個線程,所以每個應用都需要創建一個ActorSystem。通常而言,使用以下代碼來創建ActorSystem。

ActorSystem system = ActorSystem.create("Hello");

不過對於接入Spring而言,由IOC(Inversion of Control,控制反轉)方式會更接地氣,你可以這樣:

    <!-- AKKA System Setup -->
    <bean id="actorSystem" class="akka.actor.ActorSystem" factory-method="create" destroy-method="shutdown" scope="singleton">
        <constructor-arg value="helloAkkaSystem"/>
    </bean>

然后在你需要的地方依賴注入即可。

Actor編程模型

       有關Actor編程模型的具體介紹可以看我的另一篇博文——《Spark如何使用Akka實現進程、節點通信的簡明介紹》,里面有更多的介紹。需要補充的是,在最新的Scala官方網站上已經決定廢棄Scala自身的Actor編程模型,轉而全面擁抱Akka提供的Actor編程模型。

       我們可以通過以下代碼(代碼片段借用了Akka官網的例子)創建一個簡單的Actor例子。

       Greeter是代表問候者的Actor:

public class Greeter extends UntypedActor {

  public static enum Msg {
    GREET, DONE;
  }

  @Override
  public void onReceive(Object msg) {
    if (msg == Msg.GREET) {
      System.out.println("Hello World!");
      getSender().tell(Msg.DONE, getSelf());
    } else
      unhandled(msg);
  }

}

一般情況下我們的Actor都需要繼承自UntypedActor,並實現其onReceive方法。onReceive用於接收消息,你可以在其中實現對消息的匹配並做不同的處理。

HelloWorld是用於向Greeter發送問候消息的訪客:

public class HelloWorld extends UntypedActor {

  @Override
  public void preStart() {
    // create the greeter actor
    final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class), "greeter");
    // tell it to perform the greeting
    greeter.tell(Greeter.Msg.GREET, getSelf());
  }

  @Override
  public void onReceive(Object msg) {
    if (msg == Greeter.Msg.DONE) {
      // when the greeter is done, stop this actor and with it the application
      getContext().stop(getSelf());
    } else
      unhandled(msg);
  }
}

有了Actor之后,我們可以這樣使用它:

ActorRef a = system.actorOf(Props.create(HelloWorld.class), "helloWorld");

       在HelloWorld的preStart實現中,獲取了Greeter的ActorRef(Actor的引用)並向Greeter發送了問候的消息,Greeter收到問候消息后,會先打印Hello World!,然后向HelloWorld回復完成的消息,HelloWorld得知Greeter完成了向世界問好這個偉大的任務后,就結束了自己的生命。HelloWorld的例子用編程API的方式告訴了我們如何使用Actor及發送、接收消息。為了便於描述與Spring的集成,下面再介紹一個例子。

       CountingActor(代碼主體借用自Akka官網)是用於計數的Actor,見代碼清單1所示。

代碼清單1

@Named("CountingActor")
@Scope("prototype")
public class CountingActor extends UntypedActor {

    public static class Count {
    }

    public static class Get {
    }

    // the service that will be automatically injected
    @Resource
    private CountingService countingService;

    private int count = 0;

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Count) {
            count = countingService.increment(count);
        } else if (message instanceof Get) {
            getSender().tell(count, getSelf());
        } else {
            unhandled(message);
        }
    }
}
CountingActor用於接收Count消息進行計數,接收Get消息回復給發送者當前的計數值。CountingService是用於計數的接口,其定義如下:
public interface CountingService {
    
    /**
     * 計數
     * @param count
     * @return
     */
    int increment(int count);

}
CountingService的具體實現是CountingServiceImpl,其實現如下:
@Service("countingService")
public class CountingServiceImpl implements CountingService {

    private static Logger logger = LoggerFactory.getLogger(CountingServiceImpl.class);

    /*
     * (non-Javadoc)
     * 
     * @see com.elong.sentosa.metadata.service.CountingService#increment(int)
     */
    @Override
    public int increment(int count) {
        logger.info("increase " + count + "by 1.");
        return count + 1;
    }

}
CountingActor通過注解方式注入了CountingService,CountingActor的計數實際是由CountingService完成。
        細心的同學可能發現了CountingActor使用了注解Named,這里為什么沒有使用@Service或者@Component等注解呢?由於Akka的Actor在初始化的時候必須使用System或者Context的工廠方法actorOf創建新的Actor實例,不能使用構造器來初始化,而使用Spring的Service或者Component注解,會導致使用構造器初始化Actor,所以會拋出以下異常:
akka.actor.ActorInitializationException: You cannot create an instance of [com.elong.metadata.akka.actor.CountingActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.

如果我們不能使用@Service或者@Component,也不能使用XML配置的方式使用(與注解一個道理),那么我們如何使用CountingActor提供的服務呢?

 

IndirectActorProducer接口

        IndirectActorProducer是Akka提供的Actor生成接口,從其名字我們知道Akka給我們指出了另一條道路——石頭大了繞着走!通過實現IndirectActorProducer接口我們可以定制一些Actor的生成方式,與Spring集成可以這樣實現它,見代碼清單2所示。

代碼清單2

public class SpringActorProducer implements IndirectActorProducer {
    private final ApplicationContext applicationContext;
    private final String actorBeanName;
    private final Object[] args;

    public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object ... args) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
        this.args = args;
    }

    public Actor produce() {
        return (Actor) applicationContext.getBean(actorBeanName, args);
    }

    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
    }
}

SpringActorProducer的實現主要借鑒了Akka官方文檔,我這里對其作了一些擴展以便於支持構造器帶有多個參數的情況。從其實現看到實際是利用了ApplicationContext提供的getBean方式實例化Actor。
       這里還有兩個問題:一、ApplicationContext如何獲取和設置?二、如何使用SpringActorProducer生成Spring需要的Actor實例?

       對於第一個問題,我們可以通過封裝SpringActorProducer並實現ApplicationContextAware接口的方式獲取ApplicationContext;對於第二個問題,我們知道Akka中的所有Actor實例都是以Props作為配置參數開始的,這里以SpringActorProducer為代理生成我們需要的Actor的Props。

       SpringExt實現了以上思路,見代碼清單3所示。

代碼清單3

@Component("springExt")
public class SpringExt implements Extension, ApplicationContextAware {

    private ApplicationContext applicationContext;

    /**
     * Create a Props for the specified actorBeanName using the
     * SpringActorProducer class.
     *
     * @param actorBeanName
     *            The name of the actor bean to create Props for
     * @return a Props that will create the named actor bean using Spring
     */
    public Props props(String actorBeanName, Object ... args) {
        return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args);
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

應用例子

        經過了以上的鋪墊,現在你可以使用創建好的CountingActor了,首先你需要在你的業務類中注入ActorSystem和SpringExt。

        @Autowired
    private ActorSystem actorSystem;

    @Autowired
    private SpringExt springExt;

然后我們使用CountingActor進行計數,代碼如下:

    ActorRef counter = actorSystem.actorOf(springExt.props("CountingActor"), "counter");

    // Create the "actor-in-a-box"
        final Inbox inbox = Inbox.create(system);
        
    // tell it to count three times
        inbox.send(counter, new Count());
        inbox.send(counter, new Count());
        inbox.send(counter, new Count());

    // print the result
    FiniteDuration duration = FiniteDuration.create(3, TimeUnit.SECONDS);
    Future<Object> result = ask(counter, new Get(), Timeout.durationToTimeout(duration));
    try {
        System.out.println("Got back " + Await.result(result, duration));
    } catch (Exception e) {
        System.err.println("Failed getting result: " + e.getMessage());
        throw e;
    }

輸出結果為:

Got back 3

總結

       本文只是最簡單的Akka集成Spring的例子,Akka的remote、cluster、persistence、router等機制都可以應用。

后記:經過近一年的准備,《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
 
售賣鏈接如下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM