[comment]: # Spark集群 + Akka + Kafka + Scala 開發(3) : 開發一個Akka + Spark的應用
前言
在Spark集群 + Akka + Kafka + Scala 開發(1) : 配置開發環境中,我們已經部署好了一個Spark的開發環境。
在Spark集群 + Akka + Kafka + Scala 開發(2) : 開發一個Spark應用中,我們已經寫好了一個Spark的應用。
本文的目標是寫一個基於akka的scala工程,在一個spark standalone的集群環境中運行。
akka是什么?
akka的作用
akka的名字是action kernel的回文。根據官方定義:akka用於resilient elastic distributed real-time transaction processing。
個人理解是:
resilient:是指對需求和安全性等方面(來自於外部的)的一種適應力(彈性)。
elastic:是指對資源利用方面的彈性。
因此,akka是一個滿足需求彈性、資源分配彈性的分布式實時事務處理系統。
akka只是一個類庫,一個工具,並沒有提供一個平台。
akka的運行模式和用例
- akka有兩種運行模式:
- As a library: 一個使用於web應用,把akka作為一個普通的jar包放到classpath或者
WEB-INF/lib
。 - As an application: 也稱為micro system。
- As a library: 一個使用於web應用,把akka作為一個普通的jar包放到classpath或者
- akka的用例
akka的用例很多,可以參照Examples of use-cases for Akka.
本文中的用例
在本文中,一個Spark + akka的環境里,akka被用於as an application
模式下。
我們會創建一個akka工程,含有兩個應用:
- akka host application
建立一個actor system, 定義了所有的任務(actors)。等待客戶端的請求。
部分actor使用了spark的雲計算功能。
這是一個spark的應用。 - akka client application
調用host application上特定的actor。
我們看出,這里我們把akka作為一個任務處理器,並通過spark來完成任務。
項目結構和文件說明
說明
這個工程包含了兩個應用。
一個Consumer應用:CusomerApp:實現了通過Spark的Stream+Kafka的技術來實現處理消息的功能。
一個Producer應用:ProducerApp:實現了向Kafka集群發消息的功能。
文件結構
AkkaSampleApp # 項目目錄
|-- build.bat # build文件
|-- src
|-- main
|-- resources
|-- application.conf # Akka Server應用的配置文件
|-- client.conf # Akka Client應用的配置文件
|-- scala
|-- ClientActor.scala # Akka Client的Actor:提供了一種調用Server Actor的方式。
|-- ClientApp.scala # Akka Client應用
|-- ProductionReaper.scala # Akka Shutdown pattern的實現者
|-- Reaper.scala # Akka Shutdown pattern的Reaper抽象類
|-- ServerActor.scala # Akka Server的Actor,提供一個求1到n的MapReduce計算。使用了Spark。
|-- ServerApp.scala # Akka Server應用
構建工程目錄
可以運行:
mkdir AkkaSampleApp
mkdir -p /AkkaSampleApp/src/main/resources
mkdir -p /AkkaSampleApp/src/main/scala
代碼
build.sbt
name := "akka-sample-app"
version := "1.0"
scalaVersion := "2.11.8"
scalacOptions += "-feature"
scalacOptions += "-deprecation"
scalacOptions += "-language:postfixOps"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4.10",
"com.typesafe.akka" %% "akka-remote" % "2.4.10",
"org.apache.spark" %% "spark-core" % "2.0.0"
)
resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"
application.conf
akka {
#loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
#log-sent-messages = on
#log-received-messages = on
}
}
cient.conf
akka {
#loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
#log-sent-messages = on
#log-received-messages = on
}
}
注:
port = 0
表示這個端口號會自動生成一個。
ClientActor.scala
import akka.actor._
import akka.event.Logging
class ClientActor(serverPath: String) extends Actor {
val log = Logging(context.system, this)
val serverActor = context.actorSelection(serverPath)
def receive = {
case msg: String =>
log.info(s"ClientActor received message '$msg'")
serverActor ! 10000L
}
}
ClientApp.scala
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.remote.RemoteScope
import akka.util._
import java.util.concurrent.TimeUnit
import scala.concurrent._
import scala.concurrent.duration._
object ClientApp {
def main(args: Array[String]): Unit = {
val system = ActorSystem("LocalSystem", ConfigFactory.load("client"))
// get the remote actor via the server actor system's address
val serverAddress = AddressFromURIString("akka.tcp://ServerActorSystem@127.0.0.1:2552")
val actor = system.actorOf(Props[ServerActor].withDeploy(Deploy(scope = RemoteScope(serverAddress))))
// invoke the remote actor via a client actor.
// val remotePath = "akka.tcp://ServerActorSystem@127.0.0.1:2552/user/serverActor"
// val actor = system.actorOf(Props(classOf[ClientActor], remotePath), "clientActor")
buildReaper(system, actor)
// tell
actor ! 10000L
waitShutdown(system, actor)
}
private def buildReaper(system: ActorSystem, actor: ActorRef): Unit = {
import Reaper._
val reaper = system.actorOf(Props(classOf[ProductionReaper]))
// Watch the action
reaper ! WatchMe(actor)
}
private def waitShutdown(system: ActorSystem, actor: ActorRef): Unit = {
// trigger the shutdown operation in ProductionReaper
system.stop(actor)
// wait to shutdown
Await.result(system.whenTerminated, 60.seconds)
}
}
ProductionReaper.scala
當所有的Actor停止后,終止Actor System。
class ProductionReaper extends Reaper {
// Shutdown
def allSoulsReaped(): Unit = {
context.system.terminate()
}
}
Reaper.scala
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer
object Reaper {
// Used by others to register an Actor for watching
case class WatchMe(ref: ActorRef)
}
abstract class Reaper extends Actor {
import Reaper._
// Keep track of what we're watching
val watched = ArrayBuffer.empty[ActorRef]
// Derivations need to implement this method. It's the
// hook that's called when everything's dead
def allSoulsReaped(): Unit
// Watch and check for termination
final def receive = {
case WatchMe(ref) =>
context.watch(ref)
watched += ref
case Terminated(ref) =>
watched -= ref
if (watched.isEmpty) allSoulsReaped()
}
}
ServerActor.scala
提供一個求1到n平方和的MapReduce計算。
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
class ServerActor extends Actor {
val log = Logging(context.system, this)
def receive = {
case n: Long =>
squareSum(n)
}
private def squareSum(n: Long): Long = {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val squareSum = sc.parallelize(1L until n).map { i =>
i * i
}.reduce(_ + _)
log.info(s"============== The square sum of $n is $squareSum. ==============")
squareSum
}
}
ServerApp.scala
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props
object ServerApp {
def main(args: Array[String]): Unit = {
val system = ActorSystem("ServerActorSystem")
val actor = system.actorOf(Props[ServerActor], name = "serverActor")
}
}
構建工程
進入目錄AkkaSampleApp。運行:
sbt package
第一次運行時間會比較長。
測試應用
啟動Spark服務
- 啟動spark集群master server
$SPARK_HOME/sbin/start-master.sh
master服務,默認會使用
7077
這個端口。可以通過其日志文件查看實際的端口號。
- 啟動spark集群slave server
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077
啟動Akka Server應用
運行:
$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ServerApp target/scala-2.11/akka-sample-app_2.11-1.0.jar
如果出現java.lang.NoClassDefFoundError錯誤,
請參照Spark集群 + Akka + Kafka + Scala 開發(1) : 配置開發環境,
確保akka的包在Spark中設置好了。
注:可以使用Ctrl+C來中斷這個Server應用。
啟動Akka Client應用
新啟動一個終端,運行:
java -classpath ./target/scala-2.11/akka-sample-app_2.11-1.0.jar:$AKKA_HOME/lib/akka/*:$SCALA_HOME/lib/* ClientApp
# or
# $SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ClientApp target/scala-2.11/akka-sample-app_2.11-1.0.jar
然后:看看Server應用是否開始處理了。
總結
Server應用需要Spark的技術,因此,是在Spark環境中運行。
Clinet應用,可以是一個普通的Java應用。
下面請看
至此,我們已經寫好了一個spark集群+akka+scala的應用。下一步請看:
Spark集群 + Akka + Kafka + Scala 開發(4) : 開發一個Kafka + Spark的應用