本博客介紹一種AOP、無侵入的akka監控方案,方便大家在生產使用akka的過程中對akka進行監控。
對於自身javaer來說,AOP三個字母基本就解釋清楚了akka監控框架的原理。哈哈哈,不過我這里還是啰嗦一點,把相關的方案和框架介紹一下。
無侵入監控方案,不管是對akka還是其他java應用來說,經常聽到動態代理、靜態代理、AOP這些詞匯。AOP為Aspect Oriented Programming的縮寫,意為:面向切面編程。使用Spring的童鞋一定聽過這個詞。雖然AOP一般用在spring上,但用在我們的akka監控來說也是非常合適的。靜態代理、動態代理只不過是AOP的一種實現方式,就是用統一的代理類來攔截特定對象的執行過程,並加以監控。但這有一個比較嚴重的缺陷,至少在akka的監控中是這樣的,那就是代理類會影響目標類的類型,比如通過classOf獲取到的是代理類的類型。字節碼注入是另一種AOP的實現方式,它是通過編譯期或加載期修改目標類的字節碼,來進行監控的。簡單來說就是修改了Java的class文件,從而在不修改源碼的情況下,修改編譯結果。字節碼注入有很多框架,但我們這里選擇了ASPECTJ,這個工具可以通過注解的形式定義注入點,非常方便。
好了,基本的知識點就講解清楚了,下面直接上代碼及其框架。

監控框架工程命名為akkamon,意為akka的監控,哈哈。分為四個模塊:
- akkamon-core。核心模塊,定義基本類型,主要用來定義注入點函數,就是aspectJ中的pointCut。
- akkamon-injector。注入器。定義並實現ActorSystem的一個AkkaInjectorExtension,將注入點攔截到的信息以消息的形式發送給特定的actor
- akkamon-java-shell。pointCut實現模塊。因為aspectj-maven-plugin對scala支持不太友好,此處用Java實現對pointCut的具體實現。
- akkamon-target。測試模塊
下面我們分模塊詳細講解各個類的定義及其意義。首先是akkamon-core
trait Instrumentation
Instrumentation意為儀器、儀表,代表一組注入函數集合。
trait ActorCellInstrumentation extends Instrumentation {
def beforeActorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef ):Unit
def beforeActorTerminate(cell:ActorCell):Unit
}
比如ActorCellInstrumentation代表ActorCell的監控儀表,它定義了兩個函數,分別是ActorCell創建前和銷毀前。
trait ActorSystemInjectPoint extends ListenerInstrumentation with ActorCellInstrumentation with ActorSystemInstrumentation with MessageInstrumentation
ActorSystemInjectPoint代表ActorSystem的所有注入點,它主要是所有Instrumentation的一個匯總。
class ActorSystemInjectPointImpl extends ActorSystemInjectPoint
ActorSystemInjectPoint有一個實現類,它來實現ActorSystemInjectPoint,主要是把攔截到的消息通知給listener,我們來看看其中一個函數的實現。
override def beforeActorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {
val message = notifyMessage(ActorCellCreation(system.name,ref,props,parent))
log.debug(s"beforeActorCellCreation $message")
}
很明顯在攔截到ActorCell創建的時候,會構建ActorCellCreation這個case class,然后通過notifyMessage函數把消息通知給listener。

notifyMessage是ActorSystemInstrumentationListeners的一個方法,ActorSystemInstrumentationListeners維護了ActorSystemInstrumentationListener列表,在notifyMessae內部,依次調用所有listener的onMessage函數。
abstract class ActorSystemInstrumentationListener{
def onMessage(message:InjectMessage):Unit
def close():Unit
}
那么到這里,可能會有讀者問,ActorSystemInstrumentationListener是什么時候注冊到ActorSystemInstrumentationListeners里面的呢?答案就是監控ActorSystemInstrumentationListener構造函數的調用,然后把它register到listeners列表的。那Instrumentation定義的那些函數又是啥時候調用的呢?答案就在akkamon-java-shell模塊。
@Aspect public class InstrumentationJavaShell implements ActorSystemInjectPoint
這個模塊只有一個InstrumentationJavaShell類,它繼承了ActorSystemInjectPoint,所以它會實現你所有定義的注入點,並把相應的函數調用轉發給ActorSystemInjectPointImpl
private static ActorSystemInjectPointImpl actorSystemInstrumentation = new ActorSystemInjectPointImpl();
ActorSystemInjectPointImpl是InstrumentationJavaShell里面的一個靜態類。
@Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, *, parent)")
public void actorCellCreation(ActorSystem system , ActorRef ref,Props props , ActorRef parent){}
@Override
@Before("actorCellCreation(system,ref,props,parent)")
public void beforeActorCellCreation(ActorSystem system , ActorRef ref, Props props , ActorRef parent) {
actorSystemInstrumentation.beforeActorCellCreation(system,ref,props,parent);
}
上面兩個函數分別定義了一個Pointcut和一個advice,Pointcut是對akka.actor.ActorCell.new的注入,Before這個advice是在akka.actor.ActorCell.new執行之前調用,可以看到beforeActorCellCreation就是簡單的把相關的參數傳給actorSystemInstrumentation.beforeActorCellCreation,而beforeActorCellCreation里面是通知各個listener。
下面是listener的注冊過程。
@Pointcut("execution(com.gabry.akkamon.listener.ActorSystemInstrumentationListener.new(..)) && this(listener)")
public void listenerCreation(ActorSystemInstrumentationListener listener) {
}
@Override
@After("listenerCreation(listener)")
public void afterListenerCreation(ActorSystemInstrumentationListener listener) {
actorSystemInstrumentation.afterListenerCreation(listener);
}
listener的注冊過程其實也是通過AOP來實現的。
class ActorListener(actorRef:ActorRef) extends ActorSystemInstrumentationListener{
override def onMessage(message: InjectMessage): Unit = {
actorRef ! message
}
override def close(): Unit = {
}
}
上面是一個listener的實現,其實就是簡單的把消息通知給了相關的actor。
至此,一個akka的監控框架就實現了,是不是很簡單呢?框架很簡單,麻煩的是Instrumentation的定義和實現啊。

讀者問怎么執行?那就是java命令行的-javaagent參數啊。
