add the HightAvailable model, and contact with agent by heart beats, and
at the same time, update the HAAgentPool, and the agents's remainLoad
This commit is contained in:
parent
04f060f936
commit
f1a1339418
|
@ -1,6 +1,5 @@
|
|||
package org.bench4q.master.api;
|
||||
|
||||
import java.util.Iterator;
|
||||
import org.bench4q.master.api.model.AgentResponseModel;
|
||||
import org.bench4q.master.communication.AgentStateService;
|
||||
import org.bench4q.master.entity.db.Agent;
|
||||
|
@ -80,24 +79,6 @@ public class AgentController extends BaseController {
|
|||
return _setAgentResponseModel(true, "");
|
||||
}
|
||||
|
||||
public int getLivingAgentCount() {
|
||||
int livingCount = 0;
|
||||
|
||||
if (this.getAgentPoolService().loadAgentPoolFromDB().isEmpty()) {
|
||||
return livingCount;
|
||||
}
|
||||
|
||||
Iterator<Agent> iterator = this.getAgentPoolService()
|
||||
.loadAgentPoolFromDB().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Agent agent = iterator.next();
|
||||
System.out.println(this.getAgentStateService().askLiving(
|
||||
agent.getHostName(), agent.getPort()));
|
||||
livingCount++;
|
||||
}
|
||||
return livingCount;
|
||||
}
|
||||
|
||||
private AgentResponseModel _setAgentResponseModel(boolean isSuccess,
|
||||
String failCauseString) {
|
||||
AgentResponseModel agentResponseModel = new AgentResponseModel();
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
package org.bench4q.master.communication;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.xml.bind.JAXBContext;
|
||||
import javax.xml.bind.JAXBException;
|
||||
import javax.xml.bind.Unmarshaller;
|
||||
|
||||
import org.bench4q.master.communication.HttpRequester.HttpResponse;
|
||||
import org.bench4q.master.communication.agent.ServerStatusModel;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
@ -19,17 +25,31 @@ public class AgentStateService {
|
|||
this.httpRequester = httpRequester;
|
||||
}
|
||||
|
||||
public boolean askLiving(String hostName, int port) {
|
||||
public ServerStatusModel askLiving(String hostName, int port) {
|
||||
try {
|
||||
HttpResponse httpResponse = this.getHttpRequester().sendGet(
|
||||
hostName + ":" + port + "/", null, null);
|
||||
if (httpResponse == null || httpResponse.getCode() == 404) {
|
||||
return false;
|
||||
if (httpResponse == null) {
|
||||
return null;
|
||||
}
|
||||
return true;
|
||||
return _extractServerStatusModel(httpResponse.getContent());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
return null;
|
||||
} catch (JAXBException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private ServerStatusModel _extractServerStatusModel(String content)
|
||||
throws JAXBException {
|
||||
ServerStatusModel resultModel = new ServerStatusModel();
|
||||
Unmarshaller unmarshaller = JAXBContext.newInstance(
|
||||
resultModel.getClass()).createUnmarshaller();
|
||||
resultModel = (ServerStatusModel) unmarshaller
|
||||
.unmarshal(new ByteArrayInputStream(content.getBytes()));
|
||||
return resultModel;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package org.bench4q.master.communication.agent;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlElementWrapper;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
@XmlRootElement(name = "serverStatus")
|
||||
public class ServerStatusModel {
|
||||
private List<UUID> runningTests;
|
||||
private List<UUID> finishedTests;
|
||||
|
||||
@XmlElementWrapper(name = "runningTests")
|
||||
@XmlElement(name = "runningTest")
|
||||
public List<UUID> getRunningTests() {
|
||||
return runningTests;
|
||||
}
|
||||
|
||||
public void setRunningTests(List<UUID> runningTests) {
|
||||
this.runningTests = runningTests;
|
||||
}
|
||||
|
||||
@XmlElementWrapper(name = "finishedTests")
|
||||
@XmlElement(name = "finishedTest")
|
||||
public List<UUID> getFinishedTests() {
|
||||
return finishedTests;
|
||||
}
|
||||
|
||||
public void setFinishedTests(List<UUID> finishedTests) {
|
||||
this.finishedTests = finishedTests;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,6 +1,5 @@
|
|||
package org.bench4q.master.service;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import org.bench4q.master.entity.db.Agent;
|
||||
import org.bench4q.master.helper.SessionHelper;
|
||||
|
@ -92,7 +91,6 @@ public class AgentService {
|
|||
}
|
||||
|
||||
public boolean recomputeRemainLoadOfAgent(Agent agent) {
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -168,30 +166,26 @@ public class AgentService {
|
|||
}
|
||||
}
|
||||
|
||||
public void resetAgents() {
|
||||
public boolean resetAgent(Agent agentInParam) {
|
||||
Session session = this.getSessionHelper().openSession();
|
||||
Transaction transaction = sessionHelper.openSession()
|
||||
.beginTransaction();
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Agent> agents = session.createCriteria(Agent.class).list();
|
||||
if (agents.size() == 0) {
|
||||
return;
|
||||
}
|
||||
Iterator<Agent> iterator = agents.iterator();
|
||||
Agent agent;
|
||||
while (iterator.hasNext()) {
|
||||
agent = iterator.next();
|
||||
agent.setRemainLoad(agent.getMaxLoad());
|
||||
Agent agentFromDB = (Agent) session
|
||||
.createCriteria(Agent.class)
|
||||
.add(Restrictions.eq("hostName", agentInParam.getHostName()))
|
||||
.uniqueResult();
|
||||
if (agentFromDB == null) {
|
||||
return false;
|
||||
}
|
||||
agentFromDB.setRemainLoad(agentFromDB.getMaxLoad());
|
||||
transaction.commit();
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
transaction.rollback();
|
||||
return false;
|
||||
} finally {
|
||||
if (session != null) {
|
||||
session.close();
|
||||
}
|
||||
session.close();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package org.bench4q.master.testPlan;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.bench4q.master.communication.AgentStateService;
|
||||
import org.bench4q.master.entity.db.Agent;
|
||||
|
@ -15,40 +16,74 @@ public class HighAvailableAgentPool {
|
|||
private List<Agent> highAvailablePool;
|
||||
private AgentService agentService;
|
||||
private AgentStateService agentStateService;
|
||||
|
||||
public List<Agent> getHighAvailablePool() {
|
||||
return highAvailablePool;
|
||||
}
|
||||
private TaskQueueContainer taskQueueContainer;
|
||||
|
||||
public void setHighAvailablePool(List<Agent> highAvailablePool) {
|
||||
this.highAvailablePool = highAvailablePool;
|
||||
}
|
||||
|
||||
public AgentService getAgentService() {
|
||||
return agentService;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setAgentService(AgentService agentService) {
|
||||
this.agentService = agentService;
|
||||
}
|
||||
|
||||
public AgentStateService getAgentStateService() {
|
||||
return agentStateService;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setAgentStateService(AgentStateService agentStateService) {
|
||||
this.agentStateService = agentStateService;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setTaskQueueContainer(TaskQueueContainer taskQueueContainer) {
|
||||
this.taskQueueContainer = taskQueueContainer;
|
||||
}
|
||||
|
||||
public HighAvailableAgentPool() {
|
||||
this.setHighAvailablePool(new ArrayList<Agent>());
|
||||
}
|
||||
|
||||
@Scheduled(cron = "*/5 * * * * *")
|
||||
@Scheduled(cron = "* */1 * * * *")
|
||||
public void timerTask() {
|
||||
_heartBeatsAndUpdateHAPool();
|
||||
System.out.println("hah");
|
||||
}
|
||||
|
||||
private void _heartBeatsAndUpdateHAPool() {
|
||||
this._clearHAPool();
|
||||
List<Agent> agents = this.agentService.loadAgentPoolFromDB();
|
||||
for (int i = 0; i < agents.size(); i++) {
|
||||
if (!_isAlive(agents.get(i))) {
|
||||
_doForBreakDown(agents.get(i));
|
||||
continue;
|
||||
}
|
||||
_arrangeRemainLoad(agents.get(i));
|
||||
this.highAvailablePool.add(agents.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
public boolean _isAlive(Agent agent) {
|
||||
return this.agentStateService.askLiving(agent.getHostName(),
|
||||
agent.getPort()) != null;
|
||||
}
|
||||
|
||||
private void _clearHAPool() {
|
||||
synchronized (this.highAvailablePool) {
|
||||
while (this.highAvailablePool.size() > 0) {
|
||||
this.highAvailablePool
|
||||
.remove(this.highAvailablePool.size() - 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void _doForBreakDown(Agent agent) {
|
||||
// TODO:should add some handle to deal with the break down.
|
||||
// at first, we should find its running task, and do it on its
|
||||
// secondary node
|
||||
}
|
||||
|
||||
private void _arrangeRemainLoad(Agent agent) {
|
||||
// TODO: should find from the taskQueue to compute the
|
||||
// really remainLoad of this agent
|
||||
UUID runId = UUID.randomUUID();
|
||||
this.taskQueueContainer.queryHostNameAndLoadFromTestPlanContext(runId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@ import javax.xml.bind.Unmarshaller;
|
|||
|
||||
import org.bench4q.master.api.model.RunningScriptModel;
|
||||
import org.bench4q.master.api.model.TestPlanModel;
|
||||
import org.bench4q.master.communication.AgentStateService;
|
||||
import org.bench4q.master.communication.HttpRequester;
|
||||
import org.bench4q.master.communication.HttpRequester.HttpResponse;
|
||||
import org.bench4q.master.communication.agent.RunScenarioModel;
|
||||
|
@ -31,10 +30,10 @@ public class LoadBallancer {
|
|||
|
||||
private ScriptService scriptService;
|
||||
private AgentService agentService;
|
||||
private AgentStateService agentStateService;
|
||||
private HttpRequester httpRequester;
|
||||
private HighAvailableAgentPool highAvailableAgentPool;
|
||||
private TaskQueueContainer taskQueueContainer;
|
||||
private boolean monopolizeByUserSet = true;
|
||||
private TestPlanRunner testPlanRunner;
|
||||
|
||||
@Autowired
|
||||
public void setScriptService(ScriptService scriptService) {
|
||||
|
@ -58,27 +57,20 @@ public class LoadBallancer {
|
|||
this.agentService = agentService;
|
||||
}
|
||||
|
||||
public AgentStateService getAgentStateService() {
|
||||
return agentStateService;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setAgentStateService(AgentStateService agentStateService) {
|
||||
this.agentStateService = agentStateService;
|
||||
}
|
||||
|
||||
private HttpRequester getHttpRequester() {
|
||||
return httpRequester;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setHttpRequester(HttpRequester httpRequester) {
|
||||
this.httpRequester = httpRequester;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setTestPlanRunner(TestPlanRunner testPlanRunner) {
|
||||
this.testPlanRunner = testPlanRunner;
|
||||
public void setHighAvailableAgentPool(
|
||||
HighAvailableAgentPool highAvailableAgentPool) {
|
||||
this.highAvailableAgentPool = highAvailableAgentPool;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setTaskQueueContainer(TaskQueueContainer taskQueueContainer) {
|
||||
this.taskQueueContainer = taskQueueContainer;
|
||||
}
|
||||
|
||||
public List<RunningScriptModel> generateLoadForTestPlan(
|
||||
|
@ -126,7 +118,7 @@ public class LoadBallancer {
|
|||
for (iterator = this.getAgentService().loadAgentPoolFromDB().iterator(); iterator
|
||||
.hasNext() && requireLoad > 0;) {
|
||||
agent = iterator.next();
|
||||
if (!_isAlive(agent)) {
|
||||
if (!this.highAvailableAgentPool._isAlive(agent)) {
|
||||
continue;
|
||||
}
|
||||
if (_isMonopolized(agent)) {
|
||||
|
@ -154,16 +146,11 @@ public class LoadBallancer {
|
|||
|
||||
private void _addToRunIdHostLoadMap(UUID testPlanId, UUID agentRunId,
|
||||
HostNameAndLoad hostNameAndLoad) {
|
||||
TestPlanContext testPlanContext = this.testPlanRunner.get_tasks().get(
|
||||
testPlanId);
|
||||
TestPlanContext testPlanContext = this.taskQueueContainer
|
||||
.queryTestPlanContext(testPlanId);
|
||||
testPlanContext.addToRunIdHostLoadMap(agentRunId, hostNameAndLoad);
|
||||
}
|
||||
|
||||
private boolean _isAlive(Agent agent) {
|
||||
return this.getAgentStateService().askLiving(agent.getHostName(),
|
||||
agent.getPort());
|
||||
}
|
||||
|
||||
private boolean _isMonopolized(Agent agent) {
|
||||
return agent.getRemainLoad() < agent.getMaxLoad()
|
||||
&& this.isMonopolizeByUserSet();
|
||||
|
@ -196,7 +183,7 @@ public class LoadBallancer {
|
|||
String hostNameString, int port, String scriptContent) {
|
||||
RunScenarioResultModel runScenarioResultModel = new RunScenarioResultModel();
|
||||
try {
|
||||
HttpResponse httpResponse = this.getHttpRequester().sendPostXml(
|
||||
HttpResponse httpResponse = this.httpRequester.sendPostXml(
|
||||
hostNameString + ":" + port + "/test/run", scriptContent,
|
||||
null);
|
||||
runScenarioResultModel = _extractRunSenarioResultModel(httpResponse
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package org.bench4q.master.testPlan;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class TaskQueueContainer {
|
||||
private Map<UUID, TestPlanContext> taskQueue = new HashMap<UUID, TestPlanContext>();
|
||||
|
||||
public Map<UUID, TestPlanContext> getTaskQueue() {
|
||||
return taskQueue;
|
||||
}
|
||||
|
||||
public void setTaskQueue(Map<UUID, TestPlanContext> taskQueue) {
|
||||
this.taskQueue = taskQueue;
|
||||
}
|
||||
|
||||
public TestPlanContext queryTestPlanContext(UUID testPlanId) {
|
||||
return this.taskQueue.get(testPlanId);
|
||||
}
|
||||
|
||||
public TestPlanContext addATask(UUID testPlanId,
|
||||
TestPlanContext testPlanContext) {
|
||||
return this.taskQueue.put(testPlanId, testPlanContext);
|
||||
}
|
||||
|
||||
public HostNameAndLoad queryHostNameAndLoadFromTestPlanContext(
|
||||
UUID agentRunId) {
|
||||
// TODO:get the HostNameAndLoad from taskQueue by agentRunId
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -3,7 +3,6 @@ package org.bench4q.master.testPlan;
|
|||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
@ -35,7 +34,7 @@ public class TestPlanRunner {
|
|||
private HttpRequester httpRequester;
|
||||
private LoadBallancer loadBallancer;
|
||||
private static final int PORT = 6565;
|
||||
private Map<UUID, TestPlanContext> _tasks = new HashMap<UUID, TestPlanContext>();
|
||||
private TaskQueueContainer taskQueueContainer;
|
||||
private final int POOL_SIZE = 100;
|
||||
|
||||
@Autowired
|
||||
|
@ -57,8 +56,13 @@ public class TestPlanRunner {
|
|||
this.loadBallancer = loadBallancer;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setTaskQueueContainer(TaskQueueContainer taskQueueContainer) {
|
||||
this.taskQueueContainer = taskQueueContainer;
|
||||
}
|
||||
|
||||
public Map<UUID, TestPlanContext> get_tasks() {
|
||||
return _tasks;
|
||||
return this.taskQueueContainer.getTaskQueue();
|
||||
}
|
||||
|
||||
public AgentBriefModel getBriefStatusFromAgent(UUID testPlanId, UUID runId) {
|
||||
|
@ -77,7 +81,7 @@ public class TestPlanRunner {
|
|||
briefStatusModel = this._extractTestBriefStatusModel(httpResponse
|
||||
.getContent());
|
||||
|
||||
result = this.makeAgentBriefModel(briefStatusModel);
|
||||
result = this._makeAgentBriefModel(briefStatusModel);
|
||||
|
||||
if (result.isFinish()) {
|
||||
this.agentService.backLoadToAgent(
|
||||
|
@ -95,7 +99,8 @@ public class TestPlanRunner {
|
|||
}
|
||||
|
||||
private HostNameAndLoad _queryHostNameAndLoad(UUID testPlanId, UUID runId) {
|
||||
TestPlanContext testPlanContext = this._tasks.get(testPlanId);
|
||||
TestPlanContext testPlanContext = this.taskQueueContainer
|
||||
.queryTestPlanContext(testPlanId);
|
||||
if (testPlanContext == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -114,7 +119,7 @@ public class TestPlanRunner {
|
|||
return resultModel;
|
||||
}
|
||||
|
||||
private AgentBriefModel makeAgentBriefModel(
|
||||
private AgentBriefModel _makeAgentBriefModel(
|
||||
TestBriefStatusModel briefStatusModel) {
|
||||
AgentBriefModel result = new AgentBriefModel();
|
||||
result.setTestStatusModel(briefStatusModel);
|
||||
|
@ -134,7 +139,7 @@ public class TestPlanRunner {
|
|||
testPlanContext.getResultModel().setRunningScriptModels(
|
||||
new ArrayList<RunningScriptModel>());
|
||||
|
||||
this._tasks.put(testPlanId, testPlanContext);
|
||||
this.taskQueueContainer.addATask(testPlanId, testPlanContext);
|
||||
Runnable runnable = new Runnable() {
|
||||
public void run() {
|
||||
testPlanContext.addRunningScriptModelsToResult((_doRunTestPlan(
|
||||
|
@ -150,15 +155,6 @@ public class TestPlanRunner {
|
|||
private List<RunningScriptModel> _doRunTestPlan(
|
||||
TestPlanModel testPlanModel, UUID testPlanId) {
|
||||
List<RunningScriptModel> result = new ArrayList<RunningScriptModel>();
|
||||
|
||||
/*
|
||||
* Iterator<RunningScriptModel> iterator = testPlanModel
|
||||
* .getRunnningScriptModels().iterator(); while (iterator.hasNext()) {
|
||||
* RunningScriptModel sModel = iterator.next();
|
||||
* result.add(this._runTestPlanWithScriptId(sModel.getScriptId(),
|
||||
* sModel.getRequireLoad(), testPlanId)); }
|
||||
*/
|
||||
|
||||
result = this.loadBallancer.generateLoadForTestPlan(testPlanModel,
|
||||
testPlanId);
|
||||
return result;
|
||||
|
|
Loading…
Reference in New Issue