Akka's Actor Model and a Usage Example


Most of this article is reposted from the blog rerun.me. Foreign authors really do write good material.

INTRODUCTION TO ACTORS

Anyone who has done multithreading in the past won't deny how hard and painful it is to manage multithreaded applications. I said manage because it starts out simple and becomes a whole lot of fun once you start seeing performance improvements. However, it aches when you see that you don't have an easy way to recover from errors in your sub-tasks, or when you hit those zombie bugs that are hard to reproduce, or when your profiler shows that your threads are spending a lot of time blocking wastefully before writing to shared state.

I prefer not to talk about how the Java concurrency API and its collections made this better and easier, because if you are here, you probably need more control over the sub-tasks, or you simply don't like writing locks and synchronized blocks and would prefer a higher level of abstraction.

In this series of Akka Notes, we will go through simple Akka examples to explore the various features of the toolkit.

WHAT ARE ACTORS?

Treat Actors like People. People who don't talk to each other in person. They just talk through mail.

Let's expand on that a bit.

1. MESSAGING

Consider two people: a wise Teacher and a Student. The Student sends a mail every morning to the Teacher, and the wise Teacher sends a wise quote back.

Points to note:

  1. The student sends a mail. Once sent, the mail can't be edited. Talk about natural immutability.
  2. The Teacher checks his mailbox when he wishes to do so.
  3. The Teacher also sends a mail back (immutable again).
  4. The student checks the mailbox in his own time.
  5. The student doesn't wait for the reply. (no blocking)

That pretty much sums up the basic building block of the Actor Model - passing messages.

 

2. CONCURRENCY

Now, imagine there are 3 wise teachers and 3 students, and every student sends notes to every teacher. What happens then? Nothing changes, actually. Everybody has their own mailbox. One subtle point to note here is this:

By default, Mails in the mailbox are read/processed in the order they arrived.

Internally, by default it is a ConcurrentLinkedQueue. And since nobody waits for the mail to be picked up, sending is simply a non-blocking operation. (There are a variety of built-in mailboxes, including bounded and priority-based ones. In fact, we could build one ourselves too.)
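The two properties just described (senders never wait, and mails are processed in arrival order) can be illustrated with a small sketch. It is a toy model built only on the JDK's ConcurrentLinkedQueue, not Akka's mailbox implementation; the class ToyMailbox and its methods are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy mailbox (hypothetical, not an Akka class): senders enqueue without
// blocking, and the owner drains the messages in arrival order.
final class ToyMailbox {
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    // offer() on a ConcurrentLinkedQueue never blocks the sender.
    void send(String message) {
        queue.offer(message);
    }

    // Removes and returns every queued message, oldest first.
    List<String> drain() {
        List<String> processed = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            processed.add(next);
        }
        return processed;
    }

    public static void main(String[] args) {
        ToyMailbox mailbox = new ToyMailbox();
        mailbox.send("quote request from student 1");
        mailbox.send("quote request from student 2");
        System.out.println(mailbox.drain());
    }
}
```

A bounded or priority-based mailbox would swap the queue for a different collection, while the send/drain shape stays the same.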

 

3. FAILOVER

Imagine these 3 teachers are from three different departments - History, Geography and Philosophy.

History teachers reply with a note on an event in the past, Geography teachers send an interesting place, and Philosophy teachers send a quote. Each student sends a message to each teacher and gets responses. The student doesn't care which teacher in the department sends the reply back. What if, one day, a teacher falls sick? There has to be at least one teacher handling the mails from the department. In this case, another teacher in the department steps up and does the job.

 

 

Points to note:

  1. There could be a pool of Actors that do different things.
  2. An Actor could do something that causes an exception, and it wouldn't be able to recover by itself. In that case, a new Actor could be created in place of the old one. Alternatively, the Actor could just ignore that one particular message and proceed with the rest of the messages. These are called Directives and we'll discuss them later.
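To make the two options in the second point concrete, here is a minimal sketch of the difference between skipping the failing message and replacing the Actor. It is a toy model using only the JDK, not Akka's supervision API; ToySupervision, Teacher and the Directive enum below are hypothetical names, loosely modelled on the idea of "resume" versus "restart".

```java
import java.util.Arrays;
import java.util.List;

// Toy supervision sketch. All names are hypothetical; this is not Akka's API.
final class ToySupervision {
    enum Directive { RESUME, RESTART }

    // A teacher that counts the mails it handled and chokes on a "bad" mail.
    static final class Teacher {
        int handled = 0;

        void receive(String mail) {
            if ("bad".equals(mail)) {
                throw new IllegalStateException("cannot handle: " + mail);
            }
            handled++;
        }
    }

    // Feeds the mails to a teacher and applies the directive on failure.
    // Returns how many mails the teacher instance alive at the end has handled.
    static int run(List<String> mails, Directive directive) {
        Teacher teacher = new Teacher();
        for (String mail : mails) {
            try {
                teacher.receive(mail);
            } catch (IllegalStateException failure) {
                if (directive == Directive.RESTART) {
                    teacher = new Teacher(); // replace the actor, its state is lost
                }
                // RESUME: keep the same teacher and skip the failing mail
            }
        }
        return teacher.handled;
    }

    public static void main(String[] args) {
        List<String> mails = Arrays.asList("q1", "q2", "bad", "q3");
        System.out.println(run(mails, Directive.RESUME));
        System.out.println(run(mails, Directive.RESTART));
    }
}
```

With RESUME the same teacher carries on and keeps its count; with RESTART a fresh teacher takes over and only the mails after the failure are counted.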

4. MULTITASKING

For a twist, let's assume that each of these teachers also sends the exam score by mail, if the student asks for it. Similarly, an Actor could handle more than one type of message comfortably.
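As a rough sketch of what "more than one type of message" looks like in code, the toy below dispatches on the message type inside a single receive method. It uses only the JDK and is not an Akka actor; ScoreRequest, the reply strings and the score value are made up for illustration.

```java
// Toy actor handling two message types. ScoreRequest, the reply texts and
// the score value are hypothetical; they only illustrate the dispatch.
final class ToyTeacher {
    static final class QuoteRequest { }

    static final class ScoreRequest {
        final String student;

        ScoreRequest(String student) {
            this.student = student;
        }
    }

    // One receive method, several message types.
    static String receive(Object message) {
        if (message instanceof QuoteRequest) {
            return "quote: a wise quote";
        } else if (message instanceof ScoreRequest) {
            return "score for " + ((ScoreRequest) message).student + ": 90";
        } else {
            return "unhandled";
        }
    }

    public static void main(String[] args) {
        System.out.println(receive(new QuoteRequest()));
        System.out.println(receive(new ScoreRequest("Ann")));
        System.out.println(receive("something else"));
    }
}
```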

5. CHAINING

What if the student would like to get only one final consolidated trivia mail instead of three?

We could do that with Actors too. We could chain the teachers as a hierarchy. We'll come back to that later when we talk about Supervisors and revisit the same thought when we talk about Futures.

As requested by Mohan, let's try to map the components of the analogy to the components in the Actor Model.

 

The Students and the Teachers become our Actors. The Email Inbox becomes the Mailbox component. The request and the response can't be modified; they are immutable objects. Finally, the MessageDispatcher component manages the mailboxes and routes each message to the respective Mailbox.

ACTOR MESSAGING

In the introductory first part of the Akka Notes, we saw a bird's eye view of Actors in the Akka Toolkit. In this second part of the Akka Notes, we'll look at the messaging part of Actors. As for the example, we will use the same Student-Teacher example that we discussed earlier.

In this first part of Actor Messaging, we'll create the Teacher Actor and instead of the Student Actor, we'll use a main program called StudentSimulatorApp.

REVISITING STUDENT-TEACHER IN DETAIL

Let's for now consider the message sent by the StudentSimulatorApp to the TeacherActor alone. When I say StudentSimulatorApp, I just mean a normal main program.

The picture conveys this :

(if the terms are overwhelming, don't worry, we'll go through them in detail)

  1. Student creates something called an ActorSystem
  2. It uses the ActorSystem to create something called an ActorRef. The QuoteRequest message is sent to the ActorRef (a proxy to TeacherActor)
  3. The ActorRef passes the message along to a Dispatcher
  4. The Dispatcher enqueues the message in the target Actor's MailBox.
  5. The Dispatcher then puts the Mailbox on a Thread (more on that in the next section).
  6. The MailBox dequeues a message and eventually delegates that to the actual Teacher Actor's receive method.

Like I said, don't worry about it. Let's look at each step in detail now. You can come back and revisit these six steps once we are done.

THE STUDENTSIMULATORAPP PROGRAM

We will use this StudentSimulatorApp to bring up the JVM and initialize the ActorSystem.

 

As we understand from the picture, the StudentSimulatorApp

  1. Creates an ActorSystem
  2. Uses the ActorSystem to create a proxy to the Teacher Actor (ActorRef)
  3. Sends the QuoteRequest message to the proxy.

Let's focus on these three points alone now.

  1. Creating an ActorSystem

ActorSystem is the entry point into the ActorWorld. It is through the ActorSystem that you create and stop Actors, or even shut down the entire Actor environment.

On the other end of the spectrum, Actors are hierarchical and the ActorSystem is also similar to the java.lang.Object or scala.Any for all Actors - meaning, it is the root for all Actors. When you create an Actor using the ActorSystem's actorOf method, you create an Actor just below the ActorSystem.

 

The code for initializing the ActorSystem looks like

val actorSystem = ActorSystem("UniversityMessageSystem")

The UniversityMessageSystem is simply a cute name you give to your ActorSystem.

  2. Creating a Proxy for TeacherActor

Let's consider the following snippet :

val teacherActorRef: ActorRef = actorSystem.actorOf(Props[TeacherActor])

actorOf is the Actor creation method in ActorSystem. But, as you can see, it doesn't return the TeacherActor that we need. It returns something of type ActorRef.

The ActorRef acts as a Proxy for the actual Actor. Clients do not talk to the Actor directly. This is the Actor Model's way of avoiding direct access to any custom/private methods or variables in the TeacherActor, or any Actor for that matter.

To repeat, you send messages only to the ActorRef and they eventually reach your actual Actor. You can NEVER talk to your Actor directly. People will hate you to death if you find some mean way to do that.
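The proxy idea can be sketched in a few lines of plain Java. This is a toy model and not Akka's ActorRef: ToyRefDemo, Ref, pending and deliverAll are hypothetical names. The point it tries to show is that the client only ever holds the Ref, that tell returns immediately, and that the actor's methods and state are out of reach.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy proxy sketch. Hypothetical names; this is not Akka's ActorRef.
final class ToyRefDemo {
    // The actor is private: code outside ToyRefDemo cannot even name the type.
    private static final class TeacherActor {
        private int quotesServed = 0;

        private void receive(String message) {
            if ("QuoteRequest".equals(message)) {
                quotesServed++;
            }
        }
    }

    // The only handle a client ever gets.
    static final class Ref {
        private final TeacherActor actor = new TeacherActor();
        private final Queue<String> mailbox = new ArrayDeque<>();

        // Fire-and-forget: queue the message and return at once.
        void tell(String message) {
            mailbox.add(message);
        }

        // Number of messages still waiting in the mailbox.
        int pending() {
            return mailbox.size();
        }

        // Stand-in for the dispatcher: hands queued messages to the actor.
        int deliverAll() {
            int delivered = 0;
            String message;
            while ((message = mailbox.poll()) != null) {
                actor.receive(message);
                delivered++;
            }
            return delivered;
        }
    }

    public static void main(String[] args) {
        Ref teacherRef = new Ref();
        teacherRef.tell("QuoteRequest");
        System.out.println(teacherRef.pending());
        System.out.println(teacherRef.deliverAll());
    }
}
```

In this toy the mailbox is a plain single-threaded queue; a real actor system would make delivery safe across threads.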

 

  3. Sending a QuoteRequest to the Proxy

It's a one-liner again. You just tell the QuoteRequest message to the ActorRef. In Scala, tell is written as the ! operator. (There is also a tell method on ActorRef, which simply delegates the call to !.)

         // send a message to the Teacher Actor
         teacherActorRef ! QuoteRequest

That's it !!!

If you think I am lying, check the entire code of the StudentSimulatorApp below :

STUDENTSIMULATORAPP.SCALA

package me.rerun.akkanotes.messaging.actormsg1

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._

object StudentSimulatorApp extends App {
  // Initialize the ActorSystem
  val actorSystem = ActorSystem("UniversityMessageSystem")
  // Construct the Teacher Actor Ref
  val teacherActorRef = actorSystem.actorOf(Props[TeacherActor])
  // Send a message to the Teacher Actor
  teacherActorRef ! QuoteRequest
  // Let's wait for a couple of seconds before we shut down the system
  Thread.sleep(2000)
  // Shut down the ActorSystem
  // (shutdown() is deprecated in favour of terminate() since Akka 2.4)
  actorSystem.shutdown()
}

Well, I cheated a little. You have to shut down the ActorSystem, otherwise the JVM keeps running forever. And I am making the main thread sleep for a little while just to give the TeacherActor time to finish its task. I know this sounds stupid. Don't worry about it. We'll write some neat testcases in the next part in order to avoid this hack.

THE MESSAGE

We just told a QuoteRequest to the ActorRef but we didn't see the message class at all !!

Here it comes :

(It is a recommended practice to wrap your messages in a nice object for easier organization)

TeacherProtocol

package me.rerun.akkanotes.messaging.protocols

object TeacherProtocol {
  case class QuoteRequest()
  case class QuoteResponse(quoteString: String)
}

As you know, the QuoteRequest is for the requests that come to the TeacherActor. The Actor would respond back with a QuoteResponse.

DISPATCHER AND A MAILBOX

The ActorRef delegates the message handling functionality to the Dispatcher. Under the hood, when we created the ActorSystem and the ActorRef, a Dispatcher and a MailBox were created. Let's see what they are about.

 

MailBox

Every Actor has one MailBox (we'll see one special case later). Per our analogy, every Teacher has one mailbox too. The Teacher has to check the mailbox and process the message. In the Actor world, it's the other way round - the mailbox, when it gets a chance, uses the Actor to accomplish its work.

Also, the mailbox has a queue to store and process the messages in a FIFO fashion - a little different from our regular inbox, where the latest mail is the one at the top.

Now, the dispatcher

Dispatcher does some really cool stuff. From the looks of it, the Dispatcher just gets the message from the ActorRef and passes it on to the MailBox. But there's one amazing thing happening behind the scenes :

The Dispatcher wraps an ExecutorService (ForkJoinPool or ThreadPoolExecutor). It executes the MailBox against this ExecutorService.

Check out this snippet from the Dispatcher

protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {
    ...
    try {
        executorService execute mbox
    ...
}
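The idea that the mailbox itself is handed to an ExecutorService can be sketched with the JDK alone. What follows is a toy model and not Akka's Dispatcher: ToyDispatcher, Mailbox and deliver are hypothetical names, and a single-thread executor is used so that two runs of the same mailbox can never overlap (an assumption made to keep the sketch short; Akka uses a thread pool and guards mailbox scheduling itself).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy dispatcher sketch. Hypothetical names; this is not Akka's Dispatcher.
final class ToyDispatcher {
    // The mailbox is a Runnable, so it can be executed on a thread pool.
    static final class Mailbox implements Runnable {
        private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        private final List<String> handled = new CopyOnWriteArrayList<>();

        void enqueue(String message) {
            queue.offer(message);
        }

        @Override
        public void run() {
            String message;
            while ((message = queue.poll()) != null) {
                handled.add(message); // stands in for the actor's receive method
            }
        }

        List<String> handled() {
            return new ArrayList<>(handled);
        }
    }

    // Enqueues each message, then executes the mailbox on the executor.
    static List<String> deliver(List<String> messages) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Mailbox mailbox = new Mailbox();
        for (String message : messages) {
            mailbox.enqueue(message);
            executorService.execute(mailbox);
        }
        executorService.shutdown(); // already submitted runs still execute
        try {
            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("mailbox did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return mailbox.handled();
    }

    public static void main(String[] args) {
        System.out.println(deliver(Arrays.asList("QuoteRequest-1", "QuoteRequest-2")));
    }
}
```

The sender's thread only enqueues and submits; the messages are actually processed on the executor's thread, which is the point of the snippet above.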

SAMPLE CODE

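This is a set of code for computing entropy gain, used in similarity detection and content recommendation. The main idea is that a Master Actor dispatches calculation tasks to Worker Actors; when a Worker finishes its task, it sends a message to the Master, which aggregates the results; finally, the results are output by the Listener Actor.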
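Before reading the Akka version, it may help to see the same fan-out, collect and sort flow in a much smaller form. The sketch below uses a plain JDK thread pool in place of actors, so it is only an analogy: ToyMasterWorker, work and calculate are hypothetical names, and the per-item "calculation" is a made-up stand-in (the length of the item).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Thread-pool analogy of the master/worker/listener flow (not Akka code).
final class ToyMasterWorker {
    // Made-up stand-in for the real per-item calculation.
    static double work(String item) {
        return item.length();
    }

    // "Master": hands one task per item to the workers, counts the results
    // as they arrive, and sorts them once every item has reported back.
    static List<Double> calculate(List<String> items, int workerCount) {
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        CompletionService<Double> results = new ExecutorCompletionService<>(workers);
        for (String item : items) {
            results.submit(() -> work(item));
        }
        List<Double> collected = new ArrayList<>();
        try {
            for (int received = 0; received < items.size(); received++) {
                collected.add(results.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            workers.shutdown();
        }
        Collections.sort(collected);
        return collected;
    }

    public static void main(String[] args) {
        // "Listener": here it simply prints the sorted result.
        System.out.println(calculate(Arrays.asList("ccc", "a", "bb"), 2));
    }
}
```

Unlike this sketch, the Akka version never blocks a thread while waiting: the Master just reacts to each Result message as it arrives.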

EntropyMain

package com.emcc.recommand.entorpy;

import org.apache.log4j.Logger;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

import com.emcc.recommand.entorpy.actor.Listener;
import com.emcc.recommand.entorpy.actor.Master;
import com.emcc.recommand.entorpy.message.Calculate;

public class EntropyMain
{
    private static Logger logger = Logger.getLogger(EntropyMain.class);

    /**
     * Number of workers to start
     */
    private static final int WORKER_COUNT = 10;

    public static void main(String[] args)
    {
        logger.info("begin to execute entorpy for recommand");

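        calculate(WORKER_COUNT);

        // Note: calculate() returns as soon as the Calculate message is sent.
        // The actors do the work asynchronously, so this line is logged
        // before the results exist.
        logger.info("execute entorpy for recommand finished.");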
    }

    /**
     * Starts the actor system and triggers the calculation
     */
    private static void calculate(int numOfWorkers)
    {
        // Create the Akka system
        ActorSystem system = ActorSystem.create("Entropy-calc");

        // Result listener, used to output the results
        ActorRef listener = system.actorOf(Props.create(Listener.class),
                "listener");

        // Start the master actor, which drives the calculation
        ActorRef master = system.actorOf(
                Props.create(Master.class, numOfWorkers, listener), "master");

        // Send the start message
        master.tell(new Calculate(), master);
    }
}

Master

package com.emcc.recommand.entorpy.actor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.RoundRobinRouter;
import com.emcc.recommand.entorpy.message.Calculate;
import com.emcc.recommand.entorpy.message.Result;
import com.emcc.recommand.entorpy.message.SortedEntropy;
import com.emcc.recommand.entorpy.message.Work;
import com.emcc.recommand.entorpy.pojo.Entropy;
import com.emcc.recommand.entorpy.service.ProbabilityCalcService;
import com.emcc.recommand.entorpy.service.impl.ProbabilityCalcServiceImpl;

@SuppressWarnings("deprecation")
public class Master extends UntypedActor
{
    private static Logger logger = Logger.getLogger(Master.class);

    /**
     * Number of results received so far, used to tell whether the calculation is complete
     */
    private int numOfResults;

    /**
     * Used to pass the result to the outside
     */
    private ActorRef listener;

    /**
     * Executor (router that distributes work to the workers)
     */
    private ActorRef workerRouter;

    /**
     * Content keys
     */
    private List<String> contentKeys = null;

    /**
     * Feature keys
     */
    private List<String> itemKeys = null;

    /**
     * Intermediate result collection
     */
    private List<Entropy> middleResult = new ArrayList<Entropy>();

    /**
     * Calculation start time
     */
    private final long start = System.currentTimeMillis();

    /**
     * Calculation service
     */
    private ProbabilityCalcService pcService = new ProbabilityCalcServiceImpl();

    /**
     * 
     * @param nrOfWorkers
     *            number of workers to start
     * @param listener
     *            actor that receives the final sorted result
     */
    public Master(int nrOfWorkers, ActorRef listener)
    {
        this.contentKeys = pcService.getCntCodes();
        this.itemKeys = pcService.getItems();
        this.listener = listener;

        workerRouter = this.getContext().actorOf(
                Props.create(Worker.class).withRouter(
                        new RoundRobinRouter(nrOfWorkers)));

    }

    @Override
    public void onReceive(Object message) throws Exception
    {
        if (message instanceof Calculate)
        {
            logger.info("receive calculate messsage.");

            if (CollectionUtils.isNotEmpty(itemKeys)
                    && CollectionUtils.isNotEmpty(contentKeys))
            {
                long count = 0L;

                for (String item : itemKeys)
                {
                    count++;

                    logger.info("send message to worker, item=" + item + ", "
                            + count + "/" + itemKeys.size());

                    workerRouter.tell(new Work(item, contentKeys), getSelf());
                }
            }
            else
            {
                logger.error("stop process, no data to calculate.");

                // Stop the whole system
                getContext().system().shutdown();
            }
        }
        else if (message instanceof Result)
        {
            Result result = (Result) message;

            numOfResults += 1;

            logger.info("receive result messsage from worker, item="
                    + result.getItem() + ", " + numOfResults + "/"
                    + itemKeys.size());

            // Keep the intermediate result in memory
            middleResult.add(new Entropy(result.getItem(), result.getValue()));

            // Calculation complete
            if (numOfResults == itemKeys.size())
            {

                logger.info("calculate finished, begin to sort the result.");

                // Build the elapsed time for the message to the listener
                Duration duration = Duration.create(System.currentTimeMillis()
                        - start, TimeUnit.MILLISECONDS);

                // Sort the output results
                Collections.sort(middleResult);

                logger.info("send SortedEntropy message to listener.");

                listener.tell(new SortedEntropy(middleResult, duration),
                        getSelf());

                // Stop the master and the actors it controls
                getContext().stop(getSelf());
            }
        }
        else
        {
            unhandled(message);
        }
    }
}

Worker

package com.emcc.recommand.entorpy.actor;

import java.math.BigDecimal;
import java.util.List;

import org.apache.log4j.Logger;

import akka.actor.UntypedActor;

import com.emcc.recommand.entorpy.message.Result;
import com.emcc.recommand.entorpy.message.Work;
import com.emcc.recommand.entorpy.service.ProbabilityCalcService;
import com.emcc.recommand.entorpy.service.impl.ProbabilityCalcServiceImpl;
import com.emcc.recommand.entorpy.util.Logarithm;

/**
 * Calculates the entropy gain of a single item
 */
public class Worker extends UntypedActor
{
    private static Logger logger = Logger.getLogger(Worker.class);

    /**
     * Number of decimal places kept in the result
     */
    private static final int DECIMAL_SIZE = 4;

    private ProbabilityCalcService pcService = new ProbabilityCalcServiceImpl();

    @Override
    public void onReceive(Object message) throws Exception
    {
        if (message instanceof Work)
        {
            Work work = (Work) message;

            logger.info("receive work message from master, item="
                    + work.getItem());

            double result = calculateEntropyForItem(work.getItem(),
                    work.getCntKeyList());

            logger.info("send result message to master, item=" + work.getItem()
                    + ", value=" + result);

            getSender().tell(new Result(result, work.getItem()), getSelf());
        }
        else
        {
            unhandled(message);
        }
    }

    /**
     * Calculates the entropy gain for one item
     * 
     * @param item
     *            name of the phrase to calculate
     * @param cntCodes
     *            all cnt codes in the system
     * @return the entropy gain
     */
    private double calculateEntropyForItem(String item, List<String> cntCodes)
    {
        Double entropyIncr = 0.0;

        // Intermediate calculation logic: compute the entropy gain for the item and return it
        if (cntCodes != null && !cntCodes.isEmpty())
        {
            Double x = pcService.calcCntProbability();
            Double y = pcService.calcDocProbability(item);

            for (String cntCode : cntCodes)
            {
                Double z = pcService.calcCndProbability(cntCode, item);

                if (isParamLegal(x, y, z))
                {
                    Double temp = calculateByFormular(x, y, z);

                    if (temp != null && !temp.isInfinite() && !temp.isNaN())
                    {
                        entropyIncr = entropyIncr + temp;
                    }
                    else
                    {
                        logger.error("calculate entropy error, item=" + item
                                + ", cntCode=" + cntCode + ", x=" + x + ",y="
                                + y + ", z=" + z);
                    }
                }
            }
        }

        return format(entropyIncr, DECIMAL_SIZE);
    }

    /**
     * Computes the value
     */
    private double calculateByFormular(Double x, Double y, Double z)
    {
        Double a1 = -x * Logarithm.log2(x);
        Double a2 = z * x * Logarithm.log2((z * x) / y);
        Double a3 = (1 - z) * x * Logarithm.log2(((1 - z) * x) / (1 - y));
        
        return a1 + a2 + a3;
    }

    private boolean isParamLegal(Double x, Double y, Double z)
    {
        boolean res = false;

        if (x != null && y != null && z != null)
        {
            res = true;
        }
        else
        {
            logger.error("parameter error, cancel this round, x=" + x + ", y="
                    + y + ", z=" + z);
        }

        return res;
    }

    /**
     * Rounds the result to decimalSize decimal places (ROUND_HALF_UP);
     * returns 0.0 if rounding fails
     */
    private Double format(Double org, int decimalSize)
    {
        try
        {
            BigDecimal b = new BigDecimal(org);

            Double f = b.setScale(decimalSize, BigDecimal.ROUND_HALF_UP)
                    .doubleValue();

            return f;
        }
        catch (Exception e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return 0.0;
    }
}
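To see what calculateByFormular computes, and why the Worker checks the result for NaN and infinity, here is a small stand-alone version of the same three terms. It is a sketch under one assumption: the Logarithm.log2 helper is not shown in this article, so a plain base-2 logarithm is used in its place. EntropyGainExample and gainTerm are hypothetical names.

```java
// Stand-alone version of the three-term formula used by the Worker.
// Assumption: Logarithm.log2 (not shown in the article) is a base-2 logarithm.
final class EntropyGainExample {
    static double log2(double value) {
        return Math.log(value) / Math.log(2.0);
    }

    // Same terms as calculateByFormular: a1 + a2 + a3.
    static double gainTerm(double x, double y, double z) {
        double a1 = -x * log2(x);
        double a2 = z * x * log2((z * x) / y);
        double a3 = (1 - z) * x * log2(((1 - z) * x) / (1 - y));
        return a1 + a2 + a3;
    }

    public static void main(String[] args) {
        // With x = y = z = 0.5 the terms are 0.5, -0.25 and -0.25.
        System.out.println(gainTerm(0.5, 0.5, 0.5));
        // With z = 0 the second term becomes 0 * log2(0), which is NaN.
        System.out.println(gainTerm(0.5, 0.5, 0.0));
    }
}
```

The second call shows the kind of input that the isInfinite and isNaN checks in the Worker are there to catch.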

Listener

package com.emcc.recommand.entorpy.actor;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.log4j.Logger;

import akka.actor.UntypedActor;

import com.emcc.recommand.entorpy.message.SortedEntropy;
import com.emcc.recommand.entorpy.util.FileUtil;

/**
 * Result output actor
 */
public class Listener extends UntypedActor
{
    private static Logger logger = Logger.getLogger(Listener.class);

    @Override
    public void onReceive(Object message) throws Exception
    {
        if (message instanceof SortedEntropy)
        {
            SortedEntropy sortedEntropy = (SortedEntropy) message;

            // Write the results to the file system
            FileUtil.list2File(getOutputPath(), sortedEntropy.getEntorpyList());

            logger.info("output resultfile finished, takes "
                    + sortedEntropy.getDuration().toSeconds() + "s.");

            getContext().system().shutdown();
        }
        else
        {
            unhandled(message);
        }
    }

    private String getOutputPath()
    {
        DateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");

        String date = df.format(new Date());

        return "./entropy_" + date + ".out";
    }
}

SUMMARY

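Akka really can simplify multithreaded programming. Developers are freed from tedious multithreaded development and can focus more on the business implementation. This abstraction suits the vast majority of applications.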

