The purpose of this issue is to record some backtraces we identified during a long-running Spark job that point to potential areas of optimization. This is under the assumption that backtraces captured repeatedly by manual sampling are likely to be areas where a job is spending lots of compute time. The primary observation is that serialization to/from WKB is consuming a considerable fraction of operations where complex spatial relations are involved. Long term we should consider replacing "blob-based" WKB serialization with something using Spark native data types (e.g. storing points as structs of floating-point values). Backtrace 1: This job involved a complex spatial query where predicate pushdown was not available in RasterFrames' GeoTrellisRelation, and therefore spatial relations were being evaluated on every row.
com.vividsolutions.jts.io.WKBReader.readCoordinate(WKBReader.java:364)
com.vividsolutions.jts.io.WKBReader.readCoordinateSequence(WKBReader.java:333)
com.vividsolutions.jts.io.WKBReader.readCoordinateSequenceRing(WKBReader.java:351)
com.vividsolutions.jts.io.WKBReader.readLinearRing(WKBReader.java:258)
com.vividsolutions.jts.io.WKBReader.readPolygon(WKBReader.java:269)
com.vividsolutions.jts.io.WKBReader.readGeometry(WKBReader.java:208)
com.vividsolutions.jts.io.WKBReader.readMultiPolygon(WKBReader.java:308)
com.vividsolutions.jts.io.WKBReader.readGeometry(WKBReader.java:217)
com.vividsolutions.jts.io.WKBReader.read(WKBReader.java:152)
com.vividsolutions.jts.io.WKBReader.read(WKBReader.java:133)
org.locationtech.geomesa.spark.jts.util.WKBUtils$class.read(WKUtils.scala:35)
org.locationtech.geomesa.spark.jts.util.WKBUtils$.read(WKUtils.scala:40)
org.apache.spark.sql.jts.AbstractGeometryUDT.deserialize(AbstractGeometryUDT.scala:39)
astraea.spark.rasterframes.expressions.GeomDeserializerSupport$class.extractGeometry(GeomDeserializerSupport.scala:19)
astraea.spark.rasterframes.expressions.SpatialExpression.extractGeometry(SpatialExpression.scala:42)
astraea.spark.rasterframes.expressions.SpatialExpression.nullSafeEval(SpatialExpression.scala:54)
astraea.spark.rasterframes.expressions.SpatialExpression.nullSafeEval(SpatialExpression.scala:42)
org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(_expression_.scala:423)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
org.apache.spark.sql.execution.FilterExec$$anonfun$18$$anonfun$apply$2.apply(basicPhysicalOperators.scala:219)
org.apache.spark.sql.execution.FilterExec$$anonfun$18$$anonfun$apply$2.apply(basicPhysicalOperators.scala:218)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
org.apache.spark.scheduler.Task.run(Task.scala:108)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)
Backtrace 2: This looks like the actual spatial relation evaluation, which is where we'd want most of the time to be spent. To speed this up we could start to transition to a Magellan-like approach where certain relations are implemented as CodeGen-emitted, inlined Java code.
com.vividsolutions.jts.geom.util.ShortCircuitedGeometryVisitor.applyTo(ShortCircuitedGeometryVisitor.java:52)
com.vividsolutions.jts.operation.predicate.RectangleIntersects.intersects(RectangleIntersects.java:114)
com.vividsolutions.jts.operation.predicate.RectangleIntersects.intersects(RectangleIntersects.java:70)
com.vividsolutions.jts.geom.Geometry.intersects(Geometry.java:775)
org.locationtech.geomesa.spark.jts.udf.SpatialRelationFunctions$$anonfun$7.apply(SpatialRelationFunctions.scala:37)
org.locationtech.geomesa.spark.jts.udf.SpatialRelationFunctions$$anonfun$7.apply(SpatialRelationFunctions.scala:37)
org.locationtech.geomesa.spark.jts.util.SQLFunctionHelper$$anonfun$nullableUDF$2.apply(SQLFunctionHelper.scala:33)
astraea.spark.rasterframes.expressions.SpatialExpression.nullSafeEval(SpatialExpression.scala:55)
astraea.spark.rasterframes.expressions.SpatialExpression.nullSafeEval(SpatialExpression.scala:42)
org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(_expression_.scala:423)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
org.apache.spark.sql.execution.FilterExec$$anonfun$18$$anonfun$apply$2.apply(basicPhysicalOperators.scala:219)
org.apache.spark.sql.execution.FilterExec$$anonfun$18$$anonfun$apply$2.apply(basicPhysicalOperators.scala:218)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
org.apache.spark.scheduler.Task.run(Task.scala:108)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)
Backtrace 3: If envelope computation is a key component of evaluating a spatial relation, caching each geometry's envelope is something worth exploring.
com.vividsolutions.jts.geom.impl.CoordinateArraySequence.expandEnvelope(CoordinateArraySequence.java:255)
com.vividsolutions.jts.geom.LineString.computeEnvelopeInternal(LineString.java:232)
com.vividsolutions.jts.geom.Geometry.getEnvelopeInternal(Geometry.java:647)
com.vividsolutions.jts.geom.Polygon.computeEnvelopeInternal(Polygon.java:310)
com.vividsolutions.jts.geom.Geometry.getEnvelopeInternal(Geometry.java:647)
com.vividsolutions.jts.geom.GeometryCollection.computeEnvelopeInternal(GeometryCollection.java:256)
com.vividsolutions.jts.geom.Geometry.getEnvelopeInternal(Geometry.java:647)
com.vividsolutions.jts.geom.Geometry.intersects(Geometry.java:754)
org.locationtech.geomesa.spark.jts.udf.SpatialRelationFunctions$$anonfun$7.apply(SpatialRelationFunctions.scala:37)
org.locationtech.geomesa.spark.jts.udf.SpatialRelationFunctions$$anonfun$7.apply(SpatialRelationFunctions.scala:37)
org.locationtech.geomesa.spark.jts.util.SQLFunctionHelper$$anonfun$nullableUDF$2.apply(SQLFunctionHelper.scala:33)
astraea.spark.rasterframes.expressions.SpatialExpression.nullSafeEval(SpatialExpression.scala:55)
astraea.spark.rasterframes.expressions.SpatialExpression.nullSafeEval(SpatialExpression.scala:42)
org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(_expression_.scala:423)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
org.apache.spark.sql.execution.FilterExec$$anonfun$18$$anonfun$apply$2.apply(basicPhysicalOperators.scala:219)
org.apache.spark.sql.execution.FilterExec$$anonfun$18$$anonfun$apply$2.apply(basicPhysicalOperators.scala:218)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
org.apache.spark.scheduler.Task.run(Task.scala:108)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)
|