用Akka構建一個簡易的分布式文件系統


本來初期打算用Hadoop 2,可是后來有限的服務器部署了Solr Cloud,各種站點,發現資源不夠了,近10T的文件,已經幾乎把服務器的磁盤全部用光。想來想去,由於目前架構基於Scala的,所以還是用Scala Akka實現了一個簡單版本的分布式文件系統。

Scala版本是2.10.3:http://www.scala-lang.org,Akka版本是2.2.3:http://akka.io。

所有文件隨機放在不同的服務器上,在數據庫中記錄了文件存放的服務器IP地址、文件路徑。在服務端部署基於Akka的簡單文件服務,接收文件路徑,讀取並返回文件內容。調用者根據文件地址,去數據庫中查找文件的服務IP地址和文件路徑,根據得到的服務器IP地址,傳入文件路徑,調用該服務器的文件服務。

以下是部分實現代碼。

1.文件服務參數

1 case class PatentFulltextArgs(
2   val url: String,
3   val start: Int,
4   val size: Int) {
5 
6 }

2.文件服務Trait(有點像WCF中的服務契約)

1 trait PatentFulltextService {
2   def find(args: PatentFulltextArgs): Array[Byte]
3 }

3.文件服務實現

 1 class PatentFulltextServiceImpl extends PatentFulltextService with Disposable {
 2   def find(args: PatentFulltextArgs): Array[Byte] = {
 3     val list = ListBuffer[Byte]()
 4     val file = FileSystems.getDefault().getPath(args.url)
 5 
 6     using(Files.newInputStream(file)) { in =>
 7       {
 8         val bytes = new Array[Byte](args.size + 1)
 9         in.skip(args.start)
10         in.read(bytes, 0, bytes.length)
11 
12         list ++= bytes
13       }
14     }
15 
16     list.toArray
17   }
18 }

4.用戶Akka Deploy發布的類

class ServiceApplication extends Bootable {
  val system = ActorSystem("serivce", ConfigFactory.load.getConfig("service"))
  def startup() {
    TypedActor(system).typedActorOf(TypedProps[PatentFulltextServiceImpl], "patentfulltext")
  }

  def shutdown() {
    system.shutdown
  }
}

在這里,我使用的Akka的TypeActor,請參考:http://doc.akka.io/docs/akka/2.2.3/scala/typed-actors.html。

以下是部署過程。

把生成的jar包,發布在Akka的deploy目錄下,根據需要修改Akka的配置文件目錄config下的application.conf。以下是我配置的內容,僅供參考:

actor {

provider = "akka.remote.RemoteActorRefProvider"

 

typed {

# Default timeout for typed actor methods with non-void return type

timeout = 6000s

}

}

remote {

transport = "akka.remote.netty.NettyRemoteTransport"

netty.tcp {

  hostname = "服務端IP"

  port = 2552

}

客戶端使用時只需要服務契約Trait和相關實體類,以下是我寫的一個客戶端調用的類,僅供參考:

 1 object RemoteService {
 2   val logger = LoggerFactory.getLogger(this.getClass())
 3   private var system: ActorSystem = null
 4 
 5   def apply(configFile: String) = {
 6     system = ActorSystem("RemoteService", ConfigFactory.parseFile(new File(configFile)))
 7   }
 8 
 9   def findPatentFulltext(serverIp: String, patentFulltextArgs: PatentFulltextArgs) = {
10     TypedActor(system).typedActorOf(TypedProps[com.cloud.akka.service.model.PatentFulltextService], system.actorFor("akka.tcp://serivce@" + serverIp + ":2552/user/patentfulltext")).find(patentFulltextArgs)
11 
12   }
13 
14   def shutdown = {
15     if (null != system) system.shutdown()
16   }
17 }}

以下問題是我還沒找到合適的解決辦法:

1.Akka無法傳輸大文件,即使修改配置,服務器可以返回,但是接收的客戶端還會報錯。我的解決方案是在客戶端分塊讀取,然后合並。

2.在客戶端使用時,TypedActor沒有找到使用ActorSelection構建,因為ActorFor是標記為Deprecated。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM