I am using Spark SQL to run a query that aggregates points into a LineString, and the result needs to be persisted in a separate table. However, when I try to save it, I get the following error:
Digging into this, it seems the schema comparison is failing because the table schema has an SRID:
I have the same issue even if I remove the SRID from the table schema. Maybe EPSG:4326 is assumed by default? Here is the code I am trying to execute:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Load the existing GeoMesa feature type and expose it to Spark SQL
Dataset<Row> df = spark.read()
    .format("geomesa")
    .options(datastoreParams.getParams())
    .option("geomesa.feature", "echofish")
    .load();
df.createOrReplaceTempView("echofish");

// Aggregate the pings of one cruise into a single path
String sqlQuery = String.format(
    "select first(cruise_name) as cruise_name, makeSimplifiedLineString(timestamp, ping_location) as path "
        + "from echofish where cruise_name = '%s' and frequency_khz = 18",
    cruiseName);
Dataset<Row> result = spark.sql(sqlQuery);

// Write the result to a separate GeoMesa feature type
result
    .write()
    .format("geomesa")
    .options(datastoreParams.getParams())
    .option("geomesa.feature", "echofish_cruise_path")
    .save();
While investigating, I found these parts of the codebase that seem key to the issue. In org.geotools.feature.type.GeometryTypeImpl.equals(), the comparison fails because the geometry type in the table schema has a CRS and the one generated from the DataFrame schema does not:
public boolean equals(Object other) {
    if (!(other instanceof GeometryType)) {
        return false;
    }
    if (!super.equals(other)) {
        return false;
    }
    GeometryType o = (GeometryType) other;
    // A type with no CRS only equals another type with no CRS
    if (CRS == null) {
        return o.getCoordinateReferenceSystem() == null;
    }
    // A type with a CRS never equals a type without one
    if (o.getCoordinateReferenceSystem() == null) {
        return false;
    }
    return org.geotools.referencing.CRS.equalsIgnoreMetadata(
            CRS, o.getCoordinateReferenceSystem());
}
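To check my reading of the branch logic, here is a stripped-down, self-contained model of just the CRS part of that comparison. It is not the GeoTools code: the class name `CrsEqualityModel` is hypothetical, the CRS is modeled as a plain String code, and `equalsIgnoreMetadata` is approximated by string equality.

```java
/**
 * Hypothetical model of the CRS branch of GeometryTypeImpl.equals().
 * The CRS is a String code (e.g. "EPSG:4326") instead of a GeoTools
 * CoordinateReferenceSystem; string equality stands in for
 * CRS.equalsIgnoreMetadata().
 */
public class CrsEqualityModel {

    public static boolean crsMatches(String thisCrs, String otherCrs) {
        if (thisCrs == null) {
            // A type without a CRS only matches another type without a CRS.
            return otherCrs == null;
        }
        if (otherCrs == null) {
            // A type with a CRS never matches a type without one.
            return false;
        }
        // Stand-in for CRS.equalsIgnoreMetadata(thisCrs, otherCrs).
        return thisCrs.equals(otherCrs);
    }

    public static void main(String[] args) {
        String tableCrs = "EPSG:4326"; // geometry attribute of the stored schema
        String dataFrameCrs = null;    // geometry attribute built from the DataFrame schema
        System.out.println(crsMatches(tableCrs, dataFrameCrs)); // false
        System.out.println(crsMatches(dataFrameCrs, tableCrs)); // false
        System.out.println(crsMatches(tableCrs, "EPSG:4326"));  // true
    }
}
```

The check is symmetric when exactly one side is null, so it does not matter which schema ends up as `this`: a DataFrame-derived type with no CRS can never match a stored type that has one.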
In org.locationtech.geomesa.spark.structType2SFT, the CRS is never set on the SimpleFeatureTypeBuilder, so the geometry attributes it creates have no CRS:
def structType2SFT(struct: StructType, name: String): SimpleFeatureType = {
  import java.{lang => jl}
  val fields = struct.fields
  val builder = new SimpleFeatureTypeBuilder

  fields.filter(_.name != "__fid__").foreach { field =>
    field.dataType match {
      case DataTypes.BooleanType => builder.add(field.name, classOf[jl.Boolean])
      case DataTypes.DateType => builder.add(field.name, classOf[java.util.Date])
      case DataTypes.FloatType => builder.add(field.name, classOf[jl.Float])
      case DataTypes.IntegerType => builder.add(field.name, classOf[jl.Integer])
      case DataTypes.DoubleType => builder.add(field.name, classOf[jl.Double])
      case DataTypes.StringType => builder.add(field.name, classOf[jl.String])
      case DataTypes.LongType => builder.add(field.name, classOf[jl.Long])
      case DataTypes.TimestampType => builder.add(field.name, classOf[java.util.Date])
      case JTSTypes.PointTypeInstance => builder.add(field.name, classOf[org.locationtech.jts.geom.Point])
      case JTSTypes.LineStringTypeInstance => builder.add(field.name, classOf[org.locationtech.jts.geom.LineString])
      case JTSTypes.PolygonTypeInstance => builder.add(field.name, classOf[org.locationtech.jts.geom.Polygon])
      case JTSTypes.MultipolygonTypeInstance => builder.add(field.name, classOf[org.locationtech.jts.geom.MultiPolygon])
      case JTSTypes.GeometryTypeInstance => builder.add(field.name, classOf[org.locationtech.jts.geom.Geometry])
    }
  }
  builder.setName(name)
  builder.buildFeatureType()
}
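If the missing CRS is the cause, one direction for a fix would be to give every geometry attribute a default CRS when converting the DataFrame schema. The sketch below models that idea without Spark or GeoTools, under my own assumptions: `DefaultCrsModel`, the lowercase type labels (e.g. "linestring") and the `EPSG:4326` default are hypothetical, not GeoMesa's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of structType2SFT that assigns a default CRS to every
 * geometry attribute. Field types are string labels and the CRS is a code;
 * the real conversion works on a StructType and a SimpleFeatureTypeBuilder.
 */
public class DefaultCrsModel {

    // Assumed default, matching what I believe the stored schema uses.
    public static final String DEFAULT_CRS = "EPSG:4326";

    public static final class Attribute {
        public final String name;
        public final String binding;
        public final String crs; // null for non-geometry attributes

        Attribute(String name, String binding, String crs) {
            this.name = name;
            this.binding = binding;
            this.crs = crs;
        }
    }

    private static final Map<String, String> SCALARS = new HashMap<>();
    private static final Map<String, String> GEOMETRIES = new HashMap<>();

    static {
        SCALARS.put("string", "java.lang.String");
        SCALARS.put("integer", "java.lang.Integer");
        SCALARS.put("double", "java.lang.Double");
        SCALARS.put("timestamp", "java.util.Date");
        GEOMETRIES.put("point", "org.locationtech.jts.geom.Point");
        GEOMETRIES.put("linestring", "org.locationtech.jts.geom.LineString");
        GEOMETRIES.put("geometry", "org.locationtech.jts.geom.Geometry");
    }

    public static List<Attribute> toFeatureType(Map<String, String> fields) {
        List<Attribute> attrs = new ArrayList<>();
        for (Map.Entry<String, String> field : fields.entrySet()) {
            if ("__fid__".equals(field.getKey())) {
                continue; // the feature id is not an attribute, as in structType2SFT
            }
            String type = field.getValue();
            if (GEOMETRIES.containsKey(type)) {
                // Geometry attributes get the default CRS instead of null.
                attrs.add(new Attribute(field.getKey(), GEOMETRIES.get(type), DEFAULT_CRS));
            } else if (SCALARS.containsKey(type)) {
                attrs.add(new Attribute(field.getKey(), SCALARS.get(type), null));
            } else {
                throw new IllegalArgumentException("Unsupported type: " + type);
            }
        }
        return attrs;
    }

    public static void main(String[] args) {
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("__fid__", "string");
        schema.put("cruise_name", "string");
        schema.put("path", "linestring");
        for (Attribute a : toFeatureType(schema)) {
            System.out.println(a.name + " " + a.binding + " " + a.crs);
        }
    }
}
```

In the real converter I assume this would amount to calling `builder.setCRS(...)` (for example with `DefaultGeographicCRS.WGS84`) before the geometry attributes are added, but I have not tested that against GeoMesa.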