Hey Charlie,
Awesome to see this usage, and the code looks reasonable.
Here are some thoughts:
For EMR, you might not be getting the number of executors you need based on the memory settings. YARN can be finicky about scheduling or not scheduling executors based on the resources requested. If you're using m3.xlarges, I'd suggest the following settings:
--conf spark.dynamicAllocation.enabled=true \
--executor-memory 4800M \
--executor-cores 2 \
--conf spark.yarn.executor.memoryOverhead=900M
These are the settings I've been using with m3.2xlarges, but since dynamic allocation is enabled, the same executor sizing should carry over.
Check the Spark UI on the executors page to see how many executors are spun up, and how many total cores you are using. If the math doesn't work out with how many nodes/cores you have, then you are underutilizing the cluster.
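To sanity-check that math for m3.xlarges, here's the rough arithmetic (the YARN memory figure below is my assumption about the EMR default for that instance type; verify yarn.nodemanager.resource.memory-mb on your cluster):

```scala
// Rough executor-packing arithmetic for a single m3.xlarge node (4 vCPUs).
// yarnMemoryPerNodeMb is an assumed EMR default -- check your cluster's
// yarn.nodemanager.resource.memory-mb setting.
val yarnMemoryPerNodeMb = 11520
val perExecutorMb = 4800 + 900      // --executor-memory + memoryOverhead
val executorsPerNode = yarnMemoryPerNodeMb / perExecutorMb  // integer division
val coresPerNode = executorsPerNode * 2                     // --executor-cores 2
```

If executorsPerNode times the executor memory request doesn't come close to the node's YARN allocation, YARN is leaving memory stranded and you can bump --executor-memory up.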
To set the number of partitions on the S3 read, use
S3GeoTiffRDD.spatial("gfw2-data", "alerts-tsv/temp/rasters", S3GeoTiffRDD.Options(numPartitions = Some(1000)))
Don't be afraid to set the number of partitions high. In the Spark UI you can see the average task execution time for a stage. If this isn't less than 500ms, you can increase the partition count. You'll start hurting the driver if it has to keep track of too many partitions, but that shouldn't happen if you're under 100K partitions.
I would probably increase the tile size here from 512 to 1024:
TileLayerMetadata.fromRdd(inputRdd, FloatingLayoutScheme(512))
You can repartition as part of the tileToLayout call:
val tiledRDD: RDD[(SpatialKey, Tile)] =
  inputRdd
    .tileToLayout(
      rasterMetaData.cellType,
      rasterMetaData.layout,
      Tiler.Options(resampleMethod = Bilinear, partitioner = Some(HashPartitioner(1000))))
Instead of mapping over the features, you can map over the RDD and manually do the summary over the tile for each poly. This is a bit more complicated, but it uses a Spark aggregation.
It uses a Map of String -> Polygon instead of a sequence of Polygons, so we can return a sum value per polygon ID:
// Imports needed (package paths per GeoTrellis 1.x; adjust to your version):
import geotrellis.raster._
import geotrellis.raster.rasterize.polygon.{FractionCallback, FractionalRasterizer}
import geotrellis.vector._

val polygons: Map[String, Polygon] = ???

// Sequence op - fold one raster into our aggregate value
val seqOp: (Map[String, Double], Raster[Tile]) => Map[String, Double] =
  { (acc, raster) =>
    val extent = raster.extent
    val env = extent.toPolygon
    // Area of a single cell, used to weight cell values.
    val cellArea = raster.rasterExtent.cellwidth * raster.rasterExtent.cellheight
    val tileSums =
      polygons
        .filter { case (_, polygon) => env.intersects(polygon) }
        .map { case (name, polygon) =>
          var v = 0.0
          if (polygon.contains(env)) {
            // The polygon contains this whole tile, so
            // sum up the values of the tile.
            raster.tile.foreachDouble { z =>
              if (isData(z)) { v += z * cellArea }
            }
          } else {
            // The polygon only partially intersects the tile,
            // so use fractional rasterization.
            val tile = raster.tile
            val intersections =
              polygon.intersection(env) match {
                case PolygonResult(intersectionPoly) => Seq(intersectionPoly)
                case MultiPolygonResult(mp) => mp.polygons.toSeq
                case _ => Seq()
              }
            intersections
              .foreach { p =>
                FractionalRasterizer.foreachCellByPolygon(p, raster.rasterExtent)(
                  new FractionCallback {
                    def callback(col: Int, row: Int, fraction: Double): Unit =
                      v += tile.getDouble(col, row) * cellArea * fraction
                  }
                )
              }
          }
          (name, v)
        }
        .toMap
    // Merge this tile's sums into the running aggregate so sums from
    // earlier tiles in the partition aren't dropped.
    (acc.keySet ++ tileSums.keySet)
      .map { k => k -> (acc.getOrElse(k, 0.0) + tileSums.getOrElse(k, 0.0)) }
      .toMap
  }
// Combine op - combine two aggregate sums
val combOp: (Map[String, Double], Map[String, Double]) => Map[String, Double] =
  { (acc1, acc2) =>
    (acc1.keySet ++ acc2.keySet)
      .map { k => k -> (acc1.get(k).toList ::: acc2.get(k).toList) }
      .map { case (k, vs) => (k, vs.sum) }
      .toMap
  }
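As a quick sanity check of what combOp does, here's the same merge on two plain Scala maps, standing in for partial sums from two partitions (toy values, runnable on its own):

```scala
// Two partial sum maps, e.g. from two partitions.
val acc1 = Map("polyA" -> 10.0, "polyB" -> 2.5)
val acc2 = Map("polyB" -> 1.5, "polyC" -> 4.0)

// Union of keys; a key missing from one map contributes nothing.
val merged =
  (acc1.keySet ++ acc2.keySet)
    .map { k => k -> (acc1.get(k).toList ::: acc2.get(k).toList) }
    .map { case (k, vs) => (k, vs.sum) }
    .toMap
// merged == Map("polyA" -> 10.0, "polyB" -> 4.0, "polyC" -> 4.0)
```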
val volume =
  layerRdd
    .asRasters
    .map(_._2)
    .aggregate(Map[String, Double]())(seqOp, combOp)
I didn't compile the above code, so there might be some issues you'd have to track down, but that describes the approach.
In fact, if all you really want are polygonal sums, you can compute them directly from the original tiles off of S3, taking a similar approach to the one above.
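A rough sketch of that (untested like the above, and assuming the seqOp and combOp defined earlier are in scope; the bucket and prefix are the same ones from your read):

```scala
// Aggregate polygonal sums straight off the source GeoTiffs on S3,
// skipping tileToLayout entirely. Reuses seqOp/combOp from above.
val volumeFromSource =
  S3GeoTiffRDD
    .spatial("gfw2-data", "alerts-tsv/temp/rasters",
      S3GeoTiffRDD.Options(numPartitions = Some(1000)))
    .map { case (projectedExtent, tile) => Raster(tile, projectedExtent.extent) }
    .aggregate(Map[String, Double]())(seqOp, combOp)
```

This skips the shuffle that tileToLayout would do, which is the main win if you don't need the tiled layer for anything else.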
Hopefully this helps some, let me know how it goes! Feel free to ask for clarifications and follow-ups.
Best,
Rob