akka監控框架設計


  本博客介紹一種AOP、無侵入的akka監控方案,方便大家在生產使用akka的過程中對akka進行監控。

  對於自身javaer來說,AOP三個字母基本就解釋清楚了akka監控框架的原理。哈哈哈,不過我這里還是啰嗦一點,把相關的方案和框架介紹一下。

  無侵入監控方案,不管是對akka還是其他java應用來說,經常聽到動態代理、靜態代理、AOP這些詞匯。AOP為Aspect Oriented Programming的縮寫,意為:面向切面編程。使用Spring的童鞋一定聽過這個詞。雖然AOP一般用在spring上,但用在我們的akka監控來說也是非常合適的。靜態代理、動態代理只不過是AOP的一種實現方式,就是用統一的代理類來攔截特定對象的執行過程,並加以監控。但這有一個比較嚴重的缺陷,至少在akka的監控中是這樣的,那就是代理類會影響目標類的類型,比如通過classOf獲取到的是代理類的類型。字節碼注入是另一種AOP的實現方式,它是通過編譯期或加載期修改目標類的字節碼,來進行監控的。簡單來說就是修改了Java的class文件,從而在不修改源碼的情況下,修改編譯結果。字節碼注入有很多框架,但我們這里選擇了ASPECTJ,這個工具可以通過注解的形式定義注入點,非常方便。

  好了,基本的知識點就講解清楚了,下面直接上代碼及其框架。

  監控框架工程命名為akkamon,意為akka的監控,哈哈。分為四個模塊:

  • akkamon-core。核心模塊,定義基本類型,主要用來定義注入點函數,就是aspectJ中的pointCut。
  • akkamon-injector。注入器。定義並實現ActorSystem的一個AkkaInjectorExtension,將注入點攔截到的信息以消息的形式發送給特定的actor
  • akkamon-java-shell。pointCut實現模塊。因為aspectj-maven-plugin對scala支持不太友好,此處用Java實現對pointCut的具體實現。
  • akkamon-target。測試模塊

  下面我們分模塊詳細講解各個類的定義及其意義。首先是akkamon-core

trait Instrumentation

   Instrumentation意為儀器、儀表,代表一組注入函數集合。

trait ActorCellInstrumentation extends Instrumentation {
  def beforeActorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef ):Unit
  def beforeActorTerminate(cell:ActorCell):Unit
}

   比如ActorCellInstrumentation代表ActorCell的監控儀表,它定義了兩個函數,分別是ActorCell創建前和銷毀前。

trait ActorSystemInjectPoint extends ListenerInstrumentation
  with ActorCellInstrumentation
  with ActorSystemInstrumentation
  with MessageInstrumentation

   ActorSystemInjectPoint代表ActorSystem的所有注入點,它主要是所有Instrumentation的一個匯總。

class ActorSystemInjectPointImpl extends ActorSystemInjectPoint

   ActorSystemInjectPoint有一個實現類,它來實現ActorSystemInjectPoint,主要是把攔截到的消息通知給listener,我們來看看其中一個函數的實現。

override def beforeActorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {
    val message = notifyMessage(ActorCellCreation(system.name,ref,props,parent))
    log.debug(s"beforeActorCellCreation $message")
  }

   很明顯在攔截到ActorCell創建的時候,會構建ActorCellCreation這個case class,然后通過notifyMessage函數把消息通知給listener。

  notifyMessage是ActorSystemInstrumentationListeners的一個方法,ActorSystemInstrumentationListeners維護了ActorSystemInstrumentationListener列表,在notifyMessae內部,依次調用所有listener的onMessage函數。

abstract class ActorSystemInstrumentationListener{
  def onMessage(message:InjectMessage):Unit
  def close():Unit
}

   那么到這里,可能會有讀者問,ActorSystemInstrumentationListener是什么時候注冊到ActorSystemInstrumentationListeners里面的呢?答案就是監控ActorSystemInstrumentationListener構造函數的調用,然后把它register到listeners列表的。那Instrumentation定義的那些函數又是啥時候調用的呢?答案就在akkamon-java-shell模塊。

@Aspect
public class InstrumentationJavaShell implements ActorSystemInjectPoint 

   這個模塊只有一個InstrumentationJavaShell類,它繼承了ActorSystemInjectPoint,所以它會實現你所有定義的注入點,並把相應的函數調用轉發給ActorSystemInjectPointImpl

private static ActorSystemInjectPointImpl actorSystemInstrumentation = new ActorSystemInjectPointImpl();

   ActorSystemInjectPointImpl是InstrumentationJavaShell里面的一個靜態類。

@Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, *, parent)")
    public void actorCellCreation(ActorSystem system , ActorRef ref,Props props , ActorRef parent){}

    @Override
    @Before("actorCellCreation(system,ref,props,parent)")
    public void beforeActorCellCreation(ActorSystem system , ActorRef ref, Props props , ActorRef parent) {
        actorSystemInstrumentation.beforeActorCellCreation(system,ref,props,parent);
    }

   上面兩個函數分別定義了一個Pointcut和一個advice,Pointcut是對akka.actor.ActorCell.new的注入,Before這個advice是在akka.actor.ActorCell.new執行之前調用,可以看到beforeActorCellCreation就是簡單的把相關的參數傳給actorSystemInstrumentation.beforeActorCellCreation,而beforeActorCellCreation里面是通知各個listener。

  下面是listener的注冊過程。

@Pointcut("execution(com.gabry.akkamon.listener.ActorSystemInstrumentationListener.new(..)) && this(listener)")
    public void listenerCreation(ActorSystemInstrumentationListener listener) {

    }

    @Override
    @After("listenerCreation(listener)")
    public void afterListenerCreation(ActorSystemInstrumentationListener listener) {
        actorSystemInstrumentation.afterListenerCreation(listener);
    }

   listener的注冊過程其實也是通過AOP來實現的。

class ActorListener(actorRef:ActorRef) extends ActorSystemInstrumentationListener{
  override def onMessage(message: InjectMessage): Unit = {
    actorRef ! message
  }

  override def close(): Unit = {

  }
}

   上面是一個listener的實現,其實就是簡單的把消息通知給了相關的actor。

  至此,一個akka的監控框架就實現了,是不是很簡單呢?框架很簡單,麻煩的是Instrumentation的定義和實現啊。

  讀者問怎么執行?那就是java命令行的-javaagent參數啊。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM