Sometimes, when I update a row with a new LineString and timestamp, I end up with multiple rows with the same FID. This does not happen on every run, but it is reproducible. Here is some sample code that demonstrates the problem:
private void doTest() {
    GeoMesaFeature geoFeature = GeoMesaFeatureService.getInstance()
        .getFeature(ECHOFISH_CRUISE_PATH)
        .orElseThrow(() -> new RuntimeException("AAAAAARRGGH"));

    SparkSession spark = SparkSession.builder()
        .appName(APP_NAME)
        .config("spark.sql.crossJoin.enabled", "true")
        .master("local[*]")
        .getOrCreate();
    SQLTypes.init(spark.sqlContext());

    String id = "231c57df-8d4b-4717-b207-851f43425522";
    Timestamp generated = new Timestamp(System.currentTimeMillis());
    String cruiseName = "test";

    GeometryFactory geometryFactory = new GeometryFactory();
    LineString path = geometryFactory.createLineString(
        new Coordinate[] {
            new Coordinate(
                ThreadLocalRandom.current().nextDouble(-90.0, 90.0),
                ThreadLocalRandom.current().nextDouble(-90.0, 90.0)
            ),
            new Coordinate(
                ThreadLocalRandom.current().nextDouble(-90.0, 90.0),
                ThreadLocalRandom.current().nextDouble(-90.0, 90.0)
            ),
            new Coordinate(
                ThreadLocalRandom.current().nextDouble(-90.0, 90.0),
                ThreadLocalRandom.current().nextDouble(-90.0, 90.0)
            )
        });

    Row row = new GenericRow(new Object[] {
        id,
        cruiseName,
        generated,
        path
    });

    spark.sqlContext().createDataFrame(Arrays.asList(row), geoFeature.getStructType())
        .write()
        .format(GEOMESA_FEATURE_SERVICE)
        .options(datastoreParams.getParams())
        .option(GEOMESA_FEATURE, ECHOFISH_CRUISE_PATH)
        .save();

    spark.read()
        .format(GEOMESA_FEATURE_SERVICE)
        .options(datastoreParams.getParams())
        .option(GEOMESA_FEATURE, ECHOFISH_CRUISE_PATH)
        .load()
        .filter(col("cruise_name").equalTo("test"))
        .collectAsList()
        .forEach(System.out::println);
}
After several runs, this eventually produces something like the following:
I am using the following schema:
public SimpleFeatureType getSimpleFeatureType() {
    String attributes = String.join(
        ",",
        "cruise_name:String",
        "generated:Timestamp",
        "*path:LineString:srid=4326");
    SimpleFeatureType schema = SimpleFeatureTypes.createType(getName(), attributes);
    schema.getUserData().put("geomesa.fid.uuid", "true");
    return schema;
}

public StructType getStructType() {
    return DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField(FID, DataTypes.StringType, false),
        DataTypes.createStructField(cruise_name, DataTypes.StringType, false),
        DataTypes.createStructField(generated, DataTypes.TimestampType, false),
        DataTypes.createStructField(path, JTSTypes.LineStringTypeInstance(), true)
    });
}
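To confirm the duplicates independently of how the rows are printed, a check along these lines could be run on the FIDs pulled out of the collected rows. This is only a minimal sketch in plain Java: the class name DuplicateFidCheck and the method findDuplicateFids are hypothetical, and the IDs in main are made-up input for illustration, not output from the actual datastore.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper, not part of GeoMesa or Spark.
public class DuplicateFidCheck {

    // Returns every FID that occurs more than once, mapped to how often it occurs.
    public static Map<String, Long> findDuplicateFids(List<String> fids) {
        // Count occurrences per FID, keeping first-seen order.
        Map<String, Long> counts = fids.stream()
            .collect(Collectors.groupingBy(
                Function.identity(), LinkedHashMap::new, Collectors.counting()));
        // Drop FIDs that appear exactly once; what remains are the duplicates.
        counts.values().removeIf(n -> n < 2);
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample input: the same FID read back twice, plus one unrelated FID.
        List<String> fids = Arrays.asList(
            "231c57df-8d4b-4717-b207-851f43425522",
            "00000000-0000-0000-0000-000000000001",
            "231c57df-8d4b-4717-b207-851f43425522");
        findDuplicateFids(fids)
            .forEach((fid, n) -> System.out.println(fid + " appears " + n + " times"));
    }
}
```

With the Spark result, the input list could be built from collectAsList() by mapping each Row to row.getString(0), assuming the FID is the first column as in the StructType above.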