Re: [geotrellis-user] simple zonal stats example

Hey Charlie,

Awesome to see this usage, and the code looks reasonable.

Here are some thoughts:

For EMR, you might not be getting the number of executors you need, given your memory settings; YARN can be finicky about scheduling or not scheduling executors based on the resources requested. If you're using m3.xlarges, I'd suggest the following settings:


--conf spark.dynamicAllocation.enabled=true \
--executor-memory 4800M \
--executor-cores 2 \
--conf spark.yarn.executor.memoryOverhead=900M

These are the settings I've been using with m3.2xlarges, but since dynamic allocation is enabled, the same per-executor sizing should work on your m3.xlarges.

Check the Executors page in the Spark UI to see how many executors are spun up and how many total cores you are using. If the math doesn't work out with how many nodes/cores you have, then you are underutilizing the cluster.
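
To sanity-check that math on m3.xlarges: each executor asks for 4800M of heap plus 900M of overhead, roughly 5700M total, and if I remember right EMR's default YARN allocation on an m3.xlarge is 11,520M, so you should see two executors per node, for 2 executors x 2 cores = 4 cores, matching the instance's 4 vCPUs.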

To set the number of partitions on the S3 read, use:

S3GeoTiffRDD.spatial("gfw2-data", "alerts-tsv/temp/rasters", S3GeoTiffRDD.Options(numPartitions = Some(1000)))

Don't be afraid to set the number of partitions high. In the Spark UI you can see the average task execution time for a stage; if it isn't under 500ms, you can increase the partition count. You'll start hurting the driver if it has to keep track of too many partitions, but that shouldn't happen if you're under 100K partitions.

I would probably increase the FloatingLayoutScheme tile size here from 512 to 1024:

TileLayerMetadata.fromRdd(inputRdd, FloatingLayoutScheme(512))

You can repartition as part of the tileToLayout call:

import org.apache.spark.HashPartitioner

val tiledRDD: RDD[(SpatialKey, Tile)] =
  inputRdd
    .tileToLayout(
      rasterMetaData.cellType,
      rasterMetaData.layout,
      Tiler.Options(resampleMethod = Bilinear, partitioner = Some(new HashPartitioner(1000))))

Instead of mapping over the features, you can map over the RDD and manually do the summary over the tile for each polygon. This is a bit more complicated, but it uses a Spark aggregation. It uses a Map of String -> Polygon instead of a sequence of Polygons, so we can return a sum value per polygon ID:

import geotrellis.raster._
import geotrellis.raster.rasterize.polygon.{FractionCallback, FractionalRasterizer}
import geotrellis.vector._

val polygons: Map[String, Polygon] = ???

// Sequence op - fold one raster's sums into our aggregate value
val seqOp: (Map[String, Double], Raster[Tile]) => Map[String, Double] =
  { (acc, raster) =>
    val extent = raster.extent
    val env = extent.toPolygon
    val rasterExtent = raster.rasterExtent
    // Area of a single cell, from the raster's resolution
    val cellArea = rasterExtent.cellwidth * rasterExtent.cellheight

    // Per-polygon sums for this one raster
    val tileSums: Map[String, Double] =
      polygons
        .filter { case (_, polygon) => env.intersects(polygon) }
        .map { case (name, polygon) =>
          var v = 0.0

          if (polygon.contains(env)) {
            // Polygon contains this whole tile, so
            // sum up the value of the tile.
            raster.tile.foreachDouble { z =>
              if (isData(z)) { v += z * cellArea }
            }
          } else {
            // Polygon only partially intersects the tile;
            // use fractional rasterization.
            val tile = raster.tile
            val intersections =
              polygon.intersection(env) match {
                case PolygonResult(intersectionPoly) => Seq(intersectionPoly)
                case MultiPolygonResult(mp) => mp.polygons.toSeq
                case _ => Seq()
              }

            intersections
              .foreach { p =>
                FractionalRasterizer.foreachCellByPolygon(p, rasterExtent)(
                  new FractionCallback {
                    def callback(col: Int, row: Int, fraction: Double): Unit = {
                      val z = tile.getDouble(col, row)
                      if (isData(z)) { v += z * cellArea * fraction }
                    }
                  }
                )
              }
          }

          (name, v)
        }

    // Merge this raster's sums into the running aggregate
    (acc.keySet ++ tileSums.keySet)
      .map { k => k -> (acc.getOrElse(k, 0.0) + tileSums.getOrElse(k, 0.0)) }
      .toMap
  }

// Combine op - combine two aggregate sums
val combOp: (Map[String, Double], Map[String, Double]) => Map[String, Double] =
  { (acc1, acc2) =>
    (acc1.keySet ++ acc2.keySet)
      .map { k => k -> (acc1.getOrElse(k, 0.0) + acc2.getOrElse(k, 0.0)) }
      .toMap
  }

val volume =
  layerRdd
    .asRasters
    .map(_._2)
    .aggregate(Map[String, Double]())(seqOp, combOp)


I didn't compile the above code, so there might be some issues you'd have to track down, but that describes the approach.

In fact, if all you really want are polygonal sums, you can do that straight from the original tiles off of S3, taking a similar approach to the one above.
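
Here's a rough, uncompiled sketch of that, reusing the seqOp/combOp from above (volumeFromS3 is just a name I made up, and it assumes your polygons are in the same CRS as the tiles):

import geotrellis.raster._
import geotrellis.spark.io.s3.S3GeoTiffRDD
import geotrellis.vector._

// Read (ProjectedExtent, Tile) pairs straight off S3 and aggregate them,
// skipping the tileToLayout/layer steps entirely.
val volumeFromS3 =
  S3GeoTiffRDD
    .spatial("gfw2-data", "alerts-tsv/temp/rasters",
      S3GeoTiffRDD.Options(numPartitions = Some(1000)))
    .map { case (projectedExtent, tile) => Raster(tile, projectedExtent.extent) }
    .aggregate(Map[String, Double]())(seqOp, combOp)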

To get the properties, you have to deserialize the JSON as Features instead of Geometries, and there has to be an implicit JsonReader[T] for the property type you are reading. For an example, see this unit test: https://github.com/locationtech/geotrellis/blob/master/vector-test/src/test/scala/spec/geotrellis/vector/io/json/FeatureFormatsSpec.scala#L168
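
As a rough sketch (again uncompiled; AdminProps and geojsonString are made-up names, with the property type's fields matched to your GeoJSON):

import geotrellis.vector._
import geotrellis.vector.io._
import geotrellis.vector.io.json.JsonFeatureCollection
import spray.json.DefaultJsonProtocol._

// Made-up property type; mirror the fields of your GeoJSON properties.
case class AdminProps(id: String, name: String)
implicit val adminPropsFormat = jsonFormat2(AdminProps)

val geojsonString: String = ??? // your FeatureCollection JSON

val features = geojsonString
  .parseGeoJson[JsonFeatureCollection]
  .getAllPolygonFeatures[AdminProps]

// Handy for the aggregation above: an ID -> Polygon map.
val polygons: Map[String, Polygon] =
  features.map { f => f.data.id -> f.geom }.toMap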

Hopefully this helps some; let me know how it goes! Feel free to ask for clarifications and follow-ups.

Best,
Rob

On Fri, Aug 11, 2017 at 2:33 PM, Charlie Hofmann <charlie.hofmann@xxxxxxx> wrote:

Hi list,

I’m trying to run polygonalSum for a variety of polygons on a 10x10 degree float raster. I’ve forked the geotrellis-landsat-tutorial and put together some code here: https://github.com/wri/geotrellis-zonal-stats. I’m very new to Scala and even newer to GeoTrellis, so any help on code/style/convention is appreciated.

My code in ZonalStats.scala does the following:

  1. Reads a bunch of 256 x 256 raster tiles from s3 using S3GeoTiffRDD
  2. Converts this to a tiledRDD and then to a layerRDD
  3. Reads a geojson file to get the geometries from it
  4. Maps over the geometries (115 in total), calculating polygonalSumDouble for each one

I’m running this on EMR using a YARN-managed cluster of m3.xlarge machines; this takes about half an hour. To package the code, I execute:

./sbt "project geotrellis-zonal-stats" assembly

And then to run it:

spark-submit --class tutorial.ZonalStats target/scala-2.11/demo-assembly-0.2.0.jar --master yarn --executor-memory 15g

Most of the GeoTrellis examples deal with making web services for tiled maps or on-the-fly geoprocessing. The workflow outlined above is for one-off analysis. While it works (the polygonalSum values are correct), I’d like to speed it up if possible. In particular, I’m wondering:

  1. Would following the ETL process to ingest and write GeoTrellis layers to S3 speed things up?
  2. Are there any shortcuts I can take regarding GeoTiffs > tiledRDD > layerRDD?
  3. Is it possible to get geoJSON properties (not just geometry) when mapping over a JsonFeatureCollection?
  4. What colossal n00b mistakes am I making?

My ultimate goal is to store a global coverage of 0.00025 degree TIFs on s3, then tabulate polygonalSums for all GADM admin level 2 boundaries.

Thanks to all you folks for your help, and for developing such a cool tool set! Looking forward to building this into our regular workflow!

Charlie


_______________________________________________
geotrellis-user mailing list
geotrellis-user@locationtech.org
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.locationtech.org/mailman/listinfo/geotrellis-user



--
Robert Emanuele, VP of Research
Azavea |  990 Spring Garden Street, 5th Floor, Philadelphia, PA
remanuele@xxxxxxxxxx  |  @lossyrob
