[
Date Prev][Date Next][
Thread Prev][Thread Next][
Date Index][
Thread Index]
[
List Home]
[geowave-dev] Requesting support with setting up Geowave and HBase and extending Geowave to support storage and querying trajectory data.
|
Hello,
I am working on a project requiring spatio-temporal queries. I am facing the error "java.net.SocketTimeoutException: callTimeout=60000,". I am using the Python wrapper. This is set up on a cluster with 3 nodes, where one of the nodes is the master for both Zookeeper and HBase. I can confirm HBase is working properly since I am able to create tables using the shell. I faced the same error with Accumulo.
The Geowave code which I am running is also attached. I am new to the entire field of distributed computing and geospatial queries. I have searched for this error across Gitter and Stack Overflow, but none of the solutions mentioned is working.
The Geowave code that I am trying to execute works on a single machine on which Zookeeper and Accumulo are set up using fluo-uno.
I require some help with extending Geowave to support trajectory data. Is it possible to extend the line-segment datatype to accommodate points with a time dimension? If so, any articles on how to get started would be of great help. I need to store and query trajectories, and I am drawing a blank on all the changes that have to be made to make Geowave compatible (i.e. datatypes, indexes for a trajectory data type, and queries like the intersection of two trajectories, or of a polygon and a trajectory).
It would be great if you could help me with these queries.
from pygw.geotools import SimpleFeatureTypeBuilder
from pygw.geotools import AttributeDescriptor
from pygw.geotools import SimpleFeatureBuilder
from shapely.geometry import Point
from datetime import datetime
from pygw.store.hbase import HBaseOptions
from pygw.store import DataStoreFactory
from pygw.store.rocksdb import RocksDBOptions
from pygw.store.accumulo import AccumuloOptions
from pygw.geotools import FeatureDataAdapter
from pygw.query import VectorQueryBuilder
from pygw.index import SpatialIndexBuilder
from pygw.index import SpatialTemporalIndexBuilder
from pygw.geotools import FeatureDataAdapter
import csv
import pandas as pd
def createBuilderAndInitiateTables():
    """Build the TaxiDetails feature type, connect to the datastore, and
    register the type with spatial and spatial-temporal indices.

    Returns:
        tuple: ``(taxi_details_type, datastore)`` — the built feature type
        and the connected HBase-backed data store.

    Side effects:
        Sets the module-level global ``taxi_adapter``, which the writer
        helpers below read to look up the registered type name.
    """
    # Describe the schema of one taxi trip record.
    type_builder = SimpleFeatureTypeBuilder()
    type_builder.set_name("TaxiDetails")
    type_builder.add(AttributeDescriptor.integer('vendor_id'))
    type_builder.add(AttributeDescriptor.date("pickup_time"))
    type_builder.add(AttributeDescriptor.date("drop_off_time"))
    type_builder.add(AttributeDescriptor.point("pickup_location"))
    type_builder.add(AttributeDescriptor.point("drop_off_location"))
    type_builder.add(AttributeDescriptor.integer("passenger_count"))
    type_builder.add(AttributeDescriptor.double("trip_distance"))
    type_builder.add(AttributeDescriptor.double("fare_amount"))
    type_builder.add(AttributeDescriptor.double("total_amount"))
    type_builder.add(AttributeDescriptor.integer("payment_type"))
    taxi_details_type = type_builder.build_feature_type()
    print(taxi_details_type)

    # NOTE(review): these Accumulo options are configured but never passed to
    # DataStoreFactory below — leftover from the earlier Accumulo attempt.
    accumulo_options = AccumuloOptions()
    accumulo_options.set_user("root")
    accumulo_options.set_password("secret")
    accumulo_options.set_geowave_namespace("taxi_accumulo_big")
    accumulo_options.set_zookeeper("node12:2181")
    accumulo_options.set_instance("video")

    # The store actually used: HBase, located via Zookeeper on node12.
    hbase_option = HBaseOptions()
    hbase_option.set_geowave_namespace("taxi_hbase")
    hbase_option.set_zookeeper("node12:2181")
    datastore = DataStoreFactory.create_data_store(hbase_option)
    print(datastore)

    global taxi_adapter
    taxi_adapter = FeatureDataAdapter(taxi_details_type)
    print(taxi_adapter.feature_type)

    # Register the type under both a spatial and a spatial-temporal index.
    spatial_idx = SpatialIndexBuilder().set_name("spatial_idx").create_index()
    spatial_temporal_idx = SpatialTemporalIndexBuilder().set_name("spatial_temporal_idx").create_index()
    # FIX: call the method — the original printed the bound method object
    # (print(spatial_idx.get_index_model)) instead of the index model itself.
    print(spatial_idx.get_index_model())
    datastore.add_type(taxi_adapter, spatial_idx, spatial_temporal_idx)

    # Sanity checks: list the registered types and their indices.
    registered_types = datastore.get_types()
    print(len(registered_types))
    for t in registered_types:
        print(t.get_type_name())
    registered_indices = datastore.get_indices(taxi_adapter.get_type_name())
    for i in registered_indices:
        print(i.get_name())
    return (taxi_details_type, datastore)
# Create the feature type builder
def buildFeatureBuilder(taxi_details_type, x, y):
    """Build GeoWave SimpleFeatures for rows ``x``..``y`` (inclusive, via
    ``DataFrame.loc``) of the module-global ``df``.

    Args:
        taxi_details_type: the feature type built by
            ``createBuilderAndInitiateTables``.
        x, y: inclusive start/end row labels of the batch to convert.

    Returns:
        list: one built feature per row, keyed by the row index as a string.
    """
    feature_builder = SimpleFeatureBuilder(taxi_details_type)
    # FIX: .copy() — the original assigned new columns to a slice of the
    # global df, which triggers pandas' SettingWithCopyWarning and may
    # mutate (or silently fail to mutate) the shared DataFrame.
    batch = df.loc[x:y].copy()
    # Parse the raw timestamp strings once per batch, e.g. "01/06/2021 06:46:29 AM".
    ts_format = '%m/%d/%Y %I:%M:%S %p'
    batch['Lpep_dropoff_datetime'] = batch['tpep_dropoff_datetime'].apply(lambda x: datetime.strptime(x, ts_format))
    batch['lpep_pickup_datetime'] = batch['tpep_pickup_datetime'].apply(lambda x: datetime.strptime(x, ts_format))
    print("Read data from CSV and preprocessed")
    print("Writing to features")
    features = []
    for idx, trip in batch.iterrows():
        vendor_id = trip['VendorID']
        pickup_time = trip['lpep_pickup_datetime']
        drop_off_time = trip['Lpep_dropoff_datetime']
        pickup_lat = trip['pickup_latitude']
        pickup_lon = trip['pickup_longitude']
        drop_lat = trip['dropoff_latitude']
        drop_lon = trip['dropoff_longitude']
        passenger_count = trip['passenger_count']
        trip_distance = trip['trip_distance']
        feature_builder.set_attr("vendor_id", vendor_id)
        feature_builder.set_attr("pickup_time", pickup_time)
        feature_builder.set_attr("drop_off_time", drop_off_time)
        # Shapely Points take (lon, lat) — x is longitude.
        feature_builder.set_attr("pickup_location", Point(pickup_lon, pickup_lat))
        feature_builder.set_attr("drop_off_location", Point(drop_lon, drop_lat))
        feature_builder.set_attr("passenger_count", passenger_count)
        feature_builder.set_attr("trip_distance", trip_distance)
        feature_builder.set_attr("fare_amount", trip['fare_amount'])
        feature_builder.set_attr("total_amount", trip['total_amount'])
        feature_builder.set_attr("payment_type", trip['payment_type'])
        # The row index becomes the feature id.
        feature = feature_builder.build(str(idx))
        features.append(feature)
    print("Features array ready ")
    return features
def writeDataToDB(features, datastore):
    """Open a writer for the registered taxi type, then close it.

    NOTE(review): the per-feature write loop is commented out, so this
    function currently persists nothing from ``features``.
    Reads the module-level global ``taxi_adapter`` for the type name.
    """
    print("Starting write to db")
    db_writer = datastore.create_writer(taxi_adapter.get_type_name())
    print(db_writer)
    # Bulk write disabled (kept for reference while debugging the timeout):
    # for ft in features:
    #     db_writer.write(ft)
    db_writer.close()
    print("writing closed")
def writeElement(feature, datastore):
    """Write a single feature to the datastore and close the writer.

    Reads the module-level global ``taxi_adapter`` for the type name.
    """
    print("Starting write to db")
    single_writer = datastore.create_writer(taxi_adapter.get_type_name())
    print(single_writer)
    single_writer.write(feature)
    single_writer.close()
    print("writing closed")
df = pd.read_csv("/hdd/sharathb/data/taxidata.csv",header=0)
taxi_details_type, datastore = createBuilderAndInitiateTables()
features = buildFeatureBuilder(taxi_details_type,0,5)
writeElement(features[0],datastore)
"""
for i in range(0,1):
print("for loop",i)
features = buildFeatureBuilder(taxi_details_type,i*1000,((i+1)*1000)-1)
print("Feature[0]",features[i])
writeElement(features[i],datastore)
#writeDataToDB(features,datastore)
"""
ERROR [operations.HBaseOperations] - Error verifying/adding coprocessor.
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
Wed Jan 06 06:46:29 EST 2021, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68441: Call to orion-15/192.168.0.26:16020 failed on connection exception: java.net.ConnectException: Connection refused row 'taxi_hbase_GEOWAVE_METADATA,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=orion-15,16020,1609933201766, seqNum=0
at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.throwEnrichedException(RpcRetryingCallerWithReadReplicas.java:329)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:242)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:58)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:219)
at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:277)
at org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:438)
at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:312)
at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:639)
at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366)
at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:409)
at org.apache.hadoop.hbase.client.HBaseAdmin.checkTableExistence(HBaseAdmin.java:1485)
at org.apache.hadoop.hbase.client.HBaseAdmin.isTableEnabled(HBaseAdmin.java:1497)
at org.locationtech.geowave.datastore.hbase.operations.HBaseOperations.enableTable(HBaseOperations.java:391)
at org.locationtech.geowave.datastore.hbase.operations.HBaseOperations.verifyCoprocessor(HBaseOperations.java:725)
at org.locationtech.geowave.datastore.hbase.operations.HBaseOperations.ensureServerSideOperationsObserverAttached(HBaseOperations.java:801)
at org.locationtech.geowave.datastore.hbase.operations.HBaseOperations.createMetadataWriter(HBaseOperations.java:894)
at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.addObject(AbstractGeoWavePersistence.java:161)
at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.addObject(AbstractGeoWavePersistence.java:145)
at org.locationtech.geowave.core.store.metadata.DataStatisticsStoreImpl.incorporateStatistics(DataStatisticsStoreImpl.java:83)
at org.locationtech.geowave.core.store.adapter.statistics.StatsCompositionTool.flush(StatsCompositionTool.java:149)
at org.locationtech.geowave.core.store.adapter.statistics.StatsCompositionTool.close(StatsCompositionTool.java:191)
at org.locationtech.geowave.core.store.callback.IngestCallbackList.close(IngestCallbackList.java:35)
at org.locationtech.geowave.core.store.callback.IngestCallbackList.close(IngestCallbackList.java:35)
at org.locationtech.geowave.core.store.base.BaseIndexWriter.close(BaseIndexWriter.java:96)
at org.locationtech.geowave.core.store.index.writer.IndexCompositeWriter.close(IndexCompositeWriter.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)