I have a case class intended to contain mixed geometries. I would expect to be able to create Points, Lines, Polygons (using st_make*) and map them to a Geometry type. This works (with geom specifically set to be a Point):
import com.vividsolutions.jts.geom._
import org.apache.spark.sql.SparkSession
import org.locationtech.geomesa.spark.jts._
// Target row type: geom is the concrete JTS Point — this version works,
// because st_makePoint's column type (Point) matches the field exactly.
case class Container(lon: Double, lat: Double, geom: Point)
object Test extends App {
val spark = SparkSession.builder
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Registers the GeoMesa JTS UDTs and st_* UDFs (incl. st_makePoint) on this session.
spark.withJTS
val df = List((-117.91915, 33.81196))
.toDF("lon", "lat")
// st_makePoint yields a TypedColumn[Any, Point], so the "geom" column's UDT is PointUDT.
.withColumn("geom", st_makePoint('lon, 'lat))
// Point column -> Point field: the encoder resolves cleanly.
val contained = df.as[Container]
contained.show
}
When changing geom to be a more general Geometry, this fails:
package common
import com.vividsolutions.jts.geom._
import org.apache.spark.sql.SparkSession
import org.locationtech.geomesa.spark.jts._
// Same program, but geom widened to the JTS supertype Geometry.
// This variant FAILS at df.as[Container]: the analyzer tries to CAST the
// Point-typed column to the GeometryUDT struct and rejects it (see trace below).
case class Container(lon: Double, lat: Double, geom: Geometry)
object Test extends App {
val spark = SparkSession.builder
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Registers the GeoMesa JTS UDTs and st_* UDFs on this session.
spark.withJTS
val df = List((-117.91915, 33.81196))
.toDF("lon", "lat")
// Column is still PointUDT here — st_makePoint produces TypedColumn[Any, Point].
.withColumn("geom", st_makePoint('lon, 'lat))
// Encoder resolution fails: "cannot cast point to geometry".
val contained = df.as[Container]
contained.show
}
The Spark/GeoMesa portion of the stack trace is below. Spark's TypedColumn docs suggest that I should be able to change the Column from TypedColumn[Any, Point] (which st_makePoint produces) to TypedColumn[Any, Geometry] by doing this:
st_makePoint('lon, 'lat).as[Geometry]
...but that results in the same error. Likewise,
st_makePoint('lon, 'lat).cast(GeometryUDT)
similarly fails, but with a different error:
org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(UDF(lon, lat) AS STRUCT<`wkb`: BINARY>)' due to data type mismatch: cannot cast point to geometry;;
I was ultimately able to work around the problem by introducing st_castToGeometry, based on the definition for st_castToPoint:
import com.vividsolutions.jts.geom._
import org.apache.spark.sql.{Column, SparkSession, TypedColumn}
import org.locationtech.geomesa.spark.jts._
import org.locationtech.geomesa.spark.jts.util.SQLFunctionHelper.udfToColumn
case class Container(lon: Double, lat: Double, geom: Geometry)
object Test extends App {
val spark = SparkSession.builder
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.withJTS
// Identity at runtime, but the Geometry => Geometry signature makes the
// resulting UDF column carry GeometryUDT instead of PointUDT.
private val ST_CastToGeometry: Geometry => Geometry = g => g
// udfToColumn looks the display name up from this map (mirrors st_castToPoint's setup).
private val castingNames = Map(
ST_CastToGeometry -> "st_castToGeometry"
)
// Wraps any geometry column as TypedColumn[Any, Geometry] via the identity UDF.
def st_castToGeometry(geom: Column): TypedColumn[Any, Geometry] =
udfToColumn(ST_CastToGeometry, castingNames, geom)
val df = List((-117.91915, 33.81196))
.toDF("lon", "lat")
// The wrapper widens the column's UDT, so the Geometry field now resolves.
.withColumn("geom", st_castToGeometry(st_makePoint('lon, 'lat)))
val contained = df.as[Container]
contained.show
}
org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`geom` AS STRUCT<`wkb`: BINARY>)' due to data type mismatch: cannot cast point to geometry;
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:257)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$deserializer$lzycompute(Dataset.scala:212)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$deserializer(Dataset.scala:211)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:68)
at org.apache.spark.sql.Dataset.as(Dataset.scala:418)
|