geotrellis使用(三十六)瓦片入庫更新圖層


前言

Geotrellis 是針對大數據量柵格數據進行分布式空間計算的框架,這一點毋庸置疑,並且無論采取何種操作,其實都是先將大塊的數據切割成一定大小的小數據(專業術語為瓦片),這是分治的思想,也是分布式計算的精髓,所以使用 Geotrellis 的第一步工作就是要將數據切片(無論是存儲在內存中還是進行持久化),然而即使其能力再“大”在實際工作中也難以處理以下幾種需求:

  1. 全球(大范圍)高分辨率遙感影像數據,數據量在 TB 級;
  2. 局部地區數據更新;
  3. 不同時間數據融合。

這幾種情況下我們都很難或者沒有辦法同時對這些數據進行處理,可行的方案就是執行更新操作或者分批處理。在 Geotrellis 框架中提供了數據的 ETL 接口,但是只能進行 write 操作,並不能進行 update 操作,write 操作會覆蓋掉此圖層中已有數據,並且相鄰數據之間無法進行拼接,導致接邊處數據缺失,所以分批處理只能寫到不同的圖層,這又給數據的調用計算等處理造成很大的麻煩。本文在原有 ETL 的基礎上簡單介紹如何實現同層瓦片的 update 操作。

一、原生 ETL

1.1 ETL 工作流程介紹

ETL 完成的工作是將數據切割成瓦片並進行持久化,在 Geotrellis 中你可以將數據直接放在內存中(雖然也未提供現成的解決方案,我前面的文章簡單介紹了如何實現),也可以將數據放在 Accumulo、HBASE 等分布式數據庫或者是 HDFS 和 普通文件系統中。實現代碼在 geotrellis.spark.etl 包下的 Etl 類中,調用 ingest 方法的時候傳入不同的參數即可實現數據入庫的操作,此部分前面也已經介紹過,這里不再贅述。ingest 方法主要代碼如下:

val etl = Etl(conf, modules)
val sourceTiles = etl.load[I, V]
val (zoom, tiled) = etl.tile(sourceTiles)
etl.save[K, V](LayerId(etl.input.name, zoom), tiled)

整個流程為首先使用 load 函數讀取原始數據,再調用 tile 函數對數據進行切割,而后調用 save 函數將切割后的瓦片進行持久化。所以只要在 save 方法中判斷要存放數據的圖層是否存在,如果不存在執行已有操作,如果存在則執行 update 操作。

1.2 save 方法介紹

原生 save 方法如下:

def save[
K: SpatialComponent: TypeTag,
V <: CellGrid: TypeTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
saveAction: SaveAction[K, V, TileLayerMetadata[K]] = SaveAction.DEFAULT[K, V, TileLayerMetadata[K]]
): Unit = {
implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]

val outputPlugin =
  combinedModule
    .findSubclassOf[OutputPlugin[K, V, TileLayerMetadata[K]]]
    .find { _.suitableFor(output.backend.`type`.name) }
    .getOrElse(sys.error(s"Unable to find output module of type '${output.backend.`type`.name}'"))

def savePyramid(zoom: Int, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]]): Unit = {
  val currentId = id.copy(zoom = zoom)
  outputPlugin(currentId, rdd, conf, saveAction)

  scheme match {
    case Left(s) =>
      if (output.pyramid && zoom >= 1) {
        val (nextLevel, nextRdd) = Pyramid.up(rdd, s, zoom, output.getPyramidOptions)
        savePyramid(nextLevel, nextRdd)
      }
    case Right(_) =>
      if (output.pyramid)
        logger.error("Pyramiding only supported with layoutScheme, skipping pyramid step")
  }
}

savePyramid(id.zoom, rdd)
logger.info("Done")
}

主要邏輯在 savePyramid 函數中(scala 支持內部函數),其中 outputPlugin(currentId, rdd, conf, saveAction) 是將瓦片持久化的關鍵操作,val outputPlugin = ... 是取到持久化的種類,這里無需過多考慮,只要考慮成是 Accumulo 或者其他種類即可,所以 outputPlugin(currentId, rdd, conf, saveAction) 調用了 OutputPlugin 類型的 apply 方法,如下:

def apply(
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
conf: EtlConf,
saveAction: SaveAction[K, V, M] = SaveAction.DEFAULT[K, V, M]
): Unit = {
    implicit val sc = rdd.sparkContext
    saveAction(attributes(conf), writer(conf), id, rdd)
}

saveAction 默認取了 SaveAction.DEFAULT[K, V, M],這是定義在 ETL 類中的一個方法,是的,此處傳入了一個方法, saveAction(attributes(conf), writer(conf), id, rdd) 實際執行了下述方法:

def DEFAULT[K, V, M] = {
  (_: AttributeStore, writer: Writer[LayerId, RDD[(K, V)] with Metadata[M]], layerId: LayerId, rdd: RDD[(K, V)] with Metadata[M]) =>
    writer.write(layerId, rdd)
}

可以看到最后調用的是 writer.write(layerId, rdd),此處 writer 根據持久化對象不同而不同,在 Accumulo 中為 AccumuloLayerWriter。

到此我們便清楚了 save 方法的工作流程以及整個 ETL 操作的工作流程,下面開始對其進行改造。

二、改造 ETL

本文僅針對瓦片數據持久化放到 Accumulo 數據庫中進行介紹,並未如原代碼一樣對所有情況進行自動適配,其他持久化方式只需判斷和修改對應的 LayerWriter 實例即可。

2.1 改造 save 方法

首先判斷持久化對象中是否已存在此圖層,代碼如下:

val currentId: LayerId = ...
val instance = conf.outputProfile.get.asInstanceOf[AccumuloProfile].getInstance.get
val attributeStore = AccumuloAttributeStore(instance)
val exist = attributeStore.layerExists(currentId)

首先取到持久化的實例,本文直接指定為 Accumulo 類型,而后獲取 AccumuloAttributeStore 對象,此對象相當於是元數據,其中存儲圖層的范圍層級等信息,最后通過 layerExists 方法即可得到圖層是否存在。

如果圖層不存在則直接調用原生的 outputPlugin(currentId, rdd, conf) 即可,如果圖層已經存在則執行下述操作:

AccumuloLayerWriter(instance = instance, conf.output.backend.path.toString, AccumuloLayerWriterOptions(SocketWriteStrategy()))
          .update(currentId, rdd, (v1: V, v2: V) => v1.merge(v2))

此處需要特別指出的是 AccumuloLayerWriterOptions(SocketWriteStrategy()),此句指明了 Accumulo 的操作策略,按照官方說法,使用 SocketWriteStrategy 會導致操作變慢,切不能針對大量數據的導入操作,使用 HdfsWriteStrategy 支持 Accumulo 大批量導入操作(個人猜測是 Accumulo 數據存放在 HDFS 中,首先把數據寫入 HDFS 然后再並行持久化到 Accumulo,所以可以進行大量數據操作)。雖然看上去 HdfsWriteStrategy 非常完美,但是問題在於使用此策略無法執行 update 操作,會報錯。魚和熊掌不能兼得,需要根據實際情況進行選擇和設計。

這樣就可實現圖層中瓦片的更新操作。

2.2 Key Index

當然寫到這並沒有完成工作,如果僅在 save 函數中完成上述改造,再真正的 update 的時候會報錯,提示 key index 超出定義的范圍,需要重新定義。還記得上面說的 attributeStore 吧,通過此方法可以取到元數據信息,此處的 key index 也寫在元數據中,key index 說白了就是瓦片編號的范圍,我們都知道瓦片是根據編號進行請求的,那么一塊數據就會有一個編號范圍,所以圖層不存在的時候執行的是 write 方法,寫入的是當時數據瓦片編號范圍,但是真正執行 update 的時候一般肯定是跟第一次數據范圍不同的,於是提示你需要更新編號的范圍。這個問題很容易解決,我們只需要在第一次寫入的時候將數據范圍設置成全球即可。

在 tile 方法的 resizingTileRDD 方法定義如下:

def resizingTileRDD(
  rdd: RDD[(I, V)],
  floatMD: TileLayerMetadata[K],
  targetLayout: LayoutDefinition
): RDD[(K, V)] with Metadata[TileLayerMetadata[K]] = {
  // rekey metadata to targetLayout
  val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent))
  val tiledMD = floatMD.copy(
    bounds = floatMD.bounds.setSpatialBounds(newSpatialBounds),
    layout = targetLayout
  )

  // > 1 means we're upsampling during tiling process
  val resolutionRatio = floatMD.layout.cellSize.resolution / targetLayout.cellSize.resolution
  val tilerOptions = Tiler.Options(
    resampleMethod = method,
    partitioner = new HashPartitioner(
      partitions = (math.pow(2, (resolutionRatio - 1) * 2) * rdd.partitions.length).toInt))

  val tiledRDD = rdd.tileToLayout[K](tiledMD, tilerOptions)
  ContextRDD(tiledRDD, tiledMD)
}

val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent)) 是獲取到當前數據在此 zoom 下的瓦片編號范圍,那么我們只需要將此處改成整個范圍即可,如下:

val newSpatialBounds = KeyBounds(
    SpatialKey(0, 0),
    SpatialKey(
      col = targetLayout.layoutCols,
      row = targetLayout.layoutRows
    ))

這樣即可實現正常的 update 操作。

三、總結

閱讀此文需要對 Geotrellis 框架有整體了解並熟悉其基本使用,可以參考本系列博客,使用 geotrellis 也需要對 scala 有所掌握,scala 語法在我接觸過的所有語言中應當是比較靈活的,靈活就導致麻煩。。。。

本文簡單介紹了如何實現 ETL 過程的 update 操作。這是我失業后寫的第一篇博客,失業后整個人對所有事情的理解更上了一步,無論是對技術還是生活都有更多的感悟,生活和技術都需要慢慢品味。

Geotrellis系列文章鏈接地址http://www.cnblogs.com/shoufengwei/p/5619419.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM