From 44e78b58e24c7ad9d1b4888c3943a26f1af13cb1 Mon Sep 17 00:00:00 2001 From: coderfengyun Date: Mon, 3 Mar 2014 11:18:18 +0800 Subject: [PATCH] add some functions, and update the haPool's logic, and do some refactor --- src/main/java/org/bench4q/master/Main.java | 99 ++++++++++--------- .../domain/repository/TestPlanRepository.java | 47 ++++++--- .../master/domain/service/TestPlanEngine.java | 63 ++++++++++-- .../highavailable/HighAvailablePool.java | 66 ++++++++----- .../faultolerence/BriefAgentFault.java | 2 +- .../transaction/script/ScriptLoadBase.java | 6 ++ .../master/config/ServerPort.properties | 1 + .../test/domain/Test_HighAvailable.java | 14 ++- .../test/service/Test_TestPlanEngine.java | 17 ++++ 9 files changed, 220 insertions(+), 95 deletions(-) diff --git a/src/main/java/org/bench4q/master/Main.java b/src/main/java/org/bench4q/master/Main.java index 75c7cc21..6b9f1944 100644 --- a/src/main/java/org/bench4q/master/Main.java +++ b/src/main/java/org/bench4q/master/Main.java @@ -1,44 +1,55 @@ -package org.bench4q.master; - -import java.io.InputStream; -import java.util.Properties; - -import org.apache.log4j.Logger; - -/** - * - * @author coderfengyun - * - */ -public class Main { - private static Logger logger = Logger.getLogger(Main.class); - - public static void main(String[] args) { - try { - MasterServer masterServer = new MasterServer(getPortToServe()); - masterServer.start(); - } catch (Exception e) { - e.printStackTrace(); - return; - } - - } - - private static int getPortToServe() { - int portToUse = 0; - Properties prop = new Properties(); - String configFile = ""; - try { - InputStream inputStream = Main.class.getClassLoader() - .getResourceAsStream( - "org/bench4q/master/config/ServerPort.properties"); - prop.load(inputStream); - portToUse = Integer.parseInt(prop.getProperty("portToServe")); - } catch (Exception e) { - portToUse = 8080; - logger.error("There is no config file for port to serve! where path is " - + configFile); - } - return portToUse; - } -} +package org.bench4q.master; + +import java.io.InputStream; +import java.util.Properties; + +import org.apache.log4j.Logger; + +/** + * + * @author coderfengyun + * + */ +public class Main { + private static Logger logger = Logger.getLogger(Main.class); + public static int MAX_FAIL_TIMES; + public static int MIN_EXECUTE_INTERVAL_IN_SECONDS; + public static int PICK_CYCLE_IN_SECONDS; + + public static void main(String[] args) { + try { + MasterServer masterServer = new MasterServer(getPortToServe()); + masterServer.start(); + } catch (Exception e) { + e.printStackTrace(); + return; + } + + } + + private static int getPortToServe() { + int portToUse = 0; + Properties prop = new Properties(); + String configFile = ""; + try { + InputStream inputStream = Main.class.getClassLoader() + .getResourceAsStream( + "org/bench4q/master/config/ServerPort.properties"); + prop.load(inputStream); + portToUse = Integer.parseInt(prop.getProperty("portToServe")); + MAX_FAIL_TIMES = Integer.parseInt(prop.getProperty("maxFailTime")); + MIN_EXECUTE_INTERVAL_IN_SECONDS = Integer.parseInt(prop + .getProperty("minExcuteIntervalInSeconds")); + PICK_CYCLE_IN_SECONDS = Integer.parseInt(prop + .getProperty("pickTestPlanCycleInSeconds")); + } catch (Exception e) { + portToUse = 8080; + MAX_FAIL_TIMES = 10; + MIN_EXECUTE_INTERVAL_IN_SECONDS = 600; + PICK_CYCLE_IN_SECONDS = 60; + logger.error("There is no config file for port to serve! where path is " + + configFile); + } + return portToUse; + } +} diff --git a/src/main/java/org/bench4q/master/domain/repository/TestPlanRepository.java b/src/main/java/org/bench4q/master/domain/repository/TestPlanRepository.java index f6aef9e4..c435e63b 100644 --- a/src/main/java/org/bench4q/master/domain/repository/TestPlanRepository.java +++ b/src/main/java/org/bench4q/master/domain/repository/TestPlanRepository.java @@ -1,6 +1,7 @@ package org.bench4q.master.domain.repository; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -11,8 +12,11 @@ import org.bench4q.master.domain.entity.User; import org.bench4q.master.domain.factory.TestPlanFactory; import org.bench4q.master.exception.ExceptionLog; import org.bench4q.master.exception.ExceptionUtils.EntityUniqueAlReadyExistException; +import org.hibernate.Criteria; import org.hibernate.Session; import org.hibernate.Transaction; +import org.hibernate.criterion.Criterion; +import org.hibernate.criterion.Order; import org.hibernate.criterion.Restrictions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -98,19 +102,6 @@ public class TestPlanRepository extends AbstractRepositoty { return this.getTestPlanFactory().convertToDomain(result); } - public TestPlan getRunningTestPlanBy(UUID testPlanRunId) { - return this.getRunningTestPlans().get(testPlanRunId); - } - - public void attachRunningTestPlan(TestPlan testPlan) { - this.getRunningTestPlans().put( - UUID.fromString(testPlan.getTestPlanRunId()), testPlan); - } - - public void detachRunningTestPlan(UUID testPlanRunId) { - this.getRunningTestPlans().remove(testPlanRunId); - } - public List loadEntities(User user) { Session session = this.getSessionHelper().openSession(); try { @@ -126,6 +117,24 @@ public class TestPlanRepository extends AbstractRepositoty { } } + @SuppressWarnings("unchecked") + public List loadTestPlansBy(List criterions, + Order order) { + Session session = this.getSessionHelper().openSession(); + try { + Criteria criteria = session.createCriteria(TestPlan.class); + for (Criterion criterion : criterions) { + criteria.add(criterion); + } + return criteria.addOrder(order).list(); + } catch (Exception e) { + logger.error(ExceptionLog.getStackTrace(e)); + return new LinkedList(); + } finally { + releaseSession(session); + } + } + public boolean updateEntity(TestPlan testPlanDB) { if (testPlanDB == null) { return false; @@ -145,4 +154,16 @@ public class TestPlanRepository extends AbstractRepositoty { } } + public TestPlan getRunningTestPlanBy(UUID testPlanRunId) { + return this.getRunningTestPlans().get(testPlanRunId); + } + + public void attachRunningTestPlan(TestPlan testPlan) { + this.getRunningTestPlans().put( + UUID.fromString(testPlan.getTestPlanRunId()), testPlan); + } + + public void detachRunningTestPlan(UUID testPlanRunId) { + this.getRunningTestPlans().remove(testPlanRunId); + } } diff --git a/src/main/java/org/bench4q/master/domain/service/TestPlanEngine.java b/src/main/java/org/bench4q/master/domain/service/TestPlanEngine.java index e0273c03..6e50a3e0 100644 --- a/src/main/java/org/bench4q/master/domain/service/TestPlanEngine.java +++ b/src/main/java/org/bench4q/master/domain/service/TestPlanEngine.java @@ -1,11 +1,17 @@ package org.bench4q.master.domain.service; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; +import org.bench4q.master.Main; import org.bench4q.master.domain.entity.TestPlan; import org.bench4q.master.domain.entity.User; import org.bench4q.master.domain.repository.TestPlanRepository; @@ -14,6 +20,9 @@ import org.bench4q.master.testplan.highavailable.HighAvailablePool; import org.bench4q.master.testplan.schedulscript.TaskCompleteCallback; import org.bench4q.share.enums.master.TestPlanStatus; import org.bench4q.share.models.master.TestPlanModel; +import org.hibernate.criterion.Criterion; +import org.hibernate.criterion.Order; +import org.hibernate.criterion.Restrictions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -23,6 +32,8 @@ public class TestPlanEngine implements TaskCompleteCallback, private TestPlanService testPlanService; private HighAvailablePool haPool; private TestPlanRepository testPlanRepository; + private ScheduledExecutorService scheduleExecutor; + public static Logger logger = Logger.getLogger(TestPlanEngine.class); private TestPlanRepository getTestPlanRepository() { @@ -53,6 +64,32 @@ public class TestPlanEngine implements TaskCompleteCallback, this.getHaPool().setObserver(this); } + private ScheduledExecutorService getScheduleExecutor() { + return scheduleExecutor; + } + + private void setScheduleExecutor(ScheduledExecutorService scheduleExecutor) { + this.scheduleExecutor = scheduleExecutor; + } + + public TestPlanEngine() { + init(); + } + + private void init() { + this.setScheduleExecutor(new ScheduledThreadPoolExecutor(1)); + + this.getScheduleExecutor().schedule(new Runnable() { + public void run() { + TestPlan testPlan = pickATestPlan(); + if (testPlan == null) { + return; + } + doRunTestPlan(UUID.fromString(testPlan.getTestPlanRunId())); + } + }, Main.PICK_CYCLE_IN_SECONDS, TimeUnit.SECONDS); + } + public UUID runWith(final TestPlanModel testPlanModel, User user) { ExecutorService executorService = Executors.newCachedThreadPool(); final UUID testPlanId = UUID.randomUUID(); @@ -73,13 +110,6 @@ public class TestPlanEngine implements TaskCompleteCallback, public void doRunTestPlan(final UUID testPlanId) { TestPlan testPlan = getTestPlanRepository().getTestPlanBy(testPlanId); - // if (!testPlan.run()) { - // commitError(testPlan); - // testPlan = null; - // return; - // } - // commitInRunning(testPlan); - // getTestPlanRepository().attachRunningTestPlan(testPlan); TestPlanStatus returnStatus = testPlan.run(); commitTestPlanStaus(returnStatus, testPlan); } @@ -126,4 +156,23 @@ public class TestPlanEngine implements TaskCompleteCallback, } + public TestPlan pickATestPlan() { + List criterions = new ArrayList(); + Criterion notComplete = Restrictions.not(Restrictions.eq( + "currentStatus", TestPlanStatus.Complete.name())); + criterions.add(notComplete); + Criterion lessThanPoolAvailbleLoad = Restrictions.lt("requireLoad", + this.getHaPool().getCurrentAvailableLoad()); + criterions.add(lessThanPoolAvailbleLoad); + Criterion lessThanMaxFailTimes = Restrictions.lt("failTimes", + Main.MAX_FAIL_TIMES); + criterions.add(lessThanMaxFailTimes); + Criterion intervalGTMin_Interval = Restrictions.lt("lastRunningTime", + new Date(System.currentTimeMillis() + - Main.MIN_EXECUTE_INTERVAL_IN_SECONDS * 1000)); + criterions.add(intervalGTMin_Interval); + List testPlans = this.getTestPlanRepository() + .loadTestPlansBy(criterions, Order.desc("createDateTime")); + return testPlans.size() > 0 ? testPlans.get(0) : null; + } } diff --git a/src/main/java/org/bench4q/master/testplan/highavailable/HighAvailablePool.java b/src/main/java/org/bench4q/master/testplan/highavailable/HighAvailablePool.java index 9da2ca8c..20c9d28e 100644 --- a/src/main/java/org/bench4q/master/testplan/highavailable/HighAvailablePool.java +++ b/src/main/java/org/bench4q/master/testplan/highavailable/HighAvailablePool.java @@ -2,6 +2,7 @@ package org.bench4q.master.testplan.highavailable; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -24,6 +25,7 @@ public class HighAvailablePool extends CurrentLoadSubject { private Map pool; private Map agentStatusOfPreviousBeatMap; private Map agentRunBlotters; + private List agentRunIdShouldBeSubstitute; private long maxAvailableLoad; private long currentAvailableLoad; static Logger logger = Logger.getLogger(HighAvailablePool.class); @@ -79,11 +81,21 @@ public class HighAvailablePool extends CurrentLoadSubject { this.agentRunBlotters = agentRunningInfo; } + private List getAgentRunIdShouldBeSubstitute() { + return agentRunIdShouldBeSubstitute; + } + + private void setAgentRunIdShouldBeSubstitute( + List agentRunIdShouldBeSubstitute) { + this.agentRunIdShouldBeSubstitute = agentRunIdShouldBeSubstitute; + } + public HighAvailablePool() { this.setPool(new HashMap()); this.setAgentRunBlotters(new HashMap()); this.setAgentStatusOfPreviousBeatMap(new HashMap()); this.setCurrentAvailableLoad((long) 0); + this.setAgentRunIdShouldBeSubstitute(new LinkedList()); } @Scheduled(cron = "0,30 */1 * * * *") @@ -92,11 +104,24 @@ public class HighAvailablePool extends CurrentLoadSubject { synchronized (this.getPool()) { heartBeatsAndUpdateHAPool(); calculateHAPoolLoadStatusInMonopolize(); + doSubstituteIfRequired(); } } logger.info("Rotation Over"); } + private void doSubstituteIfRequired() { + for (UUID agentRunId : this.getAgentRunIdShouldBeSubstitute()) { + AgentRunBlotter agentRunBlotter = this.getAgentRunBlotters().get( + agentRunId); + if (agentRunBlotter == null) { + // TODO:Actually this is an error + continue; + } + agentRunBlotter.substituteOnBoard(agentRunId); + } + } + private void calculateHAPoolLoadStatusInMonopolize() { int maxLoad = 0, availableLoad = 0; for (Agent agent : this.getPool().values()) { @@ -122,18 +147,18 @@ public class HighAvailablePool extends CurrentLoadSubject { this.setCurrentAvailableLoad((long) 0); this.setMaxAvailableLoad((long) 0); for (Agent agent : this.agentService.loadAgentPoolFromDB()) { - checkHeartBeatAndUpdateMaxLoadAndAvailableLoad(agent); + checkHeartBeat(agent); } } - public void checkHeartBeatAndUpdateMaxLoadAndAvailableLoad(Agent agent) { - this.getPool().put(agent.getHostName(), agent); + public void checkHeartBeat(Agent agent) { ServerStatusModel model = queryAgentStatus(agent); if (model == null) { doForBreakDown(model, agent); - return; + } else { + doForHealth(model, agent); } - doForHealth(model, agent); + this.getPool().put(agent.getHostName(), agent); } public ServerStatusModel queryAgentStatus(Agent agent) { @@ -142,36 +167,24 @@ public class HighAvailablePool extends CurrentLoadSubject { } private void doForBreakDown(ServerStatusModel statusModel, Agent agent) { - agent.setCurrentStatus(AgentService.AGENT_STATUS_BreakDown); - updateAgentStatus(AgentService.AGENT_STATUS_BreakDown, - agent.getHostName()); + updateAgentStatus(AgentService.AGENT_STATUS_BreakDown, agent); List runIDs = queryUnfinishedTest(statusModel, agent.getHostName()); if (runIDs == null) { return; } - for (UUID agentRunId : runIDs) { - AgentRunBlotter agentRunBlotter = this.getAgentRunBlotters().get( - agentRunId); - if (agentRunBlotter == null) { - // TODO:Actually this is an error - continue; - } - agentRunBlotter.substituteOnBoard(agentRunId); - } + // TODO: I should only add a task to taskList here + this.getAgentRunIdShouldBeSubstitute().addAll(runIDs); } - private void updateAgentStatus(int status, String hostName) { - this.pool.get(hostName).setCurrentStatus(status); - this.agentService.updateAgentStatus(status, hostName); + private void updateAgentStatus(int status, Agent agent) { + agent.setCurrentStatus(status); + // TODO: update this all together to db + this.agentService.updateAgentStatus(status, agent.getHostName()); } private void doForHealth(ServerStatusModel newModel, Agent agent) { arrageUnfinishedTest(newModel, agent); - // this.setCurrentAvailableLoad(this.getCurrentAvailableLoad() - // + (long) agent.getRemainLoad()); - // this.setMaxAvailableLoad(this.getMaxAvailableLoad() - // + (long) agent.getMaxLoad()); this.agentStatusOfPreviousBeatMap.put(agent.getHostName(), newModel); } @@ -179,12 +192,11 @@ public class HighAvailablePool extends CurrentLoadSubject { List agentUnfinishedRunIds = queryUnfinishedTest(newModel, agent.getHostName()); if (agentUnfinishedRunIds == null || agentUnfinishedRunIds.size() == 0) { - updateAgentStatus(AgentService.AGENT_STATUS_Idel, - agent.getHostName()); + updateAgentStatus(AgentService.AGENT_STATUS_Idel, agent); this.agentService.resetAgent(agent); return; } - updateAgentStatus(AgentService.AGENT_STATUS_InRun, agent.getHostName()); + updateAgentStatus(AgentService.AGENT_STATUS_InRun, agent); } private List queryUnfinishedTest(ServerStatusModel newModel, diff --git a/src/main/java/org/bench4q/master/testplan/highavailable/faultolerence/BriefAgentFault.java b/src/main/java/org/bench4q/master/testplan/highavailable/faultolerence/BriefAgentFault.java index a577424b..e116dbc3 100644 --- a/src/main/java/org/bench4q/master/testplan/highavailable/faultolerence/BriefAgentFault.java +++ b/src/main/java/org/bench4q/master/testplan/highavailable/faultolerence/BriefAgentFault.java @@ -48,7 +48,7 @@ public class BriefAgentFault implements FaultTolerance { } synchronized (this.getHaPool().getPool()) { - this.getHaPool().checkHeartBeatAndUpdateMaxLoadAndAvailableLoad(this.getAgent()); + this.getHaPool().checkHeartBeat(this.getAgent()); } } diff --git a/src/main/java/org/bench4q/master/transaction/script/ScriptLoadBase.java b/src/main/java/org/bench4q/master/transaction/script/ScriptLoadBase.java index 88a110cb..798769f6 100644 --- a/src/main/java/org/bench4q/master/transaction/script/ScriptLoadBase.java +++ b/src/main/java/org/bench4q/master/transaction/script/ScriptLoadBase.java @@ -142,6 +142,12 @@ public abstract class ScriptLoadBase implements Transaction { RunScenarioResultModel runScenarioResultModel = new RunScenarioResultModel(); int loadForRunCurrent; + if (totalRequireLoad >= this.getHighAvailableAgentPool() + .getCurrentAvailableLoad()) { + logger.info("currentAvailableLoad not enough for substitute"); + return; + } + for (Agent agent : this.getHighAvailableAgentPool().getPool().values()) { if (allocationFinish(totalRequireLoad)) { break; diff --git a/src/main/resources/org/bench4q/master/config/ServerPort.properties b/src/main/resources/org/bench4q/master/config/ServerPort.properties index baa4f984..3729bdf6 100644 --- a/src/main/resources/org/bench4q/master/config/ServerPort.properties +++ b/src/main/resources/org/bench4q/master/config/ServerPort.properties @@ -1,3 +1,4 @@ portToServe=7979 +pickTestPlanCycleInSeconds=60 maxFailTime=10 minExcuteIntervalInSeconds=600 \ No newline at end of file diff --git a/src/test/java/org/bench4q/master/test/domain/Test_HighAvailable.java b/src/test/java/org/bench4q/master/test/domain/Test_HighAvailable.java index b3503afc..954b0324 100644 --- a/src/test/java/org/bench4q/master/test/domain/Test_HighAvailable.java +++ b/src/test/java/org/bench4q/master/test/domain/Test_HighAvailable.java @@ -22,17 +22,18 @@ public class Test_HighAvailable extends TestBase_MakeUpTestPlan { @Before public void prepare() { - submitATestPlanWithOneScript(); - prepareForTestPlanRunning(); + } @After public void cleanUp() { - cleanUpForTestPlanRunning(); + } @Test public void testSubstituteOnBoard() throws InterruptedException { + submitATestPlanWithOneScript(); + prepareForTestPlanRunning(); assertEquals(Long.valueOf(1000), this.getHaPool().getMaxAvailableLoad()); assertEquals(1000, this.getHaPool().getCurrentAvailableLoad()); @@ -71,5 +72,12 @@ public class Test_HighAvailable extends TestBase_MakeUpTestPlan { TestPlanScript extracSpecifiedScript = testPlanAfterSubstitute .extracSpecifiedScript(getScriptId()); assertEquals(2, extracSpecifiedScript.getRunningAgentsDB().size()); + cleanUpForTestPlanRunning(); + } + + @Test + public void testCurrentAvailableLoad() { + this.getHaPool().timerTask(); + assertNotEquals(0, this.getHaPool().getCurrentAvailableLoad()); } } diff --git a/src/test/java/org/bench4q/master/test/service/Test_TestPlanEngine.java b/src/test/java/org/bench4q/master/test/service/Test_TestPlanEngine.java index 667842e1..cbf42d58 100644 --- a/src/test/java/org/bench4q/master/test/service/Test_TestPlanEngine.java +++ b/src/test/java/org/bench4q/master/test/service/Test_TestPlanEngine.java @@ -2,6 +2,7 @@ package org.bench4q.master.test.service; import java.util.Date; +import org.bench4q.master.Main; import org.bench4q.master.domain.entity.TestPlan; import org.bench4q.master.domain.entity.User; import org.bench4q.master.domain.service.TestPlanEngine; @@ -86,6 +87,22 @@ public class Test_TestPlanEngine extends TestBase_MakeUpTestPlan { deleteTestPlan(); } + @Test + public void testPickATestPlan() { + this.getHaPool().timerTask(); + assertNotEquals(0, this.getHaPool().getCurrentAvailableLoad()); + System.out.println(this.getHaPool().getCurrentAvailableLoad()); + TestPlan testPlanPicked = this.getTestPlanEngine().pickATestPlan(); + assertNotNull(testPlanPicked); + assertTrue(testPlanPicked.getRequiredLoad() <= this.getHaPool() + .getCurrentAvailableLoad()); + assertTrue(testPlanPicked.getFailTimes() <= Main.MAX_FAIL_TIMES); + assertTrue(System.currentTimeMillis() + - testPlanPicked.getLastRunningTime().getTime() > Main.MIN_EXECUTE_INTERVAL_IN_SECONDS * 1000); + assertNotEquals(TestPlanStatus.Complete, + TestPlanStatus.valueOf(testPlanPicked.getCurrentStatus())); + } + private void testForStatus(TestPlanStatus status) throws Exception { this.submitATestPlanWithOneScript(); TestPlan testPlan = this.getTestPlanRepository().getTestPlanBy(