Re: [geomesa-users] GEOMESA Spark SQL argument type mismatch on string field compare

I've looked at it again this morning, but I don't think it's really feasible without changing large chunks of code to use reflection. The LogicalRelation class has changed in just about every single Spark version...

Your best bet at this point may be to build a custom branch. You'd want to update your pom.xml to set the <spark.version> property to 2.1, and then go through and see what code needs to be fixed for it to compile. The version of LogicalRelation from Spark 2.0 is fairly close to the one from 2.1, so you might try copying snippets of code from the GeoMesa 1.3.x branch, which targets Spark 2.0.
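For example, the property change in the root pom.xml would look something like this (2.1.3 is a guess at the patch release - use whichever 2.1.x you're actually running):

```xml
<!-- root pom.xml: point spark.version at your Spark 2.1.x release
     (2.1.3 here is just an example patch version) -->
<properties>
  <spark.version>2.1.3</spark.version>
</properties>
```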

Hope that helps,

Emilio

On 4/2/19 3:57 PM, Dave Boyd wrote:

Emilio:

    I would love to test it out. I tried making the following change, based on code I found in an earlier branch.

           if (gtFilters.nonEmpty) {
             val relation = gmRel.copy(filt = ff.and(gtFilters :+ gmRel.filt), partitionHints = partitionHints)
-            val newrel = SparkVersions.copy(lr)(output = lr.output, relation = relation)
+//            val newrel = SparkVersions.copy(lr)(output = lr.output, relation = relation)
+            val newrel = lr.copy(expectedOutputAttributes = lr.output, relation = relation)
+

I had to change spark-catalyst to 2.2 so that timeZoneId is available, but I am now getting this error, and fixing it is beyond my Scala foo.

[ERROR] /Users/BigDataDaddy/Documents/workspace/geomesa/geomesa-spark/geomesa-spark-sql/src/main/scala/org/apache/spark/sql/SQLRules.scala:262: error: type mismatch;
[ERROR]  found   : Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference]
[ERROR]  required: Option[Seq[org.apache.spark.sql.catalyst.expressions.Attribute]]
[ERROR]             val newrel = lr.copy(expectedOutputAttributes = lr.output, relation = relation)
[ERROR]                                       

Any ideas on fixing this?
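The compiler seems to want an Option where I'm passing a bare Seq, so maybe wrapping it in Some is the fix? Here's a tiny stand-alone example of the shape of it (fake stand-in classes, not the real Spark ones, since I can't test against 2.1 here):

```scala
// Stand-in shaped like Spark 2.1's LogicalRelation, where the second
// parameter is an Option[Seq[...]] rather than a bare Seq[...].
case class FakeLogicalRelation(
    relation: String,
    expectedOutputAttributes: Option[Seq[String]] = None,
    catalogTable: Option[String] = None)

val lr = FakeLogicalRelation("base", Some(Seq("a", "b")))
val output: Seq[String] = lr.expectedOutputAttributes.getOrElse(Seq.empty)

// lr.copy(expectedOutputAttributes = output)   // type mismatch: Seq vs Option[Seq]
val newrel = lr.copy(expectedOutputAttributes = Some(output), relation = "filtered")
println(newrel)
```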

On 3/31/19 10:18 AM, Emilio Lahr-Vivaz wrote:
Yes, you can change the spark.version property in the main pom.xml[1] to your spark version, and then update the code to call the methods directly.

It would be nice to support spark 2.1 if possible - I can put up a WIP branch that updates the reflection code to account for the option, if you don't mind testing it out.

[1] https://github.com/locationtech/geomesa/blob/master/pom.xml#L96

Thanks,

Emilio


On 3/29/19 4:30 PM, Dave Boyd wrote:

Emilio:

  I think I see the issue, although this may be wrong as I am barely literate in Scala.

In looking at the LogicalRelation class I see the following signatures by Spark Release:



2.4, 2.3:
case class LogicalRelation(
    relation: BaseRelation,
    output: Seq[AttributeReference],
    catalogTable: Option[CatalogTable],
    override val isStreaming: Boolean)
  extends LeafNode with MultiInstanceRelation {

2.2:
case class LogicalRelation(
    relation: BaseRelation,
    output: Seq[AttributeReference],
    catalogTable: Option[CatalogTable])
  extends LeafNode with MultiInstanceRelation {

2.1:
case class LogicalRelation(
    relation: BaseRelation,
    expectedOutputAttributes: Option[Seq[Attribute]] = None,
    catalogTable: Option[CatalogTable] = None)
  extends LeafNode with MultiInstanceRelation {

In 2.1 it has three parameters, as in 2.2, but the second parameter
is an Option[Seq[Attribute]] vs. a Seq[AttributeReference].

In looking at your copy wrapper method, it expects r.output to be a Seq[AttributeReference]
and passes that to the _copy.

Now we get into types and casting in Scala that is far beyond what I understand.

I am wondering if I can replace the calls to SparkVersions.copy with a straight copy call and drop the fancy
Spark version handling until I can get to a more recent version.

Thanks again for listening.



On 3/29/19 3:01 PM, Emilio Lahr-Vivaz wrote:
If you can figure out the method signature we should be looking for in 2.1, we can probably add it to the reflection wrapper (as long as it doesn't return a different class or something). You might hit another error later, but maybe not... generally our Spark code is tied more closely to the library version than most of our code, since we hook into the API at a pretty low level that isn't necessarily supported.
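For a sense of how that wrapper works, here's a toy stand-alone version of the idea (not our actual SparkVersions code, just the general pattern of finding and invoking a case class's copy method at runtime):

```scala
// Toy sketch of the reflection approach: look up the case class's `copy`
// method at runtime and invoke it, so the same calling code can work
// against differently-shaped versions of the class.
case class Rel21(relation: String, expected: Option[Seq[String]] = None)

def copyWithRelation(r: Product, relation: AnyRef): AnyRef = {
  val copy = r.getClass.getMethods.find(_.getName == "copy")
    .getOrElse(throw new NoSuchMethodException("copy"))
  // copy's parameter list mirrors the constructor, so pass the new relation
  // followed by the instance's remaining current field values
  val rest = r.productIterator.toSeq.drop(1).map(_.asInstanceOf[AnyRef])
  copy.invoke(r, (relation +: rest): _*)
}

val updated = copyWithRelation(Rel21("old", Some(Seq("a"))), "new")
println(updated)
```

The real code is messier because it has to match the right copy signature per Spark version, which is where 2.1's Option parameter trips it up.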

Thanks,

Emilio

On 3/29/19 2:21 PM, Dave Boyd wrote:

Emilio:

   Thanks. I am stuck back on 2.1 for the time being, which stinks. I sort of figured this would be the
case. It just seemed like a weird thing to have fail, given all the stuff that works, and that this is strictly a
parsing/introspection issue.

    I tried running with Spark 2.3 on the local Zeppelin node and it got past the parsing, but then
died trying to submit to the cluster, which I expected and which confirms what you said.

    Thanks again for all the great work. I love being able to run quick SQL commands - including joins - against
my Accumulo tables.

     I see GeoMesa 2.3 is released; maybe I will give that a try on a whim.


On 3/29/19 1:30 PM, Emilio Lahr-Vivaz wrote:
Hello,

What version of Spark are you using? It looks like the error is coming from our reflection wrapper, which tries to smooth out the differences between Spark API versions. We've tested mostly with Spark 2.4, although it *should* work with Spark 2.3 as well - other versions are "unsupported".

Thanks,

Emilio

On 3/28/19 10:54 AM, Dave Boyd wrote:

All:
   I am running geomesa-accumulo-spark-runtime_2.11-2.2.2-SNAPSHOT within a Zeppelin notebook.
I am having an interesting problem getting the SQL parsed whenever I try to do an equality compare on
a string field. Maybe I am just not getting the syntax correct.
I create the dataframe as follows:

val linkagesdataFrame = spark.read.format("geomesa").options(dsParams).option("geomesa.feature", "coalescelinkage").load()
linkagesdataFrame.createOrReplaceTempView("linkageview")
The schema for the data frame is:
root
 |-- __fid__: string (nullable = false)
 |-- entity2version: string (nullable = true)
 |-- linklabel: string (nullable = true)
 |-- linktype: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- datecreated: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- version: string (nullable = true)
 |-- entity2name: string (nullable = true)
 |-- entity2source: string (nullable = true)
 |-- objectkey: string (nullable = true)
 |-- lastmodified: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- entity2key: string (nullable = true)
 |-- linkstatus: integer (nullable = true)
This SQL statement fails to parse with the below error:
%sql
select * from linkageview where name = "NGrams"
java.lang.IllegalArgumentException: argument type mismatch
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.locationtech.geomesa.spark.SparkVersions$$anonfun$1$$anonfun$apply$2.apply(SparkVersions.scala:34)
 at org.locationtech.geomesa.spark.SparkVersions$$anonfun$1$$anonfun$apply$2.apply(SparkVersions.scala:34)
 at org.locationtech.geomesa.spark.SparkVersions$.copy(SparkVersions.scala:48)
 at org.apache.spark.sql.SQLRules$SpatialOptimizationsRule$$anonfun$apply$1.applyOrElse(SQLRules.scala:261)
 at org.apache.spark.sql.SQLRules$SpatialOptimizationsRule$$anonfun$apply$1.applyOrElse(SQLRules.scala:221)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
 at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
 at org.apache.spark.sql.SQLRules$SpatialOptimizationsRule$.apply(SQLRules.scala:221)
 at org.apache.spark.sql.SQLRules$SpatialOptimizationsRule$.apply(SQLRules.scala:143)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
 at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
 at scala.collection.immutable.List.foldLeft(List.scala:84)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
 at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
 at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
 at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
 at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
 at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
 at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
 at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791)
 at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
 at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.zeppelin.spark.SparkZeppelinContext.showData(SparkZeppelinContext.java:108)
 at org.apache.zeppelin.spark.SparkSqlInterpreter.interpret(SparkSqlInterpreter.java:135)
 at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:103)
 at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:632)
 at org.apache.zeppelin.scheduler.Job.run(Job.java:188)
 at org.apache.zeppelin.scheduler.ParallelScheduler$JobRunner.run(ParallelScheduler.java:162)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Interestingly, the query:

select * from linkageview where name like 'NGrams%'

works just fine.

This only seems to affect direct equality comparisons of string fields.
I have run a number of other complex queries, with aggregations and
even some gnarly joins, that work just fine.

Any thoughts?

-- 
========= mailto:dboyd@xxxxxxxxxxxxxxxxx ============
David W. Boyd                     
VP,  Data Solutions       
10432 Balls Ford, Suite 240  
Manassas, VA 20109         
office:   +1-703-552-2862        
cell:     +1-703-402-7908
============== http://www.incadencecorp.com/ ============
ISO/IEC JTC1 WG9, editor ISO/IEC 20547 Big Data Reference Architecture
Chair ANSI/INCITS TC Big Data
Co-chair NIST Big Data Public Working Group Reference Architecture
First Robotic Mentor - FRC, FTC - www.iliterobotics.org
Board Member- USSTEM Foundation - www.usstem.org

The information contained in this message may be privileged 
and/or confidential and protected from disclosure.  
If the reader of this message is not the intended recipient 
or an employee or agent responsible for delivering this message 
to the intended recipient, you are hereby notified that any 
dissemination, distribution or copying of this communication 
is strictly prohibited.  If you have received this communication 
in error, please notify the sender immediately by replying to 
this message and deleting the material from any computer.

 

_______________________________________________
geomesa-users mailing list
geomesa-users@xxxxxxxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.locationtech.org/mailman/listinfo/geomesa-users

