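Preface
GeoTrellis used to ship an ETL class that made it easy for users to import data (GeoTiff and other remote sensing imagery) into a backend (Accumulo, File, Hadoop, and so on). Its input is a user-configured JSON file that holds everything the processing needs: the data source, data type, projection, tile type, processing method, and so on.
Starting with version 2.0, GeoTrellis added a pipeline feature. Users can describe how their data should be processed as a pipeline, either in JSON or as an AST. They then only need to run this pipeline, and the system automatically turns the input data into the final result.
This article briefly analyzes how pipelines are implemented in GeoTrellis and discusses how the technique can be applied elsewhere.
1. How It Works
1.1 Comparing the Two Approaches
The pipeline brings no improvement in functionality or performance. The code of the original ETL class has simply been split into individual nodes of a pipeline, and those nodes still carry the same information as the original JSON configuration file. So what is the point of the change? Let's first look at the code for the same processing job written both ways.
- ETL approach: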
object Etl {
val defaultModules = Array(s3.S3Module, hadoop.HadoopModule, file.FileModule, accumulo.AccumuloModule, cassandra.CassandraModule, hbase.HBaseModule)
type SaveAction[K, V, M] = (AttributeStore, Writer[LayerId, RDD[(K, V)] with Metadata[M]], LayerId, RDD[(K, V)] with Metadata[M]) => Unit
object SaveAction {
def DEFAULT[K, V, M] = {
(_: AttributeStore, writer: Writer[LayerId, RDD[(K, V)] with Metadata[M]], layerId: LayerId, rdd: RDD[(K, V)] with Metadata[M]) =>
writer.write(layerId, rdd)
}
}
def ingest[
I: Component[?, ProjectedExtent]: TypeTag: ? => TilerKeyMethods[I, K],
K: SpatialComponent: Boundable: TypeTag,
V <: CellGrid: TypeTag: RasterRegionReproject: Stitcher: (? => TileReprojectMethods[V]): (? => CropMethods[V]): (? => TileMergeMethods[V]): (? => TilePrototypeMethods[V])
](
args: Seq[String], modules: Seq[TypedModule] = Etl.defaultModules
)(implicit sc: SparkContext) = {
implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]
EtlConf(args) foreach { conf =>
/* parse command line arguments */
val etl = Etl(conf, modules)
/* load source tiles using input module specified */
val sourceTiles = etl.load[I, V]
/* perform the reprojection and mosaicing step to fit tiles to LayoutScheme specified */
val (zoom, tiled) = etl.tile(sourceTiles)
/* save and optionally pyramid the mosaiced layer */
etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
}
}
}
case class Etl(conf: EtlConf, @transient modules: Seq[TypedModule] = Etl.defaultModules) extends LazyLogging {
import Etl.SaveAction
val input = conf.input
val output = conf.output
def scheme: Either[LayoutScheme, LayoutDefinition] = {
if (output.layoutScheme.nonEmpty) {
val scheme = output.getLayoutScheme
logger.info(scheme.toString)
Left(scheme)
} else if (output.layoutExtent.nonEmpty) {
val layout = output.getLayoutDefinition
logger.info(layout.toString)
Right(layout)
} else
sys.error("Either layoutScheme or layoutExtent with cellSize/tileLayout must be provided")
}
@transient val combinedModule = modules reduce (_ union _)
/**
* Loads RDD of rasters using the input module specified in the arguments.
* This RDD will contain rasters as they are stored, possibly overlapping and not conforming to any tile layout.
*
* @tparam I Input key type
* @tparam V Input raster value type
*/
def load[I: Component[?, ProjectedExtent]: TypeTag, V <: CellGrid: TypeTag]()(implicit sc: SparkContext): RDD[(I, V)] = {
val plugin = {
val plugins = combinedModule.findSubclassOf[InputPlugin[I, V]]
if(plugins.isEmpty) sys.error(s"Unable to find input module for input key type '${typeTag[I].tpe.toString}' and tile type '${typeTag[V].tpe.toString}'")
plugins.find(_.suitableFor(input.backend.`type`.name, input.format)).getOrElse(sys.error(s"Unable to find input module of type '${input.backend.`type`.name}' for format `${input.format} " +
s"for input key type '${typeTag[I].tpe.toString}' and tile type '${typeTag[V].tpe.toString}'"))
}
// clip in dest crs
input.clip.fold(plugin(conf))(extent => plugin(conf).filter { case (k, _) =>
val pe = k.getComponent[ProjectedExtent]
output.getCrs.fold(extent.contains(pe.extent))(crs => extent.contains(pe.extent.reproject(pe.crs, crs)))
})
}
/**
* Tiles RDD of arbitrary rasters to conform to a layout scheme or definition provided in the arguments.
* First metadata will be collected over input rasters to determine the overall extent, common crs, and resolution.
* This information will be used to select a LayoutDefinition if LayoutScheme is provided in the arguments.
*
* The tiling step will use this LayoutDefinition to cut input rasters into chunks that conform to the layout.
* If multiple rasters contribute to single target tile their values will be merged cell by cell.
*
* The timing of the reproject steps depends on the method chosen.
* BufferedReproject must be performed after the tiling step because it leans on SpatialComponent to identify neighboring
* tiles and sample their edge pixels. This method is the default and produces the best results.
*
* PerTileReproject method will be performed before the tiling step, on source tiles. When using this method the
* reproject logic does not have access to pixels past the tile boundary and will see them as NODATA.
* However, this approach does not require all source tiles to share a projection.
*
* @param rdd RDD of source rasters
* @param method Resampling method to be used when merging raster chunks in tiling step
*/
def tile[
I: Component[?, ProjectedExtent]: (? => TilerKeyMethods[I, K]),
V <: CellGrid: RasterRegionReproject: Stitcher: ClassTag: (? => TileMergeMethods[V]): (? => TilePrototypeMethods[V]):
(? => TileReprojectMethods[V]): (? => CropMethods[V]),
K: SpatialComponent: Boundable: ClassTag
](rdd: RDD[(I, V)], method: ResampleMethod = output.resampleMethod): (Int, RDD[(K, V)] with Metadata[TileLayerMetadata[K]]) = {
val targetCellType = output.cellType
val destCrs = output.getCrs.get
/** Tiles layers from some resolution and adjusts the partition count based on the resolution difference */
def resizingTileRDD(
rdd: RDD[(I, V)],
floatMD: TileLayerMetadata[K],
targetLayout: LayoutDefinition
): RDD[(K, V)] with Metadata[TileLayerMetadata[K]] = {
// rekey metadata to targetLayout
val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent))
val tiledMD = floatMD.copy(
bounds = floatMD.bounds.setSpatialBounds(newSpatialBounds),
layout = targetLayout
)
// > 1 means we're upsampling during tiling process
val resolutionRatio = floatMD.layout.cellSize.resolution / targetLayout.cellSize.resolution
val tilerOptions = Tiler.Options(
resampleMethod = method,
partitioner = new HashPartitioner(
partitions = (math.pow(2, (resolutionRatio - 1) * 2) * rdd.partitions.length).toInt))
rdd.tileToLayout[K](tiledMD, tilerOptions)
}
output.reprojectMethod match {
case PerTileReproject =>
def reprojected(targetCellSize: Option[CellSize] = None) = {
val reprojectedRdd = rdd.reproject(destCrs, RasterReprojectOptions(method = method, targetCellSize = targetCellSize))
val floatMD = { // collecting floating metadata allows detecting upsampling
val (_, md) = reprojectedRdd.collectMetadata(FloatingLayoutScheme(output.tileSize))
md.copy(cellType = targetCellType.getOrElse(md.cellType))
}
reprojectedRdd -> floatMD
}
scheme match {
case Left(scheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
val LayoutLevel(zoom, layoutDefinition) = scheme.levelForZoom(output.maxZoom.get)
val (reprojectedRdd, floatMD) = reprojected(Some(layoutDefinition.cellSize))
zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)
case Left(scheme) => // True for both FloatingLayoutScheme and ZoomedLayoutScheme
val (reprojectedRdd, floatMD) = reprojected()
val LayoutLevel(zoom, layoutDefinition) = scheme.levelFor(floatMD.extent, floatMD.cellSize)
zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)
case Right(layoutDefinition) =>
val (reprojectedRdd, floatMD) = reprojected()
0 -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)
}
case BufferedReproject =>
// Buffered reproject requires that tiles are already in some layout so we can find the neighbors
val md = { // collecting floating metadata allows detecting upsampling
val (_, md) = rdd.collectMetadata(FloatingLayoutScheme(output.tileSize))
md.copy(cellType = targetCellType.getOrElse(md.cellType))
}
val tiled = ContextRDD(rdd.tileToLayout[K](md, method), md)
scheme match {
case Left(layoutScheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
val LayoutLevel(zoom, layoutDefinition) = layoutScheme.levelForZoom(output.maxZoom.get)
(zoom, output.bufferSize match {
case Some(bs) => tiled.reproject(destCrs, layoutDefinition, bs, RasterReprojectOptions(method = method, targetCellSize = Some(layoutDefinition.cellSize)))._2
case _ => tiled.reproject(destCrs, layoutDefinition, RasterReprojectOptions(method = method, targetCellSize = Some(layoutDefinition.cellSize)))._2
})
case Left(layoutScheme) =>
output.bufferSize match {
case Some(bs) => tiled.reproject(destCrs, layoutScheme, bs, method)
case _ => tiled.reproject(destCrs, layoutScheme, method)
}
case Right(layoutDefinition) =>
output.bufferSize match {
case Some(bs) => tiled.reproject(destCrs, layoutDefinition, bs, method)
case _ => tiled.reproject(destCrs, layoutDefinition, method)
}
}
}
}
/**
* Saves provided RDD to an output module specified by the ETL arguments.
* This step may perform two to one pyramiding until zoom level 1 is reached.
*
* @param id Layout ID to b
* @param rdd Tiled raster RDD with TileLayerMetadata
* @param saveAction Function to be called for saving. Defaults to writing the layer.
* This gives the caller an opportunity to modify the layer before writing,
* or to save additional attributes in the attributes store.
*
* @tparam K Key type with SpatialComponent corresponding LayoutDefinition
* @tparam V Tile raster with cells from single tile in LayoutDefinition
*/
def save[
K: SpatialComponent: TypeTag,
V <: CellGrid: TypeTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
saveAction: SaveAction[K, V, TileLayerMetadata[K]] = SaveAction.DEFAULT[K, V, TileLayerMetadata[K]]
): Unit = {
implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]
val outputPlugin =
combinedModule
.findSubclassOf[OutputPlugin[K, V, TileLayerMetadata[K]]]
.find { _.suitableFor(output.backend.`type`.name) }
.getOrElse(sys.error(s"Unable to find output module of type '${output.backend.`type`.name}'"))
def savePyramid(zoom: Int, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]]): Unit = {
val currentId = id.copy(zoom = zoom)
outputPlugin(currentId, rdd, conf, saveAction)
scheme match {
case Left(s) =>
if (output.pyramid && zoom >= 1) {
val (nextLevel, nextRdd) = Pyramid.up(rdd, s, zoom, output.getPyramidOptions)
savePyramid(nextLevel, nextRdd)
}
case Right(_) =>
if (output.pyramid)
logger.error("Pyramiding only supported with layoutScheme, skipping pyramid step")
}
}
savePyramid(id.zoom, rdd)
logger.info("Done")
}
}
- Pipeline approach:
implicit val sc: SparkContext = ???
val scheme = Left[LayoutScheme, LayoutDefinition](FloatingLayoutScheme(512))
val jsonRead = JsonRead("s3://geotrellis-test/", `type` = ReadTypes.SpatialS3Type)
val jsonTileToLayout = TileToLayout(`type` = TransformTypes.SpatialTileToLayoutType)
val jsonReproject = Reproject("EPSG:3857", scheme, `type` = TransformTypes.SpatialBufferedReprojectType)
val jsonPyramid = Pyramid(`type` = TransformTypes.SpatialPyramidType)
val jsonWrite = JsonWrite("mask", "s3://geotrellis-test/pipeline/", PipelineKeyIndexMethod("zorder"), scheme, `type` = WriteTypes.SpatialType)
val list: List[PipelineExpr] = jsonRead ~ jsonTileToLayout ~ jsonReproject ~ jsonPyramid ~ jsonWrite
// typed way: build a typed AST and evaluate it
val typedAst: Node[Stream[(Int, TileLayerRDD[SpatialKey])]] =
list
.node[Stream[(Int, TileLayerRDD[SpatialKey])]]
val result: Stream[(Int, TileLayerRDD[SpatialKey])] = typedAst.eval
// in some cases you may want just to evaluate the pipeline
// to add some flexibility we can do the parsing and evaluation steps manually
// erasedNode function would parse JSON into an ErasedNode type that can be evaluated
val untypedAst: ErasedNode = list.erasedNode
// it would be an untyped result, just some evaluation
// but you still have a chance to catch and handle some types of exceptions
val untypedResult: Any =
Try {
untypedAst.unsafeEval
} match {
case Success(_) =>
case Failure(e) =>
}
// typed result
val typedResult: Option[Stream[(Int, TileLayerRDD[SpatialKey])]] =
Try {
untypedAst.eval
} match {
case Success(stream) => Some(stream)
case Failure(e) => None
}
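The amount of code alone shows that the pipeline approach is much shorter. As mentioned above, the pipeline simply wraps the earlier operations into individual operation nodes. The code for every kind of node has already been written; users only need to chain together the nodes they need and then run the whole pipeline.
1.2 Implementation Principles
Anyone who has studied GeoTrellis data ETL carefully, or who is familiar with it, knows that ETL boils down to the following. Its input is raster data in one of the combinations of two band layouts (singleband and multiband) and two time formats (no time dimension and time series), plus the location where the data is stored (S3, Hadoop, File, etc.). ETL reads this data, applies transformations such as reprojection, merging, and pyramid generation, and finally writes the result to the specified backend.
So the pipeline implementation defines the corresponding ReadType, TransformType, and WriteType. Let's look at the example above: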
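To make the idea of "pre-written nodes that the user only chains together" concrete, here is a minimal Scala sketch. It is not GeoTrellis code: `Layer` and the four step functions are hypothetical stand-ins that merely record what each step would change, and running the pipeline is just left-to-right function composition.

```scala
// Hypothetical toy model: a "layer" only records the state each step changes.
final case class Layer(uri: String, crs: String, zoom: Int, written: Boolean = false)

// Each pre-written node is an ordinary function Layer => Layer.
// These names are illustrative only; they are not GeoTrellis classes.
val read: Layer => Layer         = identity
val tileToLayout: Layer => Layer = _.copy(zoom = 13) // arbitrary zoom, for illustration
val reproject: Layer => Layer    = _.copy(crs = "EPSG:3857")
val write: Layer => Layer        = _.copy(written = true)

// The user only chooses and orders the nodes...
val nodes: List[Layer => Layer] = List(read, tileToLayout, reproject, write)

// ...and running the pipeline is left-to-right function composition.
val pipeline: Layer => Layer = nodes.reduce(_ andThen _)

val result: Layer = pipeline(Layer("s3://geotrellis-test/", "EPSG:4326", zoom = 0))
```

Because every node has the same shape here, any order compiles; the real pipeline attaches richer types to each node, which is what lets it reject invalid chains.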
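The input space described above is small and combinatorial, which is why a fixed set of node types can cover it. The sketch below simply enumerates it. The `Bands`/`Time` types, the backend list, and the dotted naming scheme are illustrative assumptions, not the identifiers GeoTrellis actually uses.

```scala
// Hypothetical model of the input dimensions: band layout x time format x backend.
sealed trait Bands
case object Singleband extends Bands
case object Multiband extends Bands

sealed trait Time
case object Spatial extends Time  // no time dimension
case object Temporal extends Time // time series

val allBands: List[Bands]  = List(Singleband, Multiband)
val allTimes: List[Time]   = List(Spatial, Temporal)
val backends: List[String] = List("s3", "hadoop", "file")

// One read node type per combination; the dotted name is an assumed convention.
val readTypeNames: List[String] =
  for {
    b  <- allBands
    t  <- allTimes
    be <- backends
  } yield s"${b.toString.toLowerCase}.${t.toString.toLowerCase}.read.$be"
```

With three backends this gives 2 × 2 × 3 = 12 read node types; adding a backend adds four more, without touching the transform or write nodes.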
val jsonRead = JsonRead("s3://geotrellis-test/", `type` = ReadTypes.SpatialS3Type)
This specifies the Read part: the storage path, the storage location (S3), the data type (Singleband), and the time format (Spatial, i.e. no time dimension).
val jsonTileToLayout = TileToLayout(`type` = TransformTypes.SpatialTileToLayoutType)
val jsonReproject = Reproject("EPSG:3857", scheme, `type` = TransformTypes.SpatialBufferedReprojectType)
val jsonPyramid = Pyramid(`type` = TransformTypes.SpatialPyramidType)
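TileToLayout turns the data into an RDD that carries information such as the data type and the spatial layout, which makes subsequent steps such as cutting tiles easier.
Reproject reprojects the data.
Pyramid cuts the data into a pyramid of zoom levels.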
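The Pyramid step produces one layer per zoom level, each coarser than the one before. As a rough sketch, assume a global Web Mercator layout with 256-pixel tiles where zoom `z` is a grid of `2^z × 2^z` tiles (the usual zoomed-layout convention). The levels built from a given maximum zoom then look like this; `Level`, `level`, and `pyramid` are hypothetical helpers, not GeoTrellis API.

```scala
// Assumption: a global Web Mercator layout with 256 x 256 pixel tiles,
// where zoom level z is a grid of 2^z x 2^z tiles.
val WorldWidthMeters: Double = 2 * 20037508.342789244
val TileSize: Int = 256

final case class Level(zoom: Int, tilesPerSide: Int, cellSizeMeters: Double)

def level(z: Int): Level = {
  val tiles = 1 << z
  Level(z, tiles, WorldWidthMeters / (TileSize.toDouble * tiles))
}

// Pyramiding: start at the finest level and repeatedly merge 2 x 2 tiles
// into one (halving the resolution) until zoom 0 is reached.
def pyramid(maxZoom: Int): List[Level] =
  (maxZoom to 0 by -1).map(level).toList
```

For example, `pyramid(3)` has 8, 4, 2 and 1 tiles per side, and each step up doubles the cell size.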
val jsonWrite = JsonWrite("mask", "s3://geotrellis-test/pipeline/", PipelineKeyIndexMethod("zorder"), scheme, `type` = WriteTypes.SpatialType)
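JsonWrite specifies how the data is written, including the key index method and the output type; the system automatically works out the output storage location from the given URI.
At this point all three kinds of nodes (read, transform, and write) have been specified.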
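Picking a backend from the URI can be sketched as a match on the URI scheme. The mapping below (`s3` to S3, `hdfs` to Hadoop, `file` or no scheme to the local file system, `accumulo` to Accumulo) and the name `backendFor` are assumptions for illustration; GeoTrellis has its own resolution logic, which this does not reproduce.

```scala
import java.net.URI

// Hypothetical: map a URI scheme to a storage backend name.
def backendFor(uri: String): Either[String, String] =
  Option(new URI(uri).getScheme).map(_.toLowerCase) match {
    case Some("s3")          => Right("s3")
    case Some("hdfs")        => Right("hadoop")
    case Some("file") | None => Right("file") // a bare path has no scheme
    case Some("accumulo")    => Right("accumulo")
    case Some(other)         => Left(s"unsupported scheme: $other")
  }
```

Returning `Either` keeps an unknown scheme as a value the caller can report, rather than failing deep inside the write step.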
val list: List[PipelineExpr] = jsonRead ~ jsonTileToLayout ~ jsonReproject ~ jsonPyramid ~ jsonWrite
This line chains the operation nodes above into a List.
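The `~` chaining can be mimicked with two small extension methods: `~` on a single expression starts a list, and `~` on a list appends to it. `Expr`, `ReadExpr`, `TransformExpr`, `WriteExpr`, and the `type` strings are hypothetical stand-ins for GeoTrellis's `PipelineExpr`, `JsonRead`, and related classes; the sketch only shows how such syntax can yield a plain `List`, not how GeoTrellis defines it.

```scala
// Hypothetical stand-ins for PipelineExpr / JsonRead / JsonWrite.
sealed trait Expr { def `type`: String }
final case class ReadExpr(uri: String, `type`: String) extends Expr
final case class TransformExpr(`type`: String) extends Expr
final case class WriteExpr(name: String, uri: String, `type`: String) extends Expr

object PipelineSyntax {
  // `a ~ b` on two expressions starts a list.
  implicit class ExprOps(private val e: Expr) {
    def ~(next: Expr): List[Expr] = List(e, next)
  }
  // `list ~ c` appends one more expression to an existing list.
  implicit class ExprListOps(private val es: List[Expr]) {
    def ~(next: Expr): List[Expr] = es :+ next
  }
}
import PipelineSyntax._

val list: List[Expr] =
  ReadExpr("s3://geotrellis-test/", "read") ~
    TransformExpr("tile-to-layout") ~
    TransformExpr("reproject") ~
    WriteExpr("mask", "s3://geotrellis-test/pipeline/", "write")
```

Since `~` is left-associative, the chain is built one step at a time, and the result is an ordinary list that can later be parsed into typed or untyped nodes.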
val typedAst: Node[Stream[(Int, TileLayerRDD[SpatialKey])]] =
list
.node[Stream[(Int, TileLayerRDD[SpatialKey])]]
val result: Stream[(Int, TileLayerRDD[SpatialKey])] = typedAst.eval
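These two statements build the corresponding typed node sequence and finally call eval, which runs the whole pipeline and returns the final result.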
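These few lines cover the entire data processing workflow. Note that when chaining the final pipeline, the output type of each node must be the input type of the next; otherwise the pipeline cannot run.
The whole idea is very similar to popular neural network frameworks such as TensorFlow and Keras, where you first define a model made of neural network nodes. That model is really a data processing model, so the two are essentially the same. The difference is that neural networks care more about the state of the data, such as its dimensions and size (number of nodes), whereas GeoTrellis cares about how the data is processed.
That concludes this introduction to how pipelines are implemented in GeoTrellis; interested readers can consult the source code for further analysis.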
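Why the output of one node must match the input of the next can be shown with a tiny typed node. In the hypothetical `TypedNode[A, B]` below, `~>` only compiles when the types line up, much like the typed `node[...]`/`eval` path in the example. The erased variant checks nothing until run time, so a mismatch surfaces as an exception, which is why wrapping evaluation in `Try`, as the example does, is useful. All names here are illustrative, not GeoTrellis API.

```scala
import scala.util.Try

// Hypothetical typed node: consumes an A and produces a B.
final case class TypedNode[A, B](name: String, run: A => B) {
  // Only compiles when this node's output type B is the next node's input type.
  def ~>[C](next: TypedNode[B, C]): TypedNode[A, C] =
    TypedNode(s"$name ~> ${next.name}", run andThen next.run)
}

// Toy data types standing in for raw rasters and a tiled layer.
final case class Raw(paths: List[String])
final case class Tiled(tiles: Int)

val readNode  = TypedNode[String, Raw]("read", uri => Raw(List(uri + "a.tif", uri + "b.tif")))
val tileNode  = TypedNode[Raw, Tiled]("tile", raw => Tiled(raw.paths.size * 4))
val countNode = TypedNode[Tiled, Int]("count", _.tiles)

val typed: TypedNode[String, Int] = readNode ~> tileNode ~> countNode
// readNode ~> countNode   // rejected by the compiler: Raw is not Tiled

// Erased variant: node types are forgotten, so nothing is checked until run time.
def erase[A, B](n: TypedNode[A, B]): Any => Any = n.run.asInstanceOf[Any => Any]
def runErased(nodes: List[Any => Any], input: Any): Try[Any] =
  Try(nodes.reduce(_ andThen _)(input))

val ok  = runErased(List(erase(readNode), erase(tileNode), erase(countNode)), "s3://bucket/")
val bad = runErased(List(erase(readNode), erase(countNode)), "s3://bucket/") // Raw fed to a Tiled node
```

Here `ok` is `Success(8)`, while `bad` is a `Failure` holding a `ClassCastException`: the same mistake the typed version catches at compile time.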
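2. Inspiration
After studying the GeoTrellis pipeline carefully, I realized that many things can be implemented this way, such as the neural networks just mentioned. Likewise, other kinds of remote sensing processing could be wrapped into pipelines, for example different model computations, dodging and color balancing, orthorectification, and so on.
Any process whose steps depend on one another, or that has to be carried out step by step, can be wrapped as a pipeline. This makes later processing more standardized, the code more concise, and the tools easier to use. It is exactly the revolutionary change that Ford's assembly line brought to the whole automobile industry.
I have recently been reading books on computer organization, which also give considerable attention to instruction pipelining in CPUs. The same idea can be applied to data processing: if the processing flow is run like a stream of instructions, memory-intensive steps and CPU-intensive steps can be staggered so that the server stays fully loaded, which should speed up processing.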
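A back-of-the-envelope model shows where such a speedup would come from. Assume `k` stages of equal duration (say read/IO, compute/CPU, write/IO) and `n` independent batches with no stalls. Running the batches one after another takes `k × n` time steps, while a pipeline takes `k + n - 1`, because different batches occupy different stages at the same time. The functions below are hypothetical helpers for this idealized model; real gains depend on how balanced the stages are.

```scala
// Idealized model (assumption): equal-length stages, independent batches, no stalls.
def sequentialTime(stages: Int, batches: Int): Int = stages * batches

def pipelinedTime(stages: Int, batches: Int): Int =
  if (batches == 0) 0 else stages + batches - 1

// For each time step, which batch occupies each stage (None = stage idle).
def schedule(stages: Int, batches: Int): Vector[Vector[Option[Int]]] =
  Vector.tabulate(pipelinedTime(stages, batches)) { t =>
    Vector.tabulate(stages) { s =>
      val b = t - s // batch b enters stage s at time b + s
      if (b >= 0 && b < batches) Some(b) else None
    }
  }
```

With 3 stages and 4 batches this takes 6 steps instead of 12; at step 2 all three stages are busy at once, reading batch 2 while computing batch 1 and writing batch 0.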
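3. Summary
This article introduced how the pipeline is implemented in GeoTrellis and briefly discussed what the technique suggests for other kinds of processing.
Links to the GeoTrellis article series: http://www.cnblogs.com/shoufengwei/p/5619419.html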