[geomesa-users] Geomesa Spark Java API

Hi again,

I am trying out the new Java API for GeoMesa Spark provided in version 1.3.1, but I am running into some trouble.

First of all, I have verified that everything works when querying my Accumulo data store through the Spark shell using geomesa-accumulo-spark-runtime_2.11-1.3.1.jar. Here is my Scala code:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.locationtech.geomesa.spark._
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.factory.CommonFactoryFinder
import org.geotools.filter.text.ecql.ECQL
import scala.collection.JavaConversions._

// Accumulo datastore params
val params = Map(
  "instanceId" -> "hdp-accumulo-instance",
  "user"       -> "root",
  "password"   -> "praxedo",
  "tableName"  -> "Geoloc_Praxedo"
)

// apply the configuration to the existing SparkContext
val conf = sc.getConf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)
val sc = SparkContext.getOrCreate(conf)

// create RDD with a geospatial query using Geomesa functions
val spatialRDDProvider = GeoMesaSpark(params)
val filter = ECQL.toFilter("BBOX(coords, 2.249294, 48.815215, 2.419337, 48.904295)")
val query = new Query("history_1M", filter)
val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, params, query)

resultRDD.count

This code works fine and gives the expected result.
Now I am trying to do the same thing in Java. Here is my code:

package com.praxedo.geomesa.geomesa_spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.geotools.data.Query;
import org.geotools.filter.text.cql2.CQLException;
import org.geotools.filter.text.ecql.ECQL;
import org.locationtech.geomesa.spark.api.java.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class Test {

    private static final String ACCUMULO_INSTANCE = "hdp-accumulo-instance";
    private static final String ACCUMULO_ZOOKEEPERS = "hdf-sb-a.praxedo.net:2181,hdf-sb-b.praxedo.net:2181";
    private static final String ACCUMULO_USER = "root";
    private static final String ACCUMULO_PASSWORD = "password";
    private static final String GEOMESA_CATALOG = "Geoloc_Praxedo";
    private static final String GEOMESA_FEATURE = "history_1M";

    public static void main(String[] args) throws IOException, CQLException {
        // Spark configuration
        SparkConf conf = new SparkConf().setAppName("MyAppName").setMaster("local[*]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Datastore configuration
        Map<String, String> parameters = new HashMap<>();
        parameters.put("instanceId", ACCUMULO_INSTANCE);
        parameters.put("zookeepers", ACCUMULO_ZOOKEEPERS);
        parameters.put("user", ACCUMULO_USER);
        parameters.put("password", ACCUMULO_PASSWORD);
        parameters.put("tableName", GEOMESA_CATALOG);

        // create an RDD from a geospatial query
        JavaSpatialRDDProvider provider = JavaGeoMesaSpark.apply(parameters);
        String predicate = "BBOX(coords, 2.249294, 48.815215, 2.419337, 48.904295)";
        Query query = new Query(GEOMESA_FEATURE, ECQL.toFilter(predicate));
        JavaSpatialRDD resultRDD = provider.rdd(new Configuration(), jsc, parameters, query);

        System.out.println("Number of features: " + resultRDD.count());
        System.out.println("First feature: " + resultRDD.first());
    }
}

And here are the dependencies I am importing with Maven:

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>3.8.1</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.accumulo</groupId>
  <artifactId>accumulo-core</artifactId>
  <version>1.7.0</version>
</dependency>

<dependency>
  <groupId>org.apache.accumulo</groupId>
  <artifactId>accumulo-fate</artifactId>
  <version>1.7.0</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.0</version>
</dependency>

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-core_2.11</artifactId>
  <version>1.3.1</version>
</dependency>

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-converter_2.11</artifactId>
  <version>1.3.1</version>
</dependency>

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-security_2.11</artifactId>
  <version>1.3.1</version>
</dependency>

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-geotools_2.11</artifactId>
  <version>1.3.1</version>
</dependency>

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-sql_2.11</artifactId>
  <version>1.3.1</version>
</dependency>

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-accumulo-datastore_2.11</artifactId>
  <version>1.3.1</version>
</dependency>

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-accumulo-spark_2.11</artifactId>
  <version>1.3.1</version>
</dependency>

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-utils_2.11</artifactId>
  <version>1.3.1</version>
</dependency>

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-index-api_2.11</artifactId>
  <version>1.3.1</version>
</dependency>

<dependency>
  <groupId>org.geotools</groupId>
  <artifactId>gt-main</artifactId>
  <version>16.1</version>
</dependency>


I have successfully built the jar, but when I launch it on my cluster I get the following error:

Exception in thread "main" java.util.ServiceConfigurationError: org.geotools.filter.expression.PropertyAccessorFactory: Provider org.locationtech.geomesa.convert.cql.ArrayPropertyAccessorFactoryorg.locationtech.geomesa.features.kryo.json.JsonPropertyAccessorFactory not found
        at java.util.ServiceLoader.fail(ServiceLoader.java:239)
        at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at org.geotools.filter.expression.PropertyAccessors.<clinit>(PropertyAccessors.java:51)
        at org.geotools.filter.AttributeExpressionImpl.evaluate(AttributeExpressionImpl.java:213)
        at org.geotools.filter.AttributeExpressionImpl.evaluate(AttributeExpressionImpl.java:189)
        at org.geotools.filter.FilterAttributeExtractor.visit(FilterAttributeExtractor.java:130)
        at org.geotools.filter.AttributeExpressionImpl.accept(AttributeExpressionImpl.java:340)
        at org.geotools.filter.visitor.DefaultFilterVisitor.visit(DefaultFilterVisitor.java:214)
        at org.geotools.filter.spatial.BBOXImpl.accept(BBOXImpl.java:224)
        at org.geotools.data.DataUtilities.propertyNames(DataUtilities.java:413)
        at org.locationtech.geomesa.filter.FilterHelper$.propertyNames(FilterHelper.scala:469)
        at org.locationtech.geomesa.filter.visitor.FilterExtractingVisitor.keep(FilterExtractingVisitor.scala:44)
        at org.locationtech.geomesa.filter.visitor.FilterExtractingVisitor.visit(FilterExtractingVisitor.scala:133)
        at org.geotools.filter.spatial.BBOXImpl.accept(BBOXImpl.java:224)
        at org.locationtech.geomesa.filter.visitor.FilterExtractingVisitor$.apply(FilterExtractingVisitor.scala:28)
        at org.locationtech.geomesa.index.strategies.SpatioTemporalFilterStrategy$class.getFilterStrategy(SpatioTemporalFilterStrategy.scala:37)
        at org.locationtech.geomesa.accumulo.index.Z3Index$.getFilterStrategy(Z3Index.scala:21)
        at org.locationtech.geomesa.index.api.FilterSplitter$$anonfun$5.apply(FilterSplitter.scala:122)
        at org.locationtech.geomesa.index.api.FilterSplitter$$anonfun$5.apply(FilterSplitter.scala:122)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:344)
        at org.locationtech.geomesa.index.api.FilterSplitter.org$locationtech$geomesa$index$api$FilterSplitter$$getSimpleQueryOptions(FilterSplitter.scala:122)
        at org.locationtech.geomesa.index.api.FilterSplitter.getQueryOptions(FilterSplitter.scala:104)
        at org.locationtech.geomesa.index.api.StrategyDecider$$anonfun$1.apply(StrategyDecider.scala:52)
        at org.locationtech.geomesa.index.api.StrategyDecider$$anonfun$1.apply(StrategyDecider.scala:52)
        at org.locationtech.geomesa.utils.stats.MethodProfiling$class.profile(MethodProfiling.scala:26)
        at org.locationtech.geomesa.index.api.StrategyDecider.profile(StrategyDecider.scala:18)
        at org.locationtech.geomesa.index.api.StrategyDecider.getFilterPlan(StrategyDecider.scala:52)
        at org.locationtech.geomesa.index.api.QueryPlanner$$anonfun$4.apply(QueryPlanner.scala:135)
        at org.locationtech.geomesa.index.api.QueryPlanner$$anonfun$4.apply(QueryPlanner.scala:114)
        at org.locationtech.geomesa.utils.stats.MethodProfiling$class.profile(MethodProfiling.scala:26)
        at org.locationtech.geomesa.index.api.QueryPlanner.profile(QueryPlanner.scala:43)
        at org.locationtech.geomesa.index.api.QueryPlanner.getQueryPlans(QueryPlanner.scala:114)
        at org.locationtech.geomesa.index.api.QueryPlanner.planQuery(QueryPlanner.scala:61)
        at org.locationtech.geomesa.index.geotools.GeoMesaDataStore.getQueryPlan(GeoMesaDataStore.scala:464)
        at org.locationtech.geomesa.accumulo.data.AccumuloDataStore.getQueryPlan(AccumuloDataStore.scala:108)
        at org.locationtech.geomesa.jobs.accumulo.AccumuloJobUtils$.getMultipleQueryPlan(AccumuloJobUtils.scala:117)
        at org.locationtech.geomesa.spark.accumulo.AccumuloSpatialRDDProvider.rdd(AccumuloSpatialRDDProvider.scala:107)
        at org.locationtech.geomesa.spark.api.java.JavaSpatialRDDProvider.rdd(JavaGeoMesaSpark.scala:37)
        at com.praxedo.geomesa.geomesa_spark.Test.main(Test.java:43)

Maybe a missing dependency? Any ideas about this problem?
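One thing I notice: the "not found" provider in the error is two fully qualified class names run together (ArrayPropertyAccessorFactory + JsonPropertyAccessorFactory), which I believe can happen when the META-INF/services files from several jars are concatenated without a trailing newline during shading. In case it is relevant, here is a minimal sketch of a maven-shade-plugin configuration that merges service entries instead of concatenating them; the plugin version is just the one I happen to use, not a requirement:

```xml
<!-- sketch: merge META-INF/services files when building an uber-jar -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- combines ServiceLoader registrations from all shaded jars -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

I am not sure this is the cause, but if your jar is built without such a transformer, that could explain the mangled provider name.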
Thanks for your time.

José.
