add supervisor to schedule

add supervisor to schedule, and need to be tested
This commit is contained in:
coderfengyun 2014-09-02 09:34:31 +08:00
parent 3e73f4b4ed
commit b288d92336
6 changed files with 86 additions and 55 deletions

View File

@ -5,6 +5,7 @@ import java.util.Collections;
import java.util.List;
import org.bench4q.agent.scenario.behavior.Behavior;
import org.bench4q.agent.scenario.engine.Schedule;
import org.bench4q.share.helper.MarshalHelper;
import org.bench4q.share.models.agent.ParameterModel;
import org.bench4q.share.models.agent.RunScenarioModel;

View File

@ -14,35 +14,35 @@ import org.bench4q.agent.datacollector.DataCollector;
import org.bench4q.agent.datacollector.impl.ScenarioResultCollector;
import org.bench4q.agent.plugin.PluginManager;
import org.bench4q.agent.scenario.Scenario;
import org.bench4q.agent.scenario.Schedule;
public class ScenarioContext extends Observable implements Observer {
public class ScenarioContext implements Observer {
private static final long keepAliveTime = 10;
private UUID testId;
private Date startDate;
private final UUID testId;
private final Date startDate;
private Date endDate;
private ThreadPoolExecutor executor;
private final ThreadPoolExecutor executor;
private Scenario scenario;
private boolean finished;
private DataCollector dataStatistics;
private PluginManager pluginManager;
private final DataCollector dataCollector;
private final PluginManager pluginManager;
public ScenarioContext(UUID testId, Date startDate,
ThreadPoolExecutor executor, DataCollector dataCollector, PluginManager pluginManager) {
this.testId = testId;
this.startDate = startDate;
this.executor = executor;
this.dataCollector = dataCollector;
this.pluginManager = pluginManager;
}
public UUID getTestId() {
return testId;
}
private void setTestId(UUID testId) {
this.testId = testId;
}
public Date getStartDate() {
return startDate;
}
public void setStartDate(Date saveStartDate) {
this.startDate = saveStartDate;
}
public Date getEndDate() {
return endDate;
}
@ -55,10 +55,6 @@ public class ScenarioContext extends Observable implements Observer {
return executor;
}
public void setExecutorService(ThreadPoolExecutor executor) {
this.executor = executor;
}
public Scenario getScenario() {
return scenario;
}
@ -76,24 +72,13 @@ public class ScenarioContext extends Observable implements Observer {
}
public DataCollector getDataStatistics() {
return dataStatistics;
}
private void setDataStatistics(DataCollector dataStatistics) {
this.dataStatistics = dataStatistics;
return dataCollector;
}
private PluginManager getPluginManager() {
return pluginManager;
}
private void setPluginManager(PluginManager pluginManager) {
this.pluginManager = pluginManager;
}
private ScenarioContext() {
}
public static ScenarioContext buildScenarioContext(UUID testId,
final Scenario scenario, int poolSize, PluginManager pluginManager) {
ScenarioContext scenarioContext = buildScenarioContextWithoutScenario(
@ -105,23 +90,23 @@ public class ScenarioContext extends Observable implements Observer {
public static ScenarioContext buildScenarioContextWithoutScenario(
UUID testId, int poolSize, PluginManager pluginManager) {
ScenarioContext scenarioContext = new ScenarioContext();
scenarioContext.setTestId(testId);
scenarioContext.setPluginManager(pluginManager);
final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(
poolSize);
ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize,
poolSize, keepAliveTime, TimeUnit.MINUTES, workQueue,
new DiscardPolicy());
scenarioContext.setStartDate(new Date(System.currentTimeMillis()));
scenarioContext.setExecutorService(executor);
scenarioContext.setDataStatistics(new ScenarioResultCollector(testId));
ScenarioContext scenarioContext = new ScenarioContext(testId, new Date(
System.currentTimeMillis()), executor,
new ScenarioResultCollector(testId), pluginManager);
return scenarioContext;
}
public ScenarioContext addScenrio(Scenario scenario) {
this.setScenario(scenario);
return this;
public ScenarioContext addScenrio(final Scenario scenario) {
ScenarioContext result = new ScenarioContext(this.testId, startDate, executor, this.dataCollector, pluginManager);
result.setEndDate(new Date(scenario.getSchedule().getScheduleRange() + this.getStartDate().getTime()));
result.setFinished(this.isFinished());
result.setScenario(scenario);
return result;
}
/**
@ -156,12 +141,12 @@ public class ScenarioContext extends Observable implements Observer {
Schedule schedule = (Schedule) o;
if (schedule.hasReachEnd()) {
stop();
}else {
} else {
this.updatePopulation((Integer) arg);
}
}
public void stop(){
public void stop() {
this.setFinished(true);
this.setEndDate(new Date());
this.getExecutor().shutdownNow();

View File

@ -2,6 +2,8 @@ package org.bench4q.agent.scenario.engine;
import java.util.HashMap;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.UUID;
import org.apache.log4j.Logger;
@ -11,7 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class ScenarioEngine{
public class ScenarioEngine implements Observer{
private Map<UUID, ScenarioContext> runningTests;
private Logger logger = Logger.getLogger(ScenarioEngine.class);
private PluginManager pluginManager;
@ -50,7 +52,8 @@ public class ScenarioEngine{
public void submitScenario(UUID runId, final Scenario scenario) {
try {
this.getRunningTests().get(runId).addScenrio(scenario);
ScenarioContext old = this.getRunningTests().get(runId);
this.getRunningTests().put(runId, old.addScenrio(scenario));
} catch (Exception e) {
e.printStackTrace();
}
@ -70,6 +73,18 @@ public class ScenarioEngine{
return false;
}
scenarioContext.initTasks();
new Supervisor(scenarioContext).start();;
return true;
}
@Override
public void update(Observable o, Object arg) {
if (!(o instanceof Supervisor)) {
return;
}
UUID testId = (UUID) arg;
this.getRunningTests().remove(testId);
}
}

View File

@ -1,4 +1,4 @@
package org.bench4q.agent.scenario;
package org.bench4q.agent.scenario.engine;
import java.util.Collections;
import java.util.Comparator;
@ -18,6 +18,7 @@ public class Schedule extends Observable {
private static final int SCHEDULE_CYCLE = 3000;
private final List<Segment> segments;
private final long beginTime;
private final Timer timer = new Timer();
private volatile boolean reachEnd;
public List<Segment> getSegments() {
@ -49,12 +50,11 @@ public class Schedule extends Observable {
}
private void beginSchedul() {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
long time = System.currentTimeMillis();
Segment segment = getSegment(time);
Segment segment = getSegment(time - beginTime);
if (segment == null) {
// exceed the range of execute, should let the context stop
// the test
@ -66,7 +66,15 @@ public class Schedule extends Observable {
}, 0, SCHEDULE_CYCLE);
}
// get the segment by binary search
void stop(){
this.timer.cancel();
}
/**
* ß
* @param time, is the relative time from begin
* @return get the segment by binary search
*/
public Segment getSegment(long time) {
if (this.getSegments() == null || this.getSegments().size() < 1
|| time < this.getSegments().get(0).start.getTime()) {

View File

@ -1,11 +1,33 @@
package org.bench4q.agent.scenario.engine;
import java.util.Date;
import java.util.Observable;
import java.util.Timer;
import java.util.TimerTask;
public class Supervisor {
private Timer timer;
public class Supervisor extends Observable {
private final ScenarioContext context;
private final Timer timer = new Timer();
public Supervisor(final ScenarioContext context) {
this.context = context;
}
public Supervisor(ScenarioContext context){
timer = new Timer();
void start(){
long time = context.getScenario().getSchedule().getScheduleRange()
+ context.getStartDate().getTime();
this.timer.schedule(new TimerTask() {
@Override
public void run() {
context.getScenario().getSchedule().stop();
}
}, new Date(time));
}
void stop(){
this.context.getScenario().getSchedule().stop();
this.context.stop();
this.timer.cancel();
}
}

View File

@ -4,8 +4,8 @@ import static org.junit.Assert.*;
import java.util.Date;
import org.bench4q.agent.scenario.Schedule;
import org.bench4q.agent.scenario.Schedule.Segment;
import org.bench4q.agent.scenario.engine.Schedule;
import org.bench4q.agent.scenario.engine.Schedule.Segment;
import org.bench4q.share.exception.Bench4QRunTimeException;
import org.bench4q.share.models.agent.scriptrecord.ScheduleModel;
import org.bench4q.share.models.agent.scriptrecord.ScheduleModel.PointModel;