Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [geomesa-users] Several Problems found in Geomesa-1.0.0.rc2 Query Testing

Hi,
  
My test query client code is GeomesaTest.java (attached); test_data.csv contains the data that was ingested into GeoMesa (50,000 records).
   I configured the attr_idx table with bloom.enable=true and cache.block.enable=true, and split it into 3 tablets to speed up queries.
   When running 500 query client threads with a request interval of 1 s,
the OOM problem is more likely to occur.
  
When JVM memory usage is OK, most query latencies are under 10 ms.
  
   PS:
accumulo cluster: 4 nodes — 1 master on one node and 3 tablet servers on the other nodes (one tablet server per node). Each node: 126 GB memory, 40 cores (Intel Xeon E5-2670 v2 @ 2.50 GHz), 769 GB hard disk, 1000 Mbps network card.
        
  
          


Attachment: test_data.csv
Description: Binary data

package org.geomesa;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureSource;
import org.geotools.data.Query;
import org.geotools.feature.FeatureIterator;
import org.geotools.filter.text.cql2.CQL;
import org.geotools.filter.text.cql2.CQLException;
import org.opengis.feature.Feature;
import org.opengis.filter.Filter;

/**
 * Copyright 2014 Commonwealth Computer Research, Inc.
 * 
 * Licensed under the Apache License, Version 2.0 (the License); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an AS IS BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

public class GeomesaTest {
	static final String INSTANCE_ID = "instanceId";
	static final String ZOOKEEPERS = "zookeepers";
	static final String USER = "user";
	static final String PASSWORD = "password";
	static final String AUTHS = "auths";
	static final String TABLE_NAME = "tableName";
	static final String CACHING = "caching";
	// NOTE(review): declared but not currently registered as a command-line option.
	static final String QUERYTHREADS = "queryThreads";

	/** Number of concurrent query clients used by the two-argument constructor. */
	static final int DEFAULT_CLIENTS = 500;

	/** Period, in milliseconds, between successive queries of one client. */
	static final long DEFAULT_INTERVAL_MS = 1000;

	// sub-set of parameters that are used to create the Accumulo DataStore
	static final String[] ACCUMULO_CONNECTION_PARAMS = new String[] { INSTANCE_ID,
			ZOOKEEPERS, USER, PASSWORD, AUTHS, TABLE_NAME };

	/**
	 * Creates a common set of command-line options for the parser. Each option
	 * is described separately. All options except {@link #AUTHS} are required.
	 *
	 * @return the options understood by {@link #main(String[])}
	 */
	@SuppressWarnings("static-access")
	static Options getCommonRequiredOptions() {
		Options options = new Options();

		Option instanceIdOpt = OptionBuilder
				.withArgName(INSTANCE_ID)
				.hasArg()
				.isRequired()
				.withDescription(
						"the ID (name) of the Accumulo instance, e.g:  mycloud")
				.create(INSTANCE_ID);
		options.addOption(instanceIdOpt);

		Option zookeepersOpt = OptionBuilder
				.withArgName(ZOOKEEPERS)
				.hasArg()
				.isRequired()
				.withDescription(
						"the comma-separated list of Zookeeper nodes that support your Accumulo instance, e.g.:  zoo1:2181,zoo2:2181,zoo3:2181")
				.create(ZOOKEEPERS);
		options.addOption(zookeepersOpt);

		Option userOpt = OptionBuilder
				.withArgName(USER)
				.hasArg()
				.isRequired()
				.withDescription(
						"the Accumulo user that will own the connection, e.g.:  root")
				.create(USER);
		options.addOption(userOpt);

		Option passwordOpt = OptionBuilder
				.withArgName(PASSWORD)
				.hasArg()
				.isRequired()
				.withDescription(
						"the password for the Accumulo user that will own the connection, e.g.:  toor")
				.create(PASSWORD);
		options.addOption(passwordOpt);

		Option authsOpt = OptionBuilder
				.withArgName(AUTHS)
				.hasArg()
				.withDescription(
						"the (optional) list of comma-separated Accumulo authorizations that should be applied to all data written or read by this Accumulo user; note that this is NOT the list of low-level database permissions such as 'Table.READ', but more a series of text tokens that decorate cell data, e.g.:  Accounting,Purchasing,Testing")
				.create(AUTHS);
		options.addOption(authsOpt);

		Option tableNameOpt = OptionBuilder
				.withArgName(TABLE_NAME)
				.hasArg()
				.isRequired()
				.withDescription(
						"the name of the Accumulo table to use -- or create, if it does not already exist -- to contain the new data")
				.create(TABLE_NAME);
		options.addOption(tableNameOpt);

		return options;
	}

	/**
	 * Builds the parameter map passed to {@link DataStoreFinder#getDataStore}.
	 *
	 * @param cmd the parsed command line
	 * @return a mutable map holding every connection parameter; a missing
	 *         {@code auths} value becomes the empty string, and {@code caching}
	 *         is always set to {@code "false"}
	 */
	static Map<String, String> getAccumuloDataStoreConf(CommandLine cmd) {
		Map<String, String> dsConf = new HashMap<String, String>();
		for (String param : ACCUMULO_CONNECTION_PARAMS) {
			dsConf.put(param, cmd.getOptionValue(param));
		}
		// auths is optional on the command line; pass "" rather than null.
		if (dsConf.get(AUTHS) == null) {
			dsConf.put(AUTHS, "");
		}
		// Presumably disables client-side result caching so each query reaches
		// Accumulo -- confirm against the GeoMesa data store parameters.
		dsConf.put(CACHING, "false");
		return dsConf;
	}

	/**
	 * Runs a CQL query against one feature type and counts the matching features.
	 * Only the {@code sim} attribute is requested.
	 *
	 * @param simpleFeatureTypeName the feature type to query
	 * @param dataStore             the data store holding that type
	 * @param attributesQuery       an ECQL/CQL filter expression
	 * @return the number of features returned
	 * @throws CQLException if {@code attributesQuery} cannot be parsed
	 * @throws IOException  if the data store fails to serve the query
	 */
	private static int queryFeatures(String simpleFeatureTypeName,
			DataStore dataStore,
			String attributesQuery) throws CQLException, IOException {

		Filter cqlFilter = CQL.toFilter(attributesQuery);
		String[] props = { "sim" };
		Query query = new Query(simpleFeatureTypeName, cqlFilter, props);

		FeatureSource<?, ?> featureSource = dataStore.getFeatureSource(simpleFeatureTypeName);
		FeatureIterator<?> featureItr = featureSource.getFeatures(query).features();

		int n = 0;
		try {
			while (featureItr.hasNext()) {
				featureItr.next();
				++n;
			}
		} finally {
			// Release the iterator even when iteration fails; under heavy
			// concurrency leaked iterators accumulate quickly.
			featureItr.close();
		}
		return n;
	}

	/**
	 * Starts {@link #DEFAULT_CLIENTS} query clients, each issuing one query every
	 * {@link #DEFAULT_INTERVAL_MS} milliseconds.
	 *
	 * @param simpleFeatureTypeName the feature type to query
	 * @param dsConf                data store connection parameters
	 */
	public GeomesaTest(String simpleFeatureTypeName, Map<String, String> dsConf) {
		this(simpleFeatureTypeName, dsConf, DEFAULT_CLIENTS, DEFAULT_INTERVAL_MS);
	}

	/**
	 * Starts {@code clients} query clients on a scheduled pool of the same size.
	 * Each client owns its own {@link DataStore} and runs one query every
	 * {@code intervalMillis} milliseconds. The executor's threads are non-daemon,
	 * so the JVM keeps running until it is killed.
	 *
	 * @param simpleFeatureTypeName the feature type to query
	 * @param dsConf                data store connection parameters
	 * @param clients               number of concurrent clients
	 * @param intervalMillis        period between a client's queries, in ms
	 * @throws IllegalStateException if a data store cannot be created
	 */
	public GeomesaTest(String simpleFeatureTypeName, Map<String, String> dsConf,
			int clients, long intervalMillis) {
		List<Runnable> tasks = new ArrayList<Runnable>(clients);
		for (int i = 0; i < clients; i++) {
			tasks.add(new GeomesaQueryTask(simpleFeatureTypeName, dsConf));
		}

		ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(clients);
		for (Runnable run : tasks) {
			exec.scheduleAtFixedRate(run, 0, intervalMillis, TimeUnit.MILLISECONDS);
		}
	}

	public static void main(String[] args) throws Exception {
		CommandLineParser parser = new BasicParser();
		Options options = getCommonRequiredOptions();
		CommandLine cmd = parser.parse(options, args);

		Map<String, String> dsConf = getAccumuloDataStoreConf(cmd);

		String simpleFeatureTypeName = "sim2route";

		// The constructor starts the scheduled clients; no reference is needed.
		new GeomesaTest(simpleFeatureTypeName, dsConf);
	}

	// SimpleDateFormat is not thread-safe: every use must synchronize on sdf.
	static final SimpleDateFormat sdf = new SimpleDateFormat("HHmmss SSS");
	// Shared across client threads; Random itself is thread-safe.
	static final Random random = new Random(50000);

	/**
	 * One periodic query client: queries a random {@code sim} value in
	 * [0, 50000) and prints the result count and latency.
	 */
	private static final class GeomesaQueryTask implements Runnable {

		private final String featureName;
		private final DataStore dataStore;

		GeomesaQueryTask(String simpleFeatureTypeName, Map<String, String> dsConf) {
			this.featureName = simpleFeatureTypeName;
			DataStore ds;
			try {
				ds = DataStoreFinder.getDataStore(dsConf);
			} catch (IOException e) {
				throw new IllegalStateException(
						"Unable to create DataStore for " + simpleFeatureTypeName, e);
			}
			// getDataStore returns null when no factory accepts the parameters.
			if (ds == null) {
				throw new IllegalStateException(
						"No DataStore factory accepted the supplied connection parameters");
			}
			this.dataStore = ds;
		}

		public void run() {
			// Exceptions must not escape: scheduleAtFixedRate suppresses all
			// later executions of a task whose run throws.
			try {
				Date date = new Date();

				long start = System.currentTimeMillis();
				int sim = random.nextInt(50000);

				int num = queryFeatures(featureName, dataStore, "(sim = '" + sim + "')");

				long end = System.currentTimeMillis();

				String stamp;
				synchronized (sdf) {
					stamp = sdf.format(date);
				}
				System.out.println("[" + stamp + "] " + sim + " " + Thread.currentThread().getName()
						+ " read " + num + " entries, consume " + (end - start) + " ms");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

Back to the top