Hi,

I am trying to follow the Spark example (http://www.geomesa.org/documentation/tutorials/spark.html). While I managed to create the fat jar successfully, I get the following error when running the Spark job:
```
NoSuchMethodError: org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Client.sendBaseOneway
```
(I did some research online, which suggested a version mismatch; however, as far as I can tell I am using matching versions everywhere.)
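In case it helps with diagnosing a mixed classpath, here is a small check I could add to the job (a minimal sketch; the class name is copied straight from the stack trace) that prints which jar the Thrift client class is actually loaded from:
```
// Print the location of the jar providing TabletClientService$Client,
// to check whether a second Accumulo version is on the classpath
val cls = Class.forName("org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Client")
println(cls.getProtectionDomain.getCodeSource.getLocation)
```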
Local Dev Env:
Linux Fedora 25
Java 8
Hadoop 2.8.0 (pseudo distributed setup)
Accumulo 1.8.1
geomesa-accumulo-dist_2.11-1.3.1
spark-2.1.0-bin-hadoop2.7
I am running Spark in standalone mode (so not via YARN).
I am not using Maven, but sbt. Here is my build.sbt:
```
name := "GeoMesaSparkExample"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"     % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-catalyst" % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-sql"      % "2.0.2" % "provided",
  // "org.apache.spark" %% "spark-yarn" % "2.0.2" % "provided",
  "org.locationtech.geomesa" %% "geomesa-accumulo-datastore" % "1.3.1",
  "org.locationtech.geomesa" %% "geomesa-accumulo-spark"     % "1.3.1",
  "ch.qos.logback" % "logback-classic" % "1.1.7",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0",
  "org.apache.accumulo" % "accumulo-core" % "1.8.1"
)

assemblyMergeStrategy in assembly := {
  case path =>
    val strategy = (assemblyMergeStrategy in assembly).value(path)
    if (strategy == MergeStrategy.deduplicate) {
      MergeStrategy.first
    } else {
      strategy
    }
}
```
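One thing I am not sure about: geomesa-accumulo-datastore could be pulling in a different accumulo-core transitively, and with MergeStrategy.first the assembly would silently keep whichever copy of a duplicated class it meets first. If that turns out to be the cause, I assume a pin along these lines would force a single Accumulo version (untested sketch; dependencyOverrides takes a Set in sbt 0.13):
```
// Force every transitive accumulo-core to the version my cluster runs
// (assumption: sbt 0.13 syntax)
dependencyOverrides ++= Set(
  "org.apache.accumulo" % "accumulo-core" % "1.8.1"
)
```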
And here is my CountByDay.scala:
```
package examples

import java.text.SimpleDateFormat

import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.factory.CommonFactoryFinder
import org.geotools.filter.text.ecql.ECQL
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore
import org.locationtech.geomesa.spark.GeoMesaSpark
import org.opengis.feature.simple.SimpleFeature

import scala.collection.JavaConversions._

object CountByDay {

  val params = Map(
    "instanceId" -> "MY_INSTANCE",
    // zookeeper info can be found in ACCUMULO_HOME/conf/accumulo-site.xml
    "zookeepers" -> "localhost:2181",
    "user"       -> "root",
    "password"   -> "password",
    // no authentication required for the local dev env setup
    // "auths"   -> "USER,ADMIN",
    "tableName"  -> "myNamespace.gdelt"
  )

  // see geomesa-tools/conf/sfts/gdelt/reference.conf
  val typeName = "gdelt"
  val geom     = "geom"
  val date     = "dtg"

  val bbox   = "-80, 35, -79, 36"
  val during = "2014-01-01T00:00:00.000Z/2014-01-31T12:00:00.000Z"

  // val filter = s"bbox($geom, $bbox) AND $date during $during"
  val filter = s"bbox($geom, $bbox)"

  def main(args: Array[String]) {
    // Get a handle to the data store
    val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]

    // Construct a CQL query to filter by bounding box
    val q = new Query(typeName, ECQL.toFilter(filter))

    // Configure Spark
    val conf = new SparkConf().setAppName("testSpark")
    val sc = SparkContext.getOrCreate(conf)

    // Get the appropriate spatial RDD provider
    val spatialRDDProvider = GeoMesaSpark(params)

    // Get an RDD[SimpleFeature] from the spatial RDD provider
    val rdd = spatialRDDProvider.rdd(new Configuration, sc, params, q)

    // Collect the results and print
    countByDay(rdd).collect().foreach(println)
    println("\n")

    ds.dispose()
  }

  def countByDay(rdd: RDD[SimpleFeature], dateField: String = "dtg") = {
    val dayAndFeature = rdd.mapPartitions { iter =>
      val df  = new SimpleDateFormat("yyyyMMdd")
      val ff  = CommonFactoryFinder.getFilterFactory2
      val exp = ff.property(dateField)
      iter.map { f => (df.format(exp.evaluate(f).asInstanceOf[java.util.Date]), f) }
    }
    dayAndFeature.map(x => (x._1, 1)).reduceByKey(_ + _)
  }
}
```
I submit the jar like so:
```
spark-submit --master local[4] \
--class examples.CountByDay \
target/scala-2.11/GeoMesaSparkExample-assembly-0.1.jar
```
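For what it is worth, I also wanted to confirm that only one copy of the Thrift client ended up in the assembly. A minimal sketch of how I would check from the Scala REPL (only standard-library calls; the jar path is the one above):
```
import java.util.jar.JarFile
import scala.collection.JavaConversions._

// List every entry in the fat jar whose name mentions TabletClientService
val jar = new JarFile("target/scala-2.11/GeoMesaSparkExample-assembly-0.1.jar")
jar.entries().filter(_.getName.contains("TabletClientService")).foreach(e => println(e.getName))
jar.close()
```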
Can someone please point out what I am doing wrong here?
Thanks,
Diethard