Hi again,
I am trying the new Java API for GeoMesa Spark provided in
version 1.3.1, but I am running into some trouble.
First of all, I have verified that everything works fine
when querying my Accumulo datastore through the Spark shell using
geomesa-accumulo-spark-runtime_2.11-1.3.1.jar. This is what
my Scala code looks like:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.locationtech.geomesa.spark._
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.factory.CommonFactoryFinder
import org.geotools.filter.text.ecql.ECQL
import scala.collection.JavaConversions._

// Accumulo datastore params
val params = Map(
  "instanceId" -> "hdp-accumulo-instance",
  "user"       -> "root",
  "password"   -> "praxedo",
  "tableName"  -> "Geoloc_Praxedo"
)

// apply the configuration to the existing SparkContext
val conf = sc.getConf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)
val sc = SparkContext.getOrCreate(conf)

// create an RDD from a geospatial query using GeoMesa functions
val spatialRDDProvider = GeoMesaSpark(params)
val filter = ECQL.toFilter("BBOX(coords, 2.249294, 48.815215, 2.419337, 48.904295)")
val query = new Query("history_1M", filter)
val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, params, query)
resultRDD.count
This code works fine and gives the expected result.
Now I am trying to do the same thing in Java. This
is what my code looks like:
package com.praxedo.geomesa.geomesa_spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.geotools.data.Query;
import org.geotools.filter.text.cql2.CQLException;
import org.geotools.filter.text.ecql.ECQL;
import org.locationtech.geomesa.spark.api.java.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class Test {

    private final static String ACCUMULO_INSTANCE = "hdp-accumulo-instance";
    // placeholder value shown here; in my real code this holds my ZooKeeper quorum
    private final static String ACCUMULO_ZOOKEEPERS = "zoo1:2181";
    private final static String ACCUMULO_USER = "root";
    private final static String ACCUMULO_PASSWORD = "password";
    private final static String GEOMESA_CATALOG = "Geoloc_Praxedo";
    private final static String GEOMESA_FEATURE = "history_1M";

    public static void main(String[] args) throws IOException, CQLException {
        // Spark configuration
        SparkConf conf = new SparkConf().setAppName("MyAppName").setMaster("local[*]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator",
                 "org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Datastore configuration
        Map<String, String> parameters = new HashMap<>();
        parameters.put("instanceId", ACCUMULO_INSTANCE);
        parameters.put("zookeepers", ACCUMULO_ZOOKEEPERS);
        parameters.put("user", ACCUMULO_USER);
        parameters.put("password", ACCUMULO_PASSWORD);
        parameters.put("tableName", GEOMESA_CATALOG);

        JavaSpatialRDDProvider provider = JavaGeoMesaSpark.apply(parameters);

        String predicate = "BBOX(coords, 2.249294, 48.815215, 2.419337, 48.904295)";
        Query query = new Query(GEOMESA_FEATURE, ECQL.toFilter(predicate));
        JavaSpatialRDD resultRDD = provider.rdd(new Configuration(), jsc, parameters, query);

        System.out.println("Number of records: " + resultRDD.count());
        System.out.println("First record: " + resultRDD.first());
    }
}
And here are the dependencies I am importing with
Maven:
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-fate</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-core_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-converter_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-security_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-geotools_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-sql_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-accumulo-datastore_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-accumulo-spark_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-utils_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-index-api_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-main</artifactId>
<version>16.1</version>
</dependency>
I have successfully built the jar, but when I launch it on
my cluster I get the following error:
Exception in thread "main" java.util.ServiceConfigurationError: org.geotools.filter.expression.PropertyAccessorFactory: Provider org.locationtech.geomesa.convert.cql.ArrayPropertyAccessorFactoryorg.locationtech.geomesa.features.kryo.json.JsonPropertyAccessorFactory not found
    at java.util.ServiceLoader.fail(ServiceLoader.java:239)
    at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at org.geotools.filter.expression.PropertyAccessors.<clinit>(PropertyAccessors.java:51)
    at org.geotools.filter.AttributeExpressionImpl.evaluate(AttributeExpressionImpl.java:213)
    at org.geotools.filter.AttributeExpressionImpl.evaluate(AttributeExpressionImpl.java:189)
    at org.geotools.filter.FilterAttributeExtractor.visit(FilterAttributeExtractor.java:130)
    at org.geotools.filter.AttributeExpressionImpl.accept(AttributeExpressionImpl.java:340)
    at org.geotools.filter.visitor.DefaultFilterVisitor.visit(DefaultFilterVisitor.java:214)
    at org.geotools.filter.spatial.BBOXImpl.accept(BBOXImpl.java:224)
    at org.geotools.data.DataUtilities.propertyNames(DataUtilities.java:413)
    at org.locationtech.geomesa.filter.FilterHelper$.propertyNames(FilterHelper.scala:469)
    at org.locationtech.geomesa.filter.visitor.FilterExtractingVisitor.keep(FilterExtractingVisitor.scala:44)
    at org.locationtech.geomesa.filter.visitor.FilterExtractingVisitor.visit(FilterExtractingVisitor.scala:133)
    at org.geotools.filter.spatial.BBOXImpl.accept(BBOXImpl.java:224)
    at org.locationtech.geomesa.filter.visitor.FilterExtractingVisitor$.apply(FilterExtractingVisitor.scala:28)
    at org.locationtech.geomesa.index.strategies.SpatioTemporalFilterStrategy$class.getFilterStrategy(SpatioTemporalFilterStrategy.scala:37)
    at org.locationtech.geomesa.accumulo.index.Z3Index$.getFilterStrategy(Z3Index.scala:21)
    at org.locationtech.geomesa.index.api.FilterSplitter$$anonfun$5.apply(FilterSplitter.scala:122)
    at org.locationtech.geomesa.index.api.FilterSplitter$$anonfun$5.apply(FilterSplitter.scala:122)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.locationtech.geomesa.index.api.FilterSplitter.getQueryOptions(FilterSplitter.scala:104)
    at org.locationtech.geomesa.index.api.StrategyDecider$$anonfun$1.apply(StrategyDecider.scala:52)
    at org.locationtech.geomesa.index.api.StrategyDecider$$anonfun$1.apply(StrategyDecider.scala:52)
    at org.locationtech.geomesa.utils.stats.MethodProfiling$class.profile(MethodProfiling.scala:26)
    at org.locationtech.geomesa.index.api.StrategyDecider.profile(StrategyDecider.scala:18)
    at org.locationtech.geomesa.index.api.StrategyDecider.getFilterPlan(StrategyDecider.scala:52)
    at org.locationtech.geomesa.index.api.QueryPlanner$$anonfun$4.apply(QueryPlanner.scala:135)
    at org.locationtech.geomesa.index.api.QueryPlanner$$anonfun$4.apply(QueryPlanner.scala:114)
    at org.locationtech.geomesa.utils.stats.MethodProfiling$class.profile(MethodProfiling.scala:26)
    at org.locationtech.geomesa.index.api.QueryPlanner.profile(QueryPlanner.scala:43)
    at org.locationtech.geomesa.index.api.QueryPlanner.getQueryPlans(QueryPlanner.scala:114)
    at org.locationtech.geomesa.index.api.QueryPlanner.planQuery(QueryPlanner.scala:61)
    at org.locationtech.geomesa.index.geotools.GeoMesaDataStore.getQueryPlan(GeoMesaDataStore.scala:464)
    at org.locationtech.geomesa.accumulo.data.AccumuloDataStore.getQueryPlan(AccumuloDataStore.scala:108)
    at org.locationtech.geomesa.jobs.accumulo.AccumuloJobUtils$.getMultipleQueryPlan(AccumuloJobUtils.scala:117)
    at org.locationtech.geomesa.spark.accumulo.AccumuloSpatialRDDProvider.rdd(AccumuloSpatialRDDProvider.scala:107)
    at org.locationtech.geomesa.spark.api.java.JavaSpatialRDDProvider.rdd(JavaGeoMesaSpark.scala:37)
    at com.praxedo.geomesa.geomesa_spark.Test.main(Test.java:43)
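Looking closely at the message, the "provider" is actually two class names run together (ArrayPropertyAccessorFactory immediately followed by JsonPropertyAccessorFactory), as if two META-INF/services files were concatenated without a newline between their entries. A small self-contained sketch (class names hypothetical, nothing GeoMesa-specific) reproduces the same kind of failure:

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.function.Supplier;

public class SpiDemo {

    // Returns the name of the error raised by ServiceLoader, or "ok" if loading worked.
    static String tryLoad() throws IOException {
        // Simulate a fat jar in which two META-INF/services entries were
        // concatenated onto a single line (no newline between them).
        Path dir = Files.createTempDirectory("spi-demo");
        Path services = dir.resolve("META-INF/services/java.util.function.Supplier");
        Files.createDirectories(services.getParent());
        Files.write(services,
            "com.example.OneFactorycom.example.OtherFactory".getBytes(StandardCharsets.UTF_8));
        try (URLClassLoader cl =
                 new URLClassLoader(new URL[]{dir.toUri().toURL()}, SpiDemo.class.getClassLoader())) {
            for (Supplier<?> s : ServiceLoader.load(Supplier.class, cl)) {
                s.get(); // never reached: the fused class name cannot be resolved
            }
            return "ok";
        } catch (ServiceConfigurationError e) {
            // the fused line parses as one (nonexistent) class name,
            // so the lookup fails with "Provider ... not found"
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(tryLoad()); // prints ServiceConfigurationError
    }
}
```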
Could it be a missing dependency? Does anyone have an idea what the problem is?
Thanks for your time.
José.