HBase and MongoDB Plugin update
This commit is contained in:
parent
8fd8b0a890
commit
7626aca95f
|
@ -1,89 +1,158 @@
|
||||||
package org.bench4q.agent.plugin.basic.HBasePlugin;
|
package org.bench4q.agent.plugin.basic.HBasePlugin;
|
||||||
|
|
||||||
import java.util.UUID;
|
import java.io.IOException;
|
||||||
|
import java.util.UUID;
|
||||||
import org.bench4q.agent.plugin.Constructor;
|
|
||||||
import org.bench4q.agent.plugin.Parameter;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.bench4q.agent.plugin.Plugin;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.bench4q.agent.plugin.behavior.Behavior;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.bench4q.agent.plugin.behavior.BehaviorType;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.bench4q.agent.utils.Type.SupportTypes;
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
import org.bench4q.share.exception.Bench4QRunTimeException;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.bench4q.agent.plugin.Constructor;
|
||||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
import org.bench4q.agent.plugin.Parameter;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.bench4q.agent.plugin.Plugin;
|
||||||
import org.apache.log4j.Logger;
|
import org.bench4q.agent.plugin.behavior.Behavior;
|
||||||
|
import org.bench4q.agent.plugin.behavior.BehaviorType;
|
||||||
@Plugin("HBase")
|
import org.bench4q.agent.utils.Type.SupportTypes;
|
||||||
public class HBasePlugin {
|
import org.bench4q.share.exception.Bench4QRunTimeException;
|
||||||
private final Configuration conf;
|
|
||||||
private final HTable tableUnderTest;
|
@Plugin("HBase")
|
||||||
|
public class HBasePlugin {
|
||||||
@Constructor
|
private final Configuration conf;
|
||||||
public HBasePlugin(
|
private final HTable tableUnderTest;
|
||||||
@Parameter(value = "dnsServer", type = SupportTypes.Field) String dnsServer,
|
|
||||||
@Parameter(value = "zookeeperQuorum", type = SupportTypes.Field) String zookeeperQuorum,
|
@Constructor
|
||||||
@Parameter(value = "zookeeperDataDir", type = SupportTypes.Field) String zookeeperDataDir,
|
public HBasePlugin(
|
||||||
@Parameter(value = "masterAdrress", type = SupportTypes.Field) String masterAdrress,
|
@Parameter(value = "dnsServer", type = SupportTypes.Field) String dnsServer,
|
||||||
@Parameter(value = "tableName", type = SupportTypes.Field) String tableName) {
|
@Parameter(value = "zookeeperQuorum", type = SupportTypes.Field) String zookeeperQuorum,
|
||||||
conf = HBaseConfiguration.create();
|
@Parameter(value = "zookeeperDataDir", type = SupportTypes.Field) String zookeeperDataDir,
|
||||||
try {
|
@Parameter(value = "masterAdrress", type = SupportTypes.Field) String masterAdrress,
|
||||||
Configuration config = new Configuration();
|
@Parameter(value = "tableName", type = SupportTypes.Field) String tableName) {
|
||||||
config.set("hbase.master.dns.nameserver", dnsServer);
|
conf = HBaseConfiguration.create();
|
||||||
config.setBoolean("hbase.cluster.distributed", true);
|
try {
|
||||||
config.set("hbase.zookeeper.quorum", zookeeperQuorum);
|
Configuration config = new Configuration();
|
||||||
config.set("hbase.zookeeper.property.dataDir", zookeeperDataDir);
|
config.set("hbase.master.dns.nameserver", dnsServer);
|
||||||
config.set("hbase.master.info.bindAddress", masterAdrress);
|
config.setBoolean("hbase.cluster.distributed", true);
|
||||||
@SuppressWarnings({ "unused", "resource" })
|
config.set("hbase.zookeeper.quorum", zookeeperQuorum);
|
||||||
HBaseAdmin admin = new HBaseAdmin(config);
|
config.set("hbase.zookeeper.property.dataDir", zookeeperDataDir);
|
||||||
tableUnderTest = new HTable(conf, tableName);
|
config.set("hbase.master.info.bindAddress", masterAdrress);
|
||||||
} catch (Exception e) {
|
@SuppressWarnings({ "unused", "resource" })
|
||||||
e.printStackTrace();
|
HBaseAdmin admin = new HBaseAdmin(config);
|
||||||
throw new Bench4QRunTimeException("Construct HBasePlugin fails!", e);
|
tableUnderTest = new HTable(conf, tableName);
|
||||||
}
|
} catch (Exception e) {
|
||||||
}
|
Logger.getLogger(HBasePlugin.class).error(e, e);
|
||||||
|
throw new Bench4QRunTimeException("Construct HBasePlugin fails!", e);
|
||||||
@Behavior(value = "Insert", type = BehaviorType.USER_BEHAVIOR)
|
}
|
||||||
public HBaseReturn insert(
|
}
|
||||||
@Parameter(value = "key", type = SupportTypes.Field) String key,
|
|
||||||
@Parameter(value = "value", type = SupportTypes.Field) String value) {
|
@Behavior(value = "Insert", type = BehaviorType.USER_BEHAVIOR)
|
||||||
try {
|
public HBaseReturn insert(
|
||||||
Put put = new Put(UUID.randomUUID().toString().getBytes());
|
@Parameter(value = "key", type = SupportTypes.Field) String key,
|
||||||
put.add("key1".getBytes(), "key1".getBytes(), key.getBytes());
|
@Parameter(value = "value", type = SupportTypes.Field) String value) {
|
||||||
put.add("value1".getBytes(), "value1".getBytes(), value.getBytes());
|
try {
|
||||||
this.tableUnderTest.put(put);
|
Put put = new Put(UUID.randomUUID().toString().getBytes());
|
||||||
} catch (Exception ex) {
|
put.add("key1".getBytes(), "key1".getBytes(), key.getBytes());
|
||||||
Logger.getLogger(HBasePlugin.class).info(ex, ex);
|
put.add("value1".getBytes(), "value1".getBytes(), value.getBytes());
|
||||||
return new HBaseReturn(false);
|
this.tableUnderTest.put(put);
|
||||||
}
|
return new HBaseReturn(true);
|
||||||
return new HBaseReturn(true);
|
} catch (Exception ex) {
|
||||||
}
|
Logger.getLogger(HBasePlugin.class).info(ex, ex);
|
||||||
|
return new HBaseReturn(false);
|
||||||
@Behavior(value = "Query", type = BehaviorType.USER_BEHAVIOR)
|
}
|
||||||
public HBaseReturn query(
|
}
|
||||||
@Parameter(value = "key", type = SupportTypes.Field) String key) {
|
|
||||||
try {
|
@Behavior(value = "Query", type = BehaviorType.USER_BEHAVIOR)
|
||||||
Filter filter = new SingleColumnValueFilter(Bytes.toBytes("key1"),
|
public HBaseReturn query(
|
||||||
Bytes.toBytes("key1"), CompareOp.EQUAL, Bytes.toBytes(key)); // µ±ÁÐcolumn1µÄֵΪaaaʱ½øÐвéѯ
|
@Parameter(value = "key", type = SupportTypes.Field) String key) {
|
||||||
Scan s = new Scan();
|
try {
|
||||||
s.setFilter(filter);
|
Filter filter = new SingleColumnValueFilter(Bytes.toBytes("key1"),
|
||||||
ResultScanner resultScanner = this.tableUnderTest.getScanner(s);
|
Bytes.toBytes("key1"), CompareOp.EQUAL, Bytes.toBytes(key));
|
||||||
for (Result r : resultScanner) {
|
Scan s = new Scan();
|
||||||
System.out.println("get the row key:" + new String(r.getRow()));
|
s.setFilter(filter);
|
||||||
}
|
ResultScanner resultScanner = this.tableUnderTest.getScanner(s);
|
||||||
} catch (Exception ex) {
|
for (Result r : resultScanner) {
|
||||||
Logger.getLogger(HBasePlugin.class).info(ex, ex);
|
System.out.println("get the row key:" + new String(r.getRow()));
|
||||||
return new HBaseReturn(false);
|
}
|
||||||
}
|
return new HBaseReturn(true);
|
||||||
return new HBaseReturn(true);
|
} catch (Exception ex) {
|
||||||
}
|
Logger.getLogger(HBasePlugin.class).info(ex, ex);
|
||||||
}
|
return new HBaseReturn(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get style, read according to rowKey
|
||||||
|
* @param rowkey
|
||||||
|
* @throws IOException
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Behavior(value = "Get", type = BehaviorType.USER_BEHAVIOR)
|
||||||
|
public HBaseReturn get(@Parameter(value = "rowkey", type = SupportTypes.Field) String rowkey) {
|
||||||
|
try {
|
||||||
|
Get g = new Get(Bytes.toBytes(rowkey));
|
||||||
|
Result r = tableUnderTest.get(g);
|
||||||
|
for(KeyValue kv : r.raw()){
|
||||||
|
System.out.println("column: " + new String(kv.getRow()));
|
||||||
|
System.out.println("value: " + new String(kv.getValue()));
|
||||||
|
}
|
||||||
|
return new HBaseReturn(true);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
Logger.getLogger(HBasePlugin.class).info(ex, ex);
|
||||||
|
return new HBaseReturn(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Perform a range scan for a set of records in the database.
|
||||||
|
* @param startkey The record key of the first record to read.
|
||||||
|
* @param recordcount The number of records to read
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Behavior(value = "Scan", type = BehaviorType.USER_BEHAVIOR)
|
||||||
|
public HBaseReturn scan(@Parameter(value = "startkey", type = SupportTypes.Field) String startkey,
|
||||||
|
@Parameter(value = "recordcount", type = SupportTypes.Field) String recordcount)
|
||||||
|
{
|
||||||
|
Scan s = new Scan(Bytes.toBytes(startkey));
|
||||||
|
Integer count = Integer.getInteger(recordcount);
|
||||||
|
s.setCaching(count);
|
||||||
|
ResultScanner scanner = null;
|
||||||
|
try {
|
||||||
|
scanner = tableUnderTest.getScanner(s);
|
||||||
|
int numResults = 0;
|
||||||
|
for (Result rr = scanner.next(); rr != null; rr = scanner.next())
|
||||||
|
{
|
||||||
|
|
||||||
|
String key = Bytes.toString(rr.getRow());
|
||||||
|
for (KeyValue kv : rr.raw()) {
|
||||||
|
System.out.println("result: key =" + kv.getQualifier() + " value = " + kv.getValue());
|
||||||
|
}
|
||||||
|
numResults++;
|
||||||
|
if (numResults >= count)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}catch (IOException e) {
|
||||||
|
Logger.getLogger(HBasePlugin.class).info(e, e);
|
||||||
|
return new HBaseReturn(false);
|
||||||
|
}finally {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return new HBaseReturn(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -94,8 +94,7 @@ public class MongoDBPlugin {
|
||||||
|
|
||||||
mongo = new Mongo(hostsList ,
|
mongo = new Mongo(hostsList ,
|
||||||
new MongoOptions(new MongoClientOptions.Builder()
|
new MongoOptions(new MongoClientOptions.Builder()
|
||||||
.maxWaitTime(1000 * 40).build()));
|
.maxWaitTime(1000 * 40).build()));
|
||||||
//mongo = new Mongo(hostName, port);
|
|
||||||
DB db = mongo.getDB(dbName);
|
DB db = mongo.getDB(dbName);
|
||||||
DBCollection table = db.getCollection(this.tableUnderTest);
|
DBCollection table = db.getCollection(this.tableUnderTest);
|
||||||
Table propertiesTable = Table.buildTable(properties,
|
Table propertiesTable = Table.buildTable(properties,
|
||||||
|
|
Loading…
Reference in New Issue