add some functions, and update the haPool's logic, and do some refactor

This commit is contained in:
coderfengyun 2014-03-03 11:18:18 +08:00
parent 65d368016a
commit 44e78b58e2
9 changed files with 220 additions and 95 deletions

View File

@ -12,6 +12,9 @@ import org.apache.log4j.Logger;
*/
public class Main {
private static Logger logger = Logger.getLogger(Main.class);
public static int MAX_FAIL_TIMES;
public static int MIN_EXECUTE_INTERVAL_IN_SECONDS;
public static int PICK_CYCLE_IN_SECONDS;
public static void main(String[] args) {
try {
@ -34,8 +37,16 @@ public class Main {
"org/bench4q/master/config/ServerPort.properties");
prop.load(inputStream);
portToUse = Integer.parseInt(prop.getProperty("portToServe"));
MAX_FAIL_TIMES = Integer.parseInt(prop.getProperty("maxFailTime"));
MIN_EXECUTE_INTERVAL_IN_SECONDS = Integer.parseInt(prop
.getProperty("minExcuteIntervalInSeconds"));
PICK_CYCLE_IN_SECONDS = Integer.parseInt(prop
.getProperty("pickTestPlanCycleInSeconds"));
} catch (Exception e) {
portToUse = 8080;
MAX_FAIL_TIMES = 10;
MIN_EXECUTE_INTERVAL_IN_SECONDS = 600;
PICK_CYCLE_IN_SECONDS = 60;
logger.error("There is no config file for port to serve! where path is "
+ configFile);
}

View File

@ -1,6 +1,7 @@
package org.bench4q.master.domain.repository;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -11,8 +12,11 @@ import org.bench4q.master.domain.entity.User;
import org.bench4q.master.domain.factory.TestPlanFactory;
import org.bench4q.master.exception.ExceptionLog;
import org.bench4q.master.exception.ExceptionUtils.EntityUniqueAlReadyExistException;
import org.hibernate.Criteria;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.criterion.Criterion;
import org.hibernate.criterion.Order;
import org.hibernate.criterion.Restrictions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -98,19 +102,6 @@ public class TestPlanRepository extends AbstractRepositoty {
return this.getTestPlanFactory().convertToDomain(result);
}
public TestPlan getRunningTestPlanBy(UUID testPlanRunId) {
return this.getRunningTestPlans().get(testPlanRunId);
}
public void attachRunningTestPlan(TestPlan testPlan) {
this.getRunningTestPlans().put(
UUID.fromString(testPlan.getTestPlanRunId()), testPlan);
}
public void detachRunningTestPlan(UUID testPlanRunId) {
this.getRunningTestPlans().remove(testPlanRunId);
}
public List<TestPlan> loadEntities(User user) {
Session session = this.getSessionHelper().openSession();
try {
@ -126,6 +117,24 @@ public class TestPlanRepository extends AbstractRepositoty {
}
}
@SuppressWarnings("unchecked")
public List<TestPlan> loadTestPlansBy(List<Criterion> criterions,
Order order) {
Session session = this.getSessionHelper().openSession();
try {
Criteria criteria = session.createCriteria(TestPlan.class);
for (Criterion criterion : criterions) {
criteria.add(criterion);
}
return criteria.addOrder(order).list();
} catch (Exception e) {
logger.error(ExceptionLog.getStackTrace(e));
return new LinkedList<TestPlan>();
} finally {
releaseSession(session);
}
}
public boolean updateEntity(TestPlan testPlanDB) {
if (testPlanDB == null) {
return false;
@ -145,4 +154,16 @@ public class TestPlanRepository extends AbstractRepositoty {
}
}
public TestPlan getRunningTestPlanBy(UUID testPlanRunId) {
return this.getRunningTestPlans().get(testPlanRunId);
}
public void attachRunningTestPlan(TestPlan testPlan) {
this.getRunningTestPlans().put(
UUID.fromString(testPlan.getTestPlanRunId()), testPlan);
}
public void detachRunningTestPlan(UUID testPlanRunId) {
this.getRunningTestPlans().remove(testPlanRunId);
}
}

View File

@ -1,11 +1,17 @@
package org.bench4q.master.domain.service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.bench4q.master.Main;
import org.bench4q.master.domain.entity.TestPlan;
import org.bench4q.master.domain.entity.User;
import org.bench4q.master.domain.repository.TestPlanRepository;
@ -14,6 +20,9 @@ import org.bench4q.master.testplan.highavailable.HighAvailablePool;
import org.bench4q.master.testplan.schedulscript.TaskCompleteCallback;
import org.bench4q.share.enums.master.TestPlanStatus;
import org.bench4q.share.models.master.TestPlanModel;
import org.hibernate.criterion.Criterion;
import org.hibernate.criterion.Order;
import org.hibernate.criterion.Restrictions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -23,6 +32,8 @@ public class TestPlanEngine implements TaskCompleteCallback,
private TestPlanService testPlanService;
private HighAvailablePool haPool;
private TestPlanRepository testPlanRepository;
private ScheduledExecutorService scheduleExecutor;
public static Logger logger = Logger.getLogger(TestPlanEngine.class);
private TestPlanRepository getTestPlanRepository() {
@ -53,6 +64,32 @@ public class TestPlanEngine implements TaskCompleteCallback,
this.getHaPool().setObserver(this);
}
private ScheduledExecutorService getScheduleExecutor() {
return scheduleExecutor;
}
private void setScheduleExecutor(ScheduledExecutorService scheduleExecutor) {
this.scheduleExecutor = scheduleExecutor;
}
public TestPlanEngine() {
init();
}
private void init() {
this.setScheduleExecutor(new ScheduledThreadPoolExecutor(1));
this.getScheduleExecutor().schedule(new Runnable() {
public void run() {
TestPlan testPlan = pickATestPlan();
if (testPlan == null) {
return;
}
doRunTestPlan(UUID.fromString(testPlan.getTestPlanRunId()));
}
}, Main.PICK_CYCLE_IN_SECONDS, TimeUnit.SECONDS);
}
public UUID runWith(final TestPlanModel testPlanModel, User user) {
ExecutorService executorService = Executors.newCachedThreadPool();
final UUID testPlanId = UUID.randomUUID();
@ -73,13 +110,6 @@ public class TestPlanEngine implements TaskCompleteCallback,
public void doRunTestPlan(final UUID testPlanId) {
TestPlan testPlan = getTestPlanRepository().getTestPlanBy(testPlanId);
// if (!testPlan.run()) {
// commitError(testPlan);
// testPlan = null;
// return;
// }
// commitInRunning(testPlan);
// getTestPlanRepository().attachRunningTestPlan(testPlan);
TestPlanStatus returnStatus = testPlan.run();
commitTestPlanStaus(returnStatus, testPlan);
}
@ -126,4 +156,23 @@ public class TestPlanEngine implements TaskCompleteCallback,
}
public TestPlan pickATestPlan() {
List<Criterion> criterions = new ArrayList<Criterion>();
Criterion notComplete = Restrictions.not(Restrictions.eq(
"currentStatus", TestPlanStatus.Complete.name()));
criterions.add(notComplete);
Criterion lessThanPoolAvailbleLoad = Restrictions.lt("requireLoad",
this.getHaPool().getCurrentAvailableLoad());
criterions.add(lessThanPoolAvailbleLoad);
Criterion lessThanMaxFailTimes = Restrictions.lt("failTimes",
Main.MAX_FAIL_TIMES);
criterions.add(lessThanMaxFailTimes);
Criterion intervalGTMin_Interval = Restrictions.lt("lastRunningTime",
new Date(System.currentTimeMillis()
- Main.MIN_EXECUTE_INTERVAL_IN_SECONDS * 1000));
criterions.add(intervalGTMin_Interval);
List<TestPlan> testPlans = this.getTestPlanRepository()
.loadTestPlansBy(criterions, Order.desc("createDateTime"));
return testPlans.size() > 0 ? testPlans.get(0) : null;
}
}

View File

@ -2,6 +2,7 @@ package org.bench4q.master.testplan.highavailable;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -24,6 +25,7 @@ public class HighAvailablePool extends CurrentLoadSubject {
private Map<String, Agent> pool;
private Map<String, ServerStatusModel> agentStatusOfPreviousBeatMap;
private Map<UUID, AgentRunBlotter> agentRunBlotters;
private List<UUID> agentRunIdShouldBeSubstitute;
private long maxAvailableLoad;
private long currentAvailableLoad;
static Logger logger = Logger.getLogger(HighAvailablePool.class);
@ -79,11 +81,21 @@ public class HighAvailablePool extends CurrentLoadSubject {
this.agentRunBlotters = agentRunningInfo;
}
private List<UUID> getAgentRunIdShouldBeSubstitute() {
return agentRunIdShouldBeSubstitute;
}
private void setAgentRunIdShouldBeSubstitute(
List<UUID> agentRunIdShouldBeSubstitute) {
this.agentRunIdShouldBeSubstitute = agentRunIdShouldBeSubstitute;
}
public HighAvailablePool() {
this.setPool(new HashMap<String, Agent>());
this.setAgentRunBlotters(new HashMap<UUID, AgentRunBlotter>());
this.setAgentStatusOfPreviousBeatMap(new HashMap<String, ServerStatusModel>());
this.setCurrentAvailableLoad((long) 0);
this.setAgentRunIdShouldBeSubstitute(new LinkedList<UUID>());
}
@Scheduled(cron = "0,30 */1 * * * *")
@ -92,11 +104,24 @@ public class HighAvailablePool extends CurrentLoadSubject {
synchronized (this.getPool()) {
heartBeatsAndUpdateHAPool();
calculateHAPoolLoadStatusInMonopolize();
doSubstituteIfRequired();
}
}
logger.info("Rotation Over");
}
private void doSubstituteIfRequired() {
for (UUID agentRunId : this.getAgentRunIdShouldBeSubstitute()) {
AgentRunBlotter agentRunBlotter = this.getAgentRunBlotters().get(
agentRunId);
if (agentRunBlotter == null) {
// TODO:Actually this is an error
continue;
}
agentRunBlotter.substituteOnBoard(agentRunId);
}
}
private void calculateHAPoolLoadStatusInMonopolize() {
int maxLoad = 0, availableLoad = 0;
for (Agent agent : this.getPool().values()) {
@ -122,18 +147,18 @@ public class HighAvailablePool extends CurrentLoadSubject {
this.setCurrentAvailableLoad((long) 0);
this.setMaxAvailableLoad((long) 0);
for (Agent agent : this.agentService.loadAgentPoolFromDB()) {
checkHeartBeatAndUpdateMaxLoadAndAvailableLoad(agent);
checkHeartBeat(agent);
}
}
public void checkHeartBeatAndUpdateMaxLoadAndAvailableLoad(Agent agent) {
this.getPool().put(agent.getHostName(), agent);
public void checkHeartBeat(Agent agent) {
ServerStatusModel model = queryAgentStatus(agent);
if (model == null) {
doForBreakDown(model, agent);
return;
} else {
doForHealth(model, agent);
}
doForHealth(model, agent);
this.getPool().put(agent.getHostName(), agent);
}
public ServerStatusModel queryAgentStatus(Agent agent) {
@ -142,36 +167,24 @@ public class HighAvailablePool extends CurrentLoadSubject {
}
private void doForBreakDown(ServerStatusModel statusModel, Agent agent) {
agent.setCurrentStatus(AgentService.AGENT_STATUS_BreakDown);
updateAgentStatus(AgentService.AGENT_STATUS_BreakDown,
agent.getHostName());
updateAgentStatus(AgentService.AGENT_STATUS_BreakDown, agent);
List<UUID> runIDs = queryUnfinishedTest(statusModel,
agent.getHostName());
if (runIDs == null) {
return;
}
for (UUID agentRunId : runIDs) {
AgentRunBlotter agentRunBlotter = this.getAgentRunBlotters().get(
agentRunId);
if (agentRunBlotter == null) {
// TODO:Actually this is an error
continue;
}
agentRunBlotter.substituteOnBoard(agentRunId);
}
// TODO: I should only add a task to taskList here
this.getAgentRunIdShouldBeSubstitute().addAll(runIDs);
}
private void updateAgentStatus(int status, String hostName) {
this.pool.get(hostName).setCurrentStatus(status);
this.agentService.updateAgentStatus(status, hostName);
private void updateAgentStatus(int status, Agent agent) {
agent.setCurrentStatus(status);
// TODO: update this all together to db
this.agentService.updateAgentStatus(status, agent.getHostName());
}
private void doForHealth(ServerStatusModel newModel, Agent agent) {
arrageUnfinishedTest(newModel, agent);
// this.setCurrentAvailableLoad(this.getCurrentAvailableLoad()
// + (long) agent.getRemainLoad());
// this.setMaxAvailableLoad(this.getMaxAvailableLoad()
// + (long) agent.getMaxLoad());
this.agentStatusOfPreviousBeatMap.put(agent.getHostName(), newModel);
}
@ -179,12 +192,11 @@ public class HighAvailablePool extends CurrentLoadSubject {
List<UUID> agentUnfinishedRunIds = queryUnfinishedTest(newModel,
agent.getHostName());
if (agentUnfinishedRunIds == null || agentUnfinishedRunIds.size() == 0) {
updateAgentStatus(AgentService.AGENT_STATUS_Idel,
agent.getHostName());
updateAgentStatus(AgentService.AGENT_STATUS_Idel, agent);
this.agentService.resetAgent(agent);
return;
}
updateAgentStatus(AgentService.AGENT_STATUS_InRun, agent.getHostName());
updateAgentStatus(AgentService.AGENT_STATUS_InRun, agent);
}
private List<UUID> queryUnfinishedTest(ServerStatusModel newModel,

View File

@ -48,7 +48,7 @@ public class BriefAgentFault implements FaultTolerance {
}
synchronized (this.getHaPool().getPool()) {
this.getHaPool().checkHeartBeatAndUpdateMaxLoadAndAvailableLoad(this.getAgent());
this.getHaPool().checkHeartBeat(this.getAgent());
}
}

View File

@ -142,6 +142,12 @@ public abstract class ScriptLoadBase implements Transaction {
RunScenarioResultModel runScenarioResultModel = new RunScenarioResultModel();
int loadForRunCurrent;
if (totalRequireLoad >= this.getHighAvailableAgentPool()
.getCurrentAvailableLoad()) {
logger.info("currentAvailableLoad not enough for substitute");
return;
}
for (Agent agent : this.getHighAvailableAgentPool().getPool().values()) {
if (allocationFinish(totalRequireLoad)) {
break;

View File

@ -1,3 +1,4 @@
portToServe=7979
pickTestPlanCycleInSeconds=60
maxFailTime=10
minExcuteIntervalInSeconds=600

View File

@ -22,17 +22,18 @@ public class Test_HighAvailable extends TestBase_MakeUpTestPlan {
@Before
public void prepare() {
submitATestPlanWithOneScript();
prepareForTestPlanRunning();
}
@After
public void cleanUp() {
cleanUpForTestPlanRunning();
}
@Test
public void testSubstituteOnBoard() throws InterruptedException {
submitATestPlanWithOneScript();
prepareForTestPlanRunning();
assertEquals(Long.valueOf(1000), this.getHaPool().getMaxAvailableLoad());
assertEquals(1000, this.getHaPool().getCurrentAvailableLoad());
@ -71,5 +72,12 @@ public class Test_HighAvailable extends TestBase_MakeUpTestPlan {
TestPlanScript extracSpecifiedScript = testPlanAfterSubstitute
.extracSpecifiedScript(getScriptId());
assertEquals(2, extracSpecifiedScript.getRunningAgentsDB().size());
cleanUpForTestPlanRunning();
}
@Test
public void testCurrentAvailableLoad() {
this.getHaPool().timerTask();
assertNotEquals(0, this.getHaPool().getCurrentAvailableLoad());
}
}

View File

@ -2,6 +2,7 @@ package org.bench4q.master.test.service;
import java.util.Date;
import org.bench4q.master.Main;
import org.bench4q.master.domain.entity.TestPlan;
import org.bench4q.master.domain.entity.User;
import org.bench4q.master.domain.service.TestPlanEngine;
@ -86,6 +87,22 @@ public class Test_TestPlanEngine extends TestBase_MakeUpTestPlan {
deleteTestPlan();
}
@Test
public void testPickATestPlan() {
this.getHaPool().timerTask();
assertNotEquals(0, this.getHaPool().getCurrentAvailableLoad());
System.out.println(this.getHaPool().getCurrentAvailableLoad());
TestPlan testPlanPicked = this.getTestPlanEngine().pickATestPlan();
assertNotNull(testPlanPicked);
assertTrue(testPlanPicked.getRequiredLoad() <= this.getHaPool()
.getCurrentAvailableLoad());
assertTrue(testPlanPicked.getFailTimes() <= Main.MAX_FAIL_TIMES);
assertTrue(System.currentTimeMillis()
- testPlanPicked.getLastRunningTime().getTime() > Main.MIN_EXECUTE_INTERVAL_IN_SECONDS * 1000);
assertNotEquals(TestPlanStatus.Complete,
TestPlanStatus.valueOf(testPlanPicked.getCurrentStatus()));
}
private void testForStatus(TestPlanStatus status) throws Exception {
this.submitATestPlanWithOneScript();
TestPlan testPlan = this.getTestPlanRepository().getTestPlanBy(