edit the hBase plugin to supply the requirements

edit the hBase plugin to supply the requirements, and need to be tested
This commit is contained in:
coderfengyun 2014-04-25 13:56:20 +08:00
parent 775b752200
commit 11a94e6288
5 changed files with 265 additions and 102 deletions

View File

@ -1,33 +0,0 @@
package org.bench4q.agent.plugin.basic.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
public class GenerateDevices {
/**
* @param args
*/
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
try {
System.out.println(conf.get("hbase.zookeeper.quorum"));
@SuppressWarnings({ "unused", "resource" })
HBaseAdmin admin = new HBaseAdmin(conf);
@SuppressWarnings("resource")
HTable table = new HTable(conf, "Devices");
for (int i = 10002, j = 1002; i < 20000; i++, j++) {
Put put = new Put(String.valueOf(i).getBytes());
put.add("INFO".getBytes(), "UserId".getBytes(),
String.valueOf(j).getBytes());
table.put(put);
System.out.println(i + "_" + j);
}
} catch (Exception ex) {
}
}
}

View File

@ -2,40 +2,14 @@ package org.bench4q.agent.plugin.basic.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
public class GenerateRecords {
public class HBaseMesseger {
public String userId = null;
private Configuration conf = HBaseConfiguration.create();
public boolean openAndRes(byte[] type, String deviceId, String logicalName,
String id, String contextName) {
try {
@SuppressWarnings({ "unused", "resource" })
HBaseAdmin admin = new HBaseAdmin(conf);
@SuppressWarnings("resource")
HTable table = new HTable(conf, "Devices");
Get get = new Get(Bytes.toBytes(deviceId));
Result retDev = table.get(get);
if (retDev == null) {
System.out.println("this dev is not register");
return false;
}
userId = Bytes.toString(retDev.getValue("INFO".getBytes(),
"UserId".getBytes()));
} catch (Exception ex) {
System.out.println(ex.getMessage());
return false;
}
return true;
}
public boolean send(double sum, double usage, double U, double I,
String time_userId) {
try {

View File

@ -2,18 +2,35 @@ package org.bench4q.agent.plugin.basic.hbase;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Random;
import java.util.List;
import org.bench4q.agent.plugin.Behavior;
import org.bench4q.agent.plugin.Constructor;
import org.bench4q.agent.plugin.Parameter;
import org.bench4q.agent.plugin.Plugin;
import org.bench4q.agent.utils.Type.SupportTypes;
import org.bench4q.agent.utils.types.Table;
import org.bench4q.agent.utils.types.Table.Row;
@Plugin("Hbase")
public class HBasePlugin {
private RecordGenerator voltageGenerator;
private RecordGenerator electricCurrentGenerator;
private RecordGenerator electricityGenerator;
private HBaseMesseger hBaseMesseger = new HBaseMesseger();
private SimpleDateFormat dateFormat = new SimpleDateFormat(
"yyyyMMddhhmmssS");
List<String> labelList;
{
labelList = new ArrayList<String>();
labelList.add("dataType");
labelList.add("generator");
labelList.add("min");
labelList.add("max");
}
@Constructor
public HBasePlugin(
@ -22,58 +39,111 @@ public class HBasePlugin {
@Parameter(value = "voltage", type = SupportTypes.Table) String voltage,
@Parameter(value = "electricCurrent", type = SupportTypes.Table) String electricCurrent,
@Parameter(value = "electricity", type = SupportTypes.Table) String electricity) {
Row voltageRow = Table.buildTable(voltage, this.labelList).getRow(0);
this.voltageGenerator = RecordGenerator
.buildGeneratorWithRow(voltageRow);
Row electricCurrentRow = Table.buildTable(electricCurrent,
this.labelList).getRow(0);
this.electricCurrentGenerator = RecordGenerator
.buildGeneratorWithRow(electricCurrentRow);
Row electricityRow = Table.buildTable(electricity, this.labelList)
.getRow(0);
this.electricityGenerator = RecordGenerator
.buildGeneratorWithRow(electricityRow);
}
@Behavior("Send")
public ElectReturn send(
@Parameter(value = "beginTime", type = SupportTypes.Field) String beginTime,
@Parameter(value = "endTime", type = SupportTypes.Field) String endTime,
@Parameter(value = "beginTime", type = SupportTypes.Field) String beginDay,
@Parameter(value = "endTime", type = SupportTypes.Field) String endDay,
@Parameter(value = "beginUser", type = SupportTypes.Field) String beginUser,
@Parameter(value = "endUser", type = SupportTypes.Field) String endUser) {
Random r = new Random();
double usage = 0, U = 0, I = 0;
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddhhmmssS");
Date beginDatetime = new Date(0), endDatetime = new Date(0);
try {
beginDatetime = df.parse(beginTime);
endDatetime = df.parse(endTime);
beginDatetime = this.dateFormat.parse(beginDay);
endDatetime = this.dateFormat.parse(endDay);
} catch (ParseException e) {
e.printStackTrace();
return new ElectReturn(0, 0);
}
GregorianCalendar gc = new GregorianCalendar();
gc.setTime(beginDatetime);
int bu = Integer.parseInt(beginUser);
int eu = Integer.parseInt(endUser);
GenerateRecords gr = new GenerateRecords();
double[] sum = new double[eu - bu + 1];
for (int i = 0; i <= eu - bu; i++) {
sum[i] = 0;
int beginUserInt = Integer.parseInt(beginUser);
int endUserInt = Integer.parseInt(endUser);
double[] sumOfEletricity = initSumOfEletricityForEachUser(beginUserInt,
endUserInt);
TwoTupple twoTupple = new TwoTupple(0, 0);
for (Date dayIndex = new Date(beginDatetime.getTime()); dayIndex
.before(endDatetime);) {
for (int userIndex = beginUserInt; userIndex <= endUserInt; userIndex++) {
twoTupple.add(sendForOneUserOneDay(beginUserInt,
sumOfEletricity, dayIndex, userIndex));
}
int succ = 0;
int fail = 0;
for (Date a = new Date(beginDatetime.getTime()); a.before(endDatetime);) {
for (int i = bu; i <= eu; i++) {
usage = r.nextDouble() * 1000;
U = r.nextDouble() * 1000;
I = r.nextDouble() * 1000;
sum[i - bu] += usage;
System.out.println(sum[i - bu] + "-" + usage + "-" + U + "-"
+ I + "-" + df.format(gc.getTime()) + "_"
+ String.valueOf(i));
boolean ret = gr.send(sum[i - bu], usage, U, I,
df.format(gc.getTime()) + "_" + String.valueOf(i));
if (ret)
succ++;
else
fail++;
dayIndex = new Date(dayIndex.getTime() + 24 * 60 * 60 * 1000);
}
return new ElectReturn(twoTupple.successCount, twoTupple.failCount);
}
private double[] initSumOfEletricityForEachUser(int beginUserInt,
int endUserInt) {
double[] sumOfEletricity = new double[endUserInt - beginUserInt + 1];
for (int i = 0; i <= endUserInt - beginUserInt; i++) {
sumOfEletricity[i] = 0;
}
return sumOfEletricity;
}
private TwoTupple sendForOneUserOneDay(int beginUserInt,
double[] sumOfEletricity, Date dayIndex, int userIndex) {
TwoTupple result = new TwoTupple(0, 0);
double electricity = 0, voltage = 0, electricCurrent = 0;
int indexInSum = userIndex - beginUserInt;
Date sendTime = dayIndex;
resetGenerators();
for (int sampleIndex = 0; sampleIndex < 144; sampleIndex++) {
electricity = this.electricityGenerator.getDouble();
voltage = this.voltageGenerator.getDouble();
electricCurrent = this.electricCurrentGenerator.getDouble();
sumOfEletricity[indexInSum] += electricity;
boolean ret = this.hBaseMesseger.send(
sumOfEletricity[indexInSum],
electricity,
voltage,
electricCurrent,
this.dateFormat.format(sendTime) + "_"
+ String.valueOf(userIndex));
result.add(ret);
sendTime = new Date(sendTime.getTime() + 10 * 60 * 1000);
}
return result;
}
private void resetGenerators() {
this.electricCurrentGenerator.reset();
this.electricityGenerator.reset();
this.voltageGenerator.reset();
}
public class TwoTupple {
private int successCount;
private int failCount;
public TwoTupple(int successCount, int failCount) {
this.successCount = successCount;
this.failCount = failCount;
}
public void add(TwoTupple twoTupple) {
this.successCount += twoTupple.successCount;
this.failCount += twoTupple.failCount;
}
public void add(boolean success) {
if (success) {
this.successCount++;
} else {
this.failCount++;
}
gc.add(7, 15);
a = gc.getTime();
}
return new ElectReturn(succ, fail);
}
}

View File

@ -0,0 +1,133 @@
package org.bench4q.agent.plugin.basic.hbase;
import java.util.Random;
import org.bench4q.agent.plugin.TypeConverter;
import org.bench4q.agent.utils.types.Table.Row;
public abstract class RecordGenerator {
protected String dataType;
protected Double minValue;
protected Double maxValue;
private TypeConverter typeConverter = new TypeConverter();
private RecordGenerator(String dataType, String minValue, String maxValue) {
this.dataType = dataType;
this.minValue = Double.valueOf(minValue);
this.maxValue = Double.valueOf(maxValue);
if (this.maxValue < this.minValue) {
throw new RuntimeException("The minValue LT the maxValue");
}
}
public abstract String getValue();
public Double getDouble() {
return Double.valueOf(this.getValue());
}
public abstract void reset();
protected String convertAsDataType(Double value) {
return (String) this.typeConverter.convert(value, this.dataType);
}
/**
* This function build generator as the @generator says, but if it's not
* legal, then build the random generator
*
* @param dataType
* @param generator
* @param min
* @param max
* @return
*/
private static RecordGenerator buildGenerator(String dataType,
String generator, String min, String max) {
if (generator.equalsIgnoreCase("random")) {
return new RandomGenerator(dataType, min, max);
} else if (generator.equalsIgnoreCase("sin")) {
return new SinGenerator(dataType, min, max);
} else if (generator.equalsIgnoreCase("cos")) {
return new CosGenerator(dataType, min, max);
} else {
return new RandomGenerator(dataType, min, max);
}
}
public static RecordGenerator buildGeneratorWithRow(Row row) {
RecordGenerator result = buildGenerator(row.getCell(0), row.getCell(1),
row.getCell(2), row.getCell(3));
result.reset();
return result;
}
public static class RandomGenerator extends RecordGenerator {
private Random random;
private RandomGenerator(String dataType, String minValue,
String maxValue) {
super(dataType, minValue, maxValue);
this.random = new Random();
}
@Override
public String getValue() {
Double value = this.random.nextDouble();
Double doublePart = value - value.longValue();
value = doublePart
+ (value.intValue() % (maxValue.intValue() - minValue
.intValue())) + minValue;
return convertAsDataType(value);
}
@Override
public void reset() {
}
}
public static class SinGenerator extends RecordGenerator {
private Double currentAngle = 0D;
private SinGenerator(String dataType, String minValue, String maxValue) {
super(dataType, minValue, maxValue);
}
@Override
public String getValue() {
Double value = (this.maxValue - this.minValue)
* Math.sin(this.currentAngle)
+ (this.maxValue + this.minValue) / 2;
this.currentAngle += Math.PI / 144;
return convertAsDataType(value);
}
@Override
public void reset() {
this.currentAngle = 0D;
}
}
public static class CosGenerator extends RecordGenerator {
private Double currentAngle = 0D;
private CosGenerator(String dataType, String minValue, String maxValue) {
super(dataType, minValue, maxValue);
}
@Override
public String getValue() {
Double value = (this.maxValue - this.minValue)
* Math.cos(this.currentAngle)
+ (this.maxValue + this.minValue) / 2;
this.currentAngle += Math.PI / 144;
return convertAsDataType(value);
}
@Override
public void reset() {
this.currentAngle = 0D;
}
}
}

View File

@ -1,6 +1,7 @@
package org.bench4q.agent.utils.types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@ -23,6 +24,17 @@ public class Table extends Type {
public Row(List<String> cells) {
this.setCells(cells);
}
public int cellCount() {
return this.getCells().size();
}
public String getCell(int index) {
if (index >= cellCount()) {
return "";
}
return this.getCells().get(index);
}
}
public static final String ROW_SEPARATOR = "|;";
@ -66,6 +78,13 @@ public class Table extends Type {
this.setColumnLables(new LinkedList<String>());
}
public Row getRow(int index) {
if (index >= this.getRows().size()) {
return new Row(Collections.<String> emptyList());
}
return this.getRows().get(index);
}
public static Table buildTable(String value,
List<String> columnLablesInSequence) {
Table result = new Table();