Akka(38): Http:Entityof ByteString-數據傳輸基礎


  我們說過Akka-http是一個好的系統集成工具,集成是通過數據交換方式實現的。Http是個在網上傳輸和接收的規范協議。所以,在使用Akka-http之前,可能我們還是需要把Http模式的網上數據交換細節了解清楚。數據交換雙方是通過Http消息類型Request和Response來實現的。在Akka-http中對應的是HttpRequest和HttpResponse。這兩個類型都具備HttpEntity類型來裝載需要交換的數據。首先,無論如何數據在線上的表現形式肯定是一串bytes。所以,數據交換兩頭Request,Response中的Entity也必須是以bytes來表達的。在Akka-http里我們把需要傳輸的數據轉換成ByteString,通過網絡發送給接收端、接收端再把收到消息Entity中的ByteString轉換成目標類型的數據。這兩個轉換過程就是Akka-http的Marshalling和Unmarshalling過程了。我們先從HttpEntity的構建函數來了解它的定義:

object HttpEntity { implicit def apply(string: String): HttpEntity.Strict = apply(ContentTypes.`text/plain(UTF-8)`, string) implicit def apply(bytes: Array[Byte]): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, bytes) implicit def apply(data: ByteString): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, data) def apply(contentType: ContentType.NonBinary, string: String): HttpEntity.Strict =
    if (string.isEmpty) empty(contentType) else apply(contentType, ByteString(string.getBytes(contentType.charset.nioCharset))) def apply(contentType: ContentType, bytes: Array[Byte]): HttpEntity.Strict =
    if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes)) def apply(contentType: ContentType, data: ByteString): HttpEntity.Strict =
    if (data.isEmpty) empty(contentType) else HttpEntity.Strict(contentType, data) def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]): UniversalEntity =
    if (contentLength == 0) empty(contentType) else HttpEntity.Default(contentType, contentLength, data) def apply(contentType: ContentType, data: Source[ByteString, Any]): HttpEntity.Chunked = HttpEntity.Chunked.fromData(contentType, data) ...

很明顯,HttpEntity可以分兩大類,一種是Strict類型的,它的data是ByteString。另一種是UniversalEntity類型,它的數據dataBytes是Source[ByteString,Any]。無論如何最終在線上的還是ByteString。HttpEntity的ContentType注明了傳輸數據格式,有:

object ContentTypes { val `application/json` = ContentType(MediaTypes.`application/json`) val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`) val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8` val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8` val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8` val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8` // used for explicitly suppressing the rendering of Content-Type headers on requests and responses
  val NoContentType = ContentType(MediaTypes.NoMediaType) }

注意:ContentType只是一種備注,不影響線上數據表達形式,線上的數據永遠是ByteString。但是,其中的application/octet-stream類型代表數據必須是Source[ByteString,Any]。我們下面就通過客戶端的例子來理解HttpEntity。下面是一個客戶端測試函數:

  def runService(request: HttpRequest, rentity: RequestEntity) = { val futResp = for { entity <- Future.successful(rentity) resp <- Http(sys).singleRequest( request.copy(entity = rentity) ) } yield resp futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => entity.dataBytes.map(_.utf8String).runForeach(println) case Success(r@HttpResponse(code, _, _, _)) => println(s"Download request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to download rows!") case Failure(err) => println(s"Download failed: ${err.getMessage}") } }

我們只需要對這個函數傳入RequestEntity就可以了解返回Response里Entity的許多細節了。首先我們要求服務端發送一個純字符串Hello World。服務端代碼如下:

 } ~ path("text") {
      get {
        complete("Hello World!")
      } ~

雖然complete("Hello World!")有些迷糊,不過應該complete做了些字符串到ByteString的轉換。我們可以從上面這個runService函數得到證實。下面是這個例子的調用:

  val reqText = HttpRequest(uri = s"http://localhost:8011/text") runService(reqText,HttpEntity.Empty) .andThen{case _ => sys.terminate()}

從顯示的結果可以得出runService函數中的entity.dataBytes.map(_.utf8String)已經把ByteString轉換成了String,也就是說服務器端發送的Entity里的數據是ByteString。

我們再試着發送一些數據給服務端,然后讓服務端把結果通過response entity返回來:

    } ~ path("text") { get { complete("Hello World!") } ~ post { withoutSizeLimit { extractDataBytes { bytes => val data = bytes.runFold(ByteString())(_ ++ _) onComplete(data) { t => complete(t) } } } }

我們看到服務端對request entity的操作是以ByteString進行的。客戶端上傳一串字符的request如下:

  val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text") val uploadText = HttpEntity( ContentTypes.`text/plain(UTF-8)`, // transform each number to a chunk of bytes
    ByteString("hello world again") ) runService(postText,uploadText) .andThen{case _ => sys.terminate()}

我們可以看到放進entity里的數據是ByteString。

我們知道Akka-http是基於Akka-Stream的,具備Reactive-Stream功能特性。下面我們就示范一下如何進行stream的上傳下載。首先定制一個Source:

  val numbers = Source.fromIterator(() => Iterator.continually(Random.nextInt())) .map(n => ByteString(s"$n\n")) //make conform to withoutSizeLimit constrain
  val source = limitableByteSource(numbers)

服務端也是用HttpEntity來裝載這個Source然后通過HttpRequest傳給客戶端的:

  path("random") { get { complete( HttpEntity( ContentTypes.`application/octet-stream`, // transform each number to a chunk of bytes
            source.take(10000) ) ) } ~
  

我們在客戶端還是用runService來解析傳過來的entity。由於接收一個大型的Source,所以需要修改一下接收方式代碼:

 futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println) Await.result(futEnt, Duration.Inf) // throws if binding fails
             println("End of stream!!!") case Success(r@HttpResponse(code, _, _, _)) => println(s"Download request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to download rows!") case Failure(err) => println(s"Download failed: ${err.getMessage}") }  

用下面的方式調用

  val reqRandom = HttpRequest(uri = s"http://localhost:8011/random") runService(reqRandom,HttpEntity.Empty) .andThen{case _ => sys.terminate()}

再示范一下在客戶端用Source上傳數據。服務端代碼:

 post { withoutSizeLimit { extractDataBytes { bytes => val data = bytes.runFold(ByteString())(_ ++ _) onComplete(data) { t => complete(t) } } } }

客戶端上傳數據范例:

 val numbers = Source.fromIterator(() => Iterator.continually(Random.nextInt())) .map(n => ByteString(s"$n\n")) //make conform to withoutSizeLimit constrain
  val source = limitableByteSource(numbers) val bytes = HttpEntity( ContentTypes.`application/octet-stream`, // transform each number to a chunk of bytes
    source.take(10000) ) val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random") runService(postRandom,bytes) .andThen{case _ => sys.terminate()}

從上面討論我們了解了在Marshal,Unmarshal下層只是ByteString的操作和轉換。下面是本次討論示范源代碼:

服務端:

import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.model._ import akka.util.ByteString import akka.http.scaladsl.model.HttpEntity._ import scala.util.Random object ServerEntity extends App { implicit val httpSys = ActorSystem("httpSystem") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher val numbers = Source.fromIterator(() => Iterator.continually(Random.nextInt())) .map(n => ByteString(s"$n\n")) //make conform to withoutSizeLimit constrain
  val source = limitableByteSource(numbers) val route = path("random") { get { withoutSizeLimit { complete( HttpEntity( ContentTypes.`application/octet-stream`, // transform each number to a chunk of bytes
              source.take(1000)) ) } } ~ post { withoutSizeLimit { extractDataBytes { bytes => val data = bytes.runFold(ByteString())(_ ++ _) onComplete(data) { t => complete(t) } } } } } ~ path("text") { get { complete("Hello World!") } ~ post { withoutSizeLimit { extractDataBytes { bytes => val data = bytes.runFold(ByteString())(_ ++ _) onComplete(data) { t => complete(t) } } } } } val (port, host) = (8011,"localhost") val bindingFuture = Http().bindAndHandle(route,host,port) println(s"Server running at $host $port. Press any key to exit ...") scala.io.StdIn.readLine() bindingFuture.flatMap(_.unbind()) .onComplete(_ => httpSys.terminate()) }

客戶端:

import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpEntity.limitableByteSource import akka.http.scaladsl.model._ import scala.concurrent.duration._ import akka.util.ByteString import scala.concurrent._ import scala.util._ object ClientEntity extends App { implicit val sys = ActorSystem("ClientSys") implicit val mat = ActorMaterializer() implicit val ec = sys.dispatcher def runService(request: HttpRequest, rentity: RequestEntity) = { val futResp = for { entity <- Future.successful(rentity) resp <- Http(sys).singleRequest( request.copy(entity = rentity) ) } yield resp futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println) Await.result(futEnt, Duration.Inf) // throws if binding fails
             println("End of stream!!!") case Success(r@HttpResponse(code, _, _, _)) => println(s"Download request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to download rows!") case Failure(err) => println(s"Download failed: ${err.getMessage}") } } val reqText = HttpRequest(uri = s"http://localhost:8011/text") // runService(reqText,HttpEntity.Empty) // .andThen{case _ => sys.terminate()}
 val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text") val uploadText = HttpEntity( ContentTypes.`text/plain(UTF-8)`, // transform each number to a chunk of bytes
    ByteString("hello world again") ) // runService(postText,uploadText) // .andThen{case _ => sys.terminate()}
 val reqRandom = HttpRequest(uri = s"http://localhost:8011/random") // runService(reqRandom,HttpEntity.Empty) // .andThen{case _ => sys.terminate()}
 val numbers = Source.fromIterator(() => Iterator.continually(Random.nextInt())) .map(n => ByteString(s"$n\n")) //make conform to withoutSizeLimit constrain
  val source = limitableByteSource(numbers) val bytes = HttpEntity( ContentTypes.`application/octet-stream`, // transform each number to a chunk of bytes
    source.take(10000) ) val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random") runService(postRandom,bytes) .andThen{case _ => sys.terminate()} }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM