change high avilable pool heartbeat
This commit is contained in:
parent
e8700549c4
commit
e5e0d86cab
|
@ -192,6 +192,7 @@ public class TestPlan implements IAggregate {
|
|||
return null;
|
||||
}
|
||||
|
||||
|
||||
public void run() {
|
||||
Logger.getLogger(TestPlan.class).info(
|
||||
this.getTestPlanRunId() + " start to run ");
|
||||
|
@ -256,10 +257,6 @@ public class TestPlan implements IAggregate {
|
|||
scheScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
|
||||
public void run() {
|
||||
if (isFinish()) {
|
||||
getTestResultSave().update(TestPlan.this,
|
||||
createMonitorFinishedResult());
|
||||
getTestResultSave().update(TestPlan.this,
|
||||
createScriptFinishedResult());
|
||||
scheScheduledExecutorService.shutdown();
|
||||
return;
|
||||
}
|
||||
|
@ -275,34 +272,6 @@ public class TestPlan implements IAggregate {
|
|||
|
||||
|
||||
|
||||
private List<Object> createMonitorFinishedResult() {
|
||||
List<Object> resultList = new LinkedList<Object>();
|
||||
if (getMonitors() != null) {
|
||||
for (Monitor monitor : getMonitors()) {
|
||||
List<MonitorResult> monitorResults = monitor
|
||||
.createFinishedResult();
|
||||
if (monitorResults != null) {
|
||||
resultList.addAll(monitorResults);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
return resultList;
|
||||
}
|
||||
|
||||
private List<Object> createScriptFinishedResult() {
|
||||
List<Object> resultList = new LinkedList<Object>();
|
||||
if (getTestPlanScripts() == null) {
|
||||
Logger.getLogger(TestPlan.class).error(
|
||||
"no scripts in testPlan:" + testPlanRunId);
|
||||
|
||||
}
|
||||
for (TestPlanScript testPlanScript : this.getTestPlanScripts()) {
|
||||
resultList.addAll(testPlanScript.createFinishedResult());
|
||||
}
|
||||
return resultList;
|
||||
|
||||
}
|
||||
|
||||
private List<Object> collectResult() {
|
||||
List<Object> resultList = new LinkedList<Object>();
|
||||
|
|
|
@ -18,6 +18,8 @@ import org.bench4q.share.models.master.statistics.ScriptBriefResultModel;
|
|||
@Entity
|
||||
@Table(name = "TestPlanScriptResult")
|
||||
public class TestPlanScriptResult {
|
||||
// private int testPlanId;
|
||||
// private int scriptId,
|
||||
private int id;
|
||||
private TestPlanScript testPlanScript;
|
||||
private String resultContent;
|
||||
|
|
|
@ -49,7 +49,7 @@ public class AgentService {
|
|||
}
|
||||
|
||||
public boolean addAgentToPool(Agent agentWithoutId) {
|
||||
synchronized (this.getAgentRepository().getAddDeleteLock()) {
|
||||
/* synchronized (this.getAgentRepository().getAddDeleteLock()) {*/
|
||||
if (!this.getAgentRepository().attach(
|
||||
Agent.createAgentWithoutId(agentWithoutId))) {
|
||||
return false;
|
||||
|
@ -61,16 +61,17 @@ public class AgentService {
|
|||
}
|
||||
this.getHighAvailablePool().add(agent);
|
||||
return true;
|
||||
}
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
public boolean removeAgentFromPool(int agentId) {
|
||||
// TODO: check if the agent is idle
|
||||
synchronized (this.getAgentRepository().getAddDeleteLock()) {
|
||||
// synchronized (this.getAgentRepository().getAddDeleteLock()) {
|
||||
this.getHighAvailablePool().remove(
|
||||
getAgentRepository().getEntity(agentId));
|
||||
return this.getAgentRepository().detach(agentId);
|
||||
}
|
||||
// }
|
||||
}
|
||||
|
||||
public List<Agent> loadAgentPoolFromDB() {
|
||||
|
|
|
@ -1,324 +1,374 @@
|
|||
package org.bench4q.master.infrastructure.highavailable.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.bench4q.master.domain.RunningAgentInterface;
|
||||
import org.bench4q.master.domain.RunningScriptInterface;
|
||||
import org.bench4q.master.domain.entity.Agent;
|
||||
import org.bench4q.master.domain.factory.RunningAgentFactory;
|
||||
import org.bench4q.master.domain.repository.AgentRepository;
|
||||
import org.bench4q.master.infrastructure.communication.AgentMessenger;
|
||||
import org.bench4q.master.infrastructure.highavailable.CurrentLoadSubject;
|
||||
import org.bench4q.master.infrastructure.highavailable.HighAvailablePool;
|
||||
import org.bench4q.share.enums.master.AgentStatus;
|
||||
import org.bench4q.share.models.agent.RunScenarioResultModel;
|
||||
import org.bench4q.share.models.agent.ServerStatusModel;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class HighAvailablePoolImpl extends CurrentLoadSubject implements
|
||||
HighAvailablePool {
|
||||
private AgentRepository agentRepository;
|
||||
private AgentMessenger agentMessenger;
|
||||
private RunningAgentFactory runningAgentFactory;
|
||||
private Map<String, Agent> pool;
|
||||
private Map<String, ServerStatusModel> agentStatusOfPreviousBeatMap;
|
||||
private Map<UUID, RunningAgentInterface> agentRunBlotters;
|
||||
private List<UUID> agentRunIdShouldBeSubstitute;
|
||||
private long maxAvailableLoad;
|
||||
private int currentAvailableLoad;
|
||||
static Logger logger = Logger.getLogger(HighAvailablePoolImpl.class);
|
||||
|
||||
public Map<String, Agent> getPool() {
|
||||
return this.pool;
|
||||
}
|
||||
|
||||
private void setPool(Map<String, Agent> pool) {
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
private AgentRepository getAgentRepository() {
|
||||
return agentRepository;
|
||||
}
|
||||
|
||||
private void setAgentRepository(AgentRepository agentRepository) {
|
||||
this.agentRepository = agentRepository;
|
||||
}
|
||||
|
||||
private AgentMessenger getAgentMessenger() {
|
||||
return agentMessenger;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private void setAgentMessenger(AgentMessenger agentMessenger) {
|
||||
this.agentMessenger = agentMessenger;
|
||||
}
|
||||
|
||||
private RunningAgentFactory getRunningAgentFactory() {
|
||||
return runningAgentFactory;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private void setRunningAgentFactory(RunningAgentFactory runningAgentFactory) {
|
||||
this.runningAgentFactory = runningAgentFactory;
|
||||
}
|
||||
|
||||
public Long getMaxAvailableLoad() {
|
||||
this.calculateHAPoolLoadStatusInMonopolize();
|
||||
return maxAvailableLoad;
|
||||
}
|
||||
|
||||
private void setMaxAvailableLoad(Long maxAvailableLoad) {
|
||||
this.maxAvailableLoad = maxAvailableLoad;
|
||||
}
|
||||
|
||||
public synchronized void add(Agent agent) {
|
||||
this.getPool().put(agent.getHostName(), agent);
|
||||
}
|
||||
|
||||
public synchronized void remove(Agent agent) {
|
||||
this.getPool().put(agent.getHostName(), agent);
|
||||
}
|
||||
|
||||
public int getCurrentAvailableLoad() {
|
||||
this.calculateHAPoolLoadStatusInMonopolize();
|
||||
return currentAvailableLoad;
|
||||
}
|
||||
|
||||
private void setCurrentAvailableLoad(int currentAvailableLoad) {
|
||||
this.currentAvailableLoad = currentAvailableLoad;
|
||||
}
|
||||
|
||||
private void setAgentStatusOfPreviousBeatMap(
|
||||
Map<String, ServerStatusModel> agentStatusOfPreviousBeatMap) {
|
||||
this.agentStatusOfPreviousBeatMap = agentStatusOfPreviousBeatMap;
|
||||
}
|
||||
|
||||
private List<UUID> getAgentRunIdShouldBeSubstitute() {
|
||||
return agentRunIdShouldBeSubstitute;
|
||||
}
|
||||
|
||||
public Map<UUID, RunningAgentInterface> getAgentRunBlotters() {
|
||||
return agentRunBlotters;
|
||||
}
|
||||
|
||||
private void setAgentRunBlotters(
|
||||
Map<UUID, RunningAgentInterface> agentRunBlotters) {
|
||||
this.agentRunBlotters = agentRunBlotters;
|
||||
}
|
||||
|
||||
private void setAgentRunIdShouldBeSubstitute(
|
||||
List<UUID> agentRunIdShouldBeSubstitute) {
|
||||
this.agentRunIdShouldBeSubstitute = agentRunIdShouldBeSubstitute;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public HighAvailablePoolImpl(AgentRepository agentRepository) {
|
||||
this.setPool(new HashMap<String, Agent>());
|
||||
this.setAgentRunBlotters(new HashMap<UUID, RunningAgentInterface>());
|
||||
this.setAgentStatusOfPreviousBeatMap(new HashMap<String, ServerStatusModel>());
|
||||
this.setCurrentAvailableLoad(0);
|
||||
this.setAgentRunIdShouldBeSubstitute(new LinkedList<UUID>());
|
||||
this.setAgentRepository(agentRepository);
|
||||
initialPool();
|
||||
}
|
||||
|
||||
private void initialPool() {
|
||||
for (Agent agent : this.getAgentRepository().loadEntities()) {
|
||||
this.add(agent);
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0,30 */1 * * * *")
|
||||
public synchronized void checkAllHeartBeat() {
|
||||
synchronized (this.getAgentRepository().getAddDeleteLock()) {
|
||||
heartBeatsAndUpdateHAPool();
|
||||
doSubstituteIfRequired();
|
||||
}
|
||||
}
|
||||
|
||||
private void doSubstituteIfRequired() {
|
||||
for (UUID agentRunId : this.getAgentRunIdShouldBeSubstitute()) {
|
||||
RunningAgentInterface runningAgent = this.getAgentRunBlotters()
|
||||
.get(agentRunId);
|
||||
if (runningAgent == null) {
|
||||
// TODO:Actually this is an error
|
||||
continue;
|
||||
}
|
||||
runningAgent.substituteOnBoard();
|
||||
this.getAgentRunIdShouldBeSubstitute().remove(agentRunId);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void calculateHAPoolLoadStatusInMonopolize() {
|
||||
int maxLoad = 0, availableLoad = 0;
|
||||
for (Agent agent : this.getPool().values()) {
|
||||
if (agent.getCurrentEnumStatus() == AgentStatus.BreakDown) {
|
||||
continue;
|
||||
} else if (agent.getCurrentEnumStatus() == AgentStatus.InIdle) {
|
||||
maxLoad += agent.getMaxLoad();
|
||||
availableLoad += agent.getRemainLoad();
|
||||
} else if (agent.getCurrentEnumStatus() == AgentStatus.InRunning) {
|
||||
maxLoad += agent.getMaxLoad();
|
||||
}
|
||||
}
|
||||
this.setMaxAvailableLoad((long) maxLoad);
|
||||
this.setCurrentAvailableLoad(availableLoad);
|
||||
}
|
||||
|
||||
private void heartBeatsAndUpdateHAPool() {
|
||||
this.setCurrentAvailableLoad(0);
|
||||
this.setMaxAvailableLoad((long) 0);
|
||||
for (Agent agent : this.getPool().values()) {
|
||||
checkHeartBeat(agent);
|
||||
}
|
||||
this.getAgentRepository().Update(this.getPool().values());
|
||||
}
|
||||
|
||||
public void checkHeartBeat(Agent agent) {
|
||||
ServerStatusModel model = queryAgentStatus(agent);
|
||||
if (model == null) {
|
||||
doForBreakDown(agent);
|
||||
} else {
|
||||
doForHealth(model, agent);
|
||||
}
|
||||
}
|
||||
|
||||
public ServerStatusModel queryAgentStatus(Agent agent) {
|
||||
return this.getAgentMessenger().getStatus(agent);
|
||||
}
|
||||
|
||||
public void addABlotter(RunningAgentInterface runningAgent) {
|
||||
this.getAgentRunBlotters().put(runningAgent.getAgentRunId(),
|
||||
runningAgent);
|
||||
}
|
||||
|
||||
public void removeABlotter(UUID agentRunId) {
|
||||
this.getAgentRunBlotters().remove(agentRunId);
|
||||
}
|
||||
|
||||
public int blotterSize() {
|
||||
return this.getAgentRunBlotters().size();
|
||||
}
|
||||
|
||||
private void doForBreakDown(Agent agent) {
|
||||
agent.setCurrentEnumStatus(AgentStatus.BreakDown);
|
||||
List<UUID> runIDs = queryUnfinishedTest(null, agent.getHostName());
|
||||
if (runIDs == null) {
|
||||
return;
|
||||
}
|
||||
this.getAgentRunIdShouldBeSubstitute().addAll(runIDs);
|
||||
}
|
||||
|
||||
private void doForHealth(ServerStatusModel newModel, Agent agent) {
|
||||
List<UUID> agentUnfinishedRunIds = newModel.getRunningTests();
|
||||
if (agentUnfinishedRunIds == null || agentUnfinishedRunIds.size() == 0) {
|
||||
doForInIdle(agent);
|
||||
return;
|
||||
}
|
||||
doForInRunning(agent, newModel);
|
||||
}
|
||||
|
||||
private void doForInRunning(Agent agent, ServerStatusModel newModel) {
|
||||
agent.updateStatus(AgentStatus.InRunning);
|
||||
this.agentStatusOfPreviousBeatMap.put(agent.getHostName(), newModel);
|
||||
}
|
||||
|
||||
private void doForInIdle(Agent agent) {
|
||||
agent.updateStatus(AgentStatus.InIdle);
|
||||
agent.resetLoad();
|
||||
}
|
||||
|
||||
private List<UUID> queryUnfinishedTest(ServerStatusModel newModel,
|
||||
String hostName) {
|
||||
if (newModel == null) {
|
||||
ServerStatusModel model = this.agentStatusOfPreviousBeatMap
|
||||
.get(hostName);
|
||||
return model == null ? null : model.getRunningTests();
|
||||
}
|
||||
return newModel.getRunningTests();
|
||||
}
|
||||
|
||||
public List<RunningAgentInterface> applyFor(int load) {
|
||||
List<RunningAgentInterface> result = new ArrayList<RunningAgentInterface>();
|
||||
int loadForRunCurrent = -1;
|
||||
RunScenarioResultModel runScenarioResultModel = null;
|
||||
if (load >= this.getCurrentAvailableLoad()) {
|
||||
logger.info("currentAvailableLoad not enough for substitute");
|
||||
return null;
|
||||
}
|
||||
for (Agent agent : this.getPool().values()) {
|
||||
if (allocationFinish(load)) {
|
||||
break;
|
||||
}
|
||||
if (this.queryAgentStatus(agent) == null || agent.isInUse()) {
|
||||
continue;
|
||||
}
|
||||
loadForRunCurrent = getMin(load, agent.getRemainLoad());
|
||||
agent.bookTest(loadForRunCurrent);
|
||||
this.getAgentRepository().update(agent);
|
||||
runScenarioResultModel = this.getAgentMessenger().bookTest(agent,
|
||||
loadForRunCurrent);
|
||||
if (runScenarioResultModel == null
|
||||
|| runScenarioResultModel.getRunId() == null) {
|
||||
logger.error(runScenarioResultModel == null ? "runScenarioResultModel is null"
|
||||
: "runScenarioResultModel.getRunId()"
|
||||
+ runScenarioResultModel.getRunId());
|
||||
continue;
|
||||
}
|
||||
load -= loadForRunCurrent;
|
||||
RunningAgentInterface runningAgentWithouRunningScript = this
|
||||
.getRunningAgentFactory()
|
||||
.buildRunningAgentWithOutIdAndRunningScript(agent,
|
||||
loadForRunCurrent,
|
||||
runScenarioResultModel.getRunId());
|
||||
result.add(runningAgentWithouRunningScript);
|
||||
this.addABlotter(runningAgentWithouRunningScript);
|
||||
}
|
||||
if (!allocationFinish(load)) {
|
||||
logger.error("allocationUnfinished, remain " + load
|
||||
+ " unallocated");
|
||||
rollbackForApply(result);
|
||||
return null;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void rollbackForApply(List<RunningAgentInterface> runningAgents) {
|
||||
for (RunningAgentInterface runningAgent : runningAgents) {
|
||||
runningAgent.stop();
|
||||
this.removeABlotter(runningAgent.getAgentRunId());
|
||||
}
|
||||
}
|
||||
|
||||
private int getMin(int load, int remainLoad) {
|
||||
return load >= remainLoad ? remainLoad : load;
|
||||
}
|
||||
|
||||
private boolean allocationFinish(int load) {
|
||||
return load == 0;
|
||||
}
|
||||
|
||||
public void cleanUpAboutTestPlan(
|
||||
final Collection<? extends RunningScriptInterface> runningScripts) {
|
||||
|
||||
for (RunningScriptInterface runningScript : runningScripts) {
|
||||
for (RunningAgentInterface runningAgent : runningScript
|
||||
.getRunningAgents()) {
|
||||
this.getAgentRunBlotters().remove(runningAgent.getAgentRunId());
|
||||
logger.info("see the agentBlotter after run : "
|
||||
+ this.getAgentRunBlotters().get(
|
||||
runningAgent.getAgentRunId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
package org.bench4q.master.infrastructure.highavailable.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.bench4q.master.domain.RunningAgentInterface;
|
||||
import org.bench4q.master.domain.RunningScriptInterface;
|
||||
import org.bench4q.master.domain.entity.Agent;
|
||||
import org.bench4q.master.domain.factory.RunningAgentFactory;
|
||||
import org.bench4q.master.domain.repository.AgentRepository;
|
||||
import org.bench4q.master.infrastructure.communication.AgentMessenger;
|
||||
import org.bench4q.master.infrastructure.highavailable.CurrentLoadSubject;
|
||||
import org.bench4q.master.infrastructure.highavailable.HighAvailablePool;
|
||||
import org.bench4q.share.enums.master.AgentStatus;
|
||||
import org.bench4q.share.models.agent.RunScenarioResultModel;
|
||||
import org.bench4q.share.models.agent.ServerStatusModel;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class HighAvailablePoolImpl extends CurrentLoadSubject implements
|
||||
HighAvailablePool {
|
||||
private AgentRepository agentRepository;
|
||||
private AgentMessenger agentMessenger;
|
||||
private RunningAgentFactory runningAgentFactory;
|
||||
private Map<String, Agent> pool;
|
||||
private Map<String, ServerStatusModel> agentStatusOfPreviousBeatMap;
|
||||
private Map<UUID, RunningAgentInterface> agentRunBlotters;
|
||||
private List<UUID> agentRunIdShouldBeSubstitute;
|
||||
private ExecutorService excuExecutorService;
|
||||
private long maxAvailableLoad;
|
||||
private int currentAvailableLoad;
|
||||
static Logger logger = Logger.getLogger(HighAvailablePoolImpl.class);
|
||||
|
||||
public Map<String, Agent> getPool() {
|
||||
return this.pool;
|
||||
}
|
||||
|
||||
private void setPool(Map<String, Agent> pool) {
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
private AgentRepository getAgentRepository() {
|
||||
return agentRepository;
|
||||
}
|
||||
|
||||
private void setAgentRepository(AgentRepository agentRepository) {
|
||||
this.agentRepository = agentRepository;
|
||||
}
|
||||
|
||||
private AgentMessenger getAgentMessenger() {
|
||||
return agentMessenger;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private void setAgentMessenger(AgentMessenger agentMessenger) {
|
||||
this.agentMessenger = agentMessenger;
|
||||
}
|
||||
|
||||
private RunningAgentFactory getRunningAgentFactory() {
|
||||
return runningAgentFactory;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private void setRunningAgentFactory(RunningAgentFactory runningAgentFactory) {
|
||||
this.runningAgentFactory = runningAgentFactory;
|
||||
}
|
||||
|
||||
public Long getMaxAvailableLoad() {
|
||||
this.calculateHAPoolLoadStatusInMonopolize();
|
||||
return maxAvailableLoad;
|
||||
}
|
||||
|
||||
private void setMaxAvailableLoad(Long maxAvailableLoad) {
|
||||
this.maxAvailableLoad = maxAvailableLoad;
|
||||
}
|
||||
|
||||
public synchronized void add(Agent agent) {
|
||||
this.getPool().put(agent.getHostName(), agent);
|
||||
}
|
||||
|
||||
public synchronized void remove(Agent agent) {
|
||||
this.getPool().remove(agent.getHostName());
|
||||
}
|
||||
|
||||
public int getCurrentAvailableLoad() {
|
||||
this.calculateHAPoolLoadStatusInMonopolize();
|
||||
return currentAvailableLoad;
|
||||
}
|
||||
|
||||
private void setCurrentAvailableLoad(int currentAvailableLoad) {
|
||||
this.currentAvailableLoad = currentAvailableLoad;
|
||||
}
|
||||
|
||||
private void setAgentStatusOfPreviousBeatMap(
|
||||
Map<String, ServerStatusModel> agentStatusOfPreviousBeatMap) {
|
||||
this.agentStatusOfPreviousBeatMap = agentStatusOfPreviousBeatMap;
|
||||
}
|
||||
|
||||
private List<UUID> getAgentRunIdShouldBeSubstitute() {
|
||||
return agentRunIdShouldBeSubstitute;
|
||||
}
|
||||
|
||||
public Map<UUID, RunningAgentInterface> getAgentRunBlotters() {
|
||||
return agentRunBlotters;
|
||||
}
|
||||
|
||||
private void setAgentRunBlotters(
|
||||
Map<UUID, RunningAgentInterface> agentRunBlotters) {
|
||||
this.agentRunBlotters = agentRunBlotters;
|
||||
}
|
||||
|
||||
private void setAgentRunIdShouldBeSubstitute(
|
||||
List<UUID> agentRunIdShouldBeSubstitute) {
|
||||
this.agentRunIdShouldBeSubstitute = agentRunIdShouldBeSubstitute;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public HighAvailablePoolImpl(AgentRepository agentRepository) {
|
||||
this.setPool(new HashMap<String, Agent>());
|
||||
this.setAgentRunBlotters(new HashMap<UUID, RunningAgentInterface>());
|
||||
this.setAgentStatusOfPreviousBeatMap(new HashMap<String, ServerStatusModel>());
|
||||
this.setCurrentAvailableLoad(0);
|
||||
this.setAgentRunIdShouldBeSubstitute(new LinkedList<UUID>());
|
||||
this.setAgentRepository(agentRepository);
|
||||
this.excuExecutorService = Executors.newCachedThreadPool();
|
||||
initialPool();
|
||||
}
|
||||
|
||||
private void initialPool() {
|
||||
for (Agent agent : this.getAgentRepository().loadEntities()) {
|
||||
this.add(agent);
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0,30 */2 * * * *")
|
||||
public void checkAllHeartBeat() {
|
||||
long time = System.nanoTime();
|
||||
heartBeatsAndUpdateHAPool();
|
||||
doSubstituteIfRequired();
|
||||
System.out.println("finish heart beat:" + (System.nanoTime() - time));
|
||||
}
|
||||
|
||||
private void heartBeatsAndUpdateHAPool() {
|
||||
Map<Agent, Future<ServerStatusModel>> map = new HashMap<Agent, Future<ServerStatusModel>>();
|
||||
for (Agent agent : this.getPool().values()) {
|
||||
CheckHearBeat checkHearBeat = new CheckHearBeat(agent,
|
||||
agentMessenger);
|
||||
map.put(agent, this.excuExecutorService.submit(checkHearBeat));
|
||||
}
|
||||
this.updateAgentPoolByHeart(map);
|
||||
}
|
||||
|
||||
private synchronized void updateAgentPoolByHeart(
|
||||
Map<Agent, Future<ServerStatusModel>> map) {
|
||||
this.setCurrentAvailableLoad(0);
|
||||
this.setMaxAvailableLoad((long) 0);
|
||||
for (Agent agent : map.keySet()) {
|
||||
ServerStatusModel model;
|
||||
try {
|
||||
model = map.get(agent).get();
|
||||
|
||||
if (model == null) {
|
||||
doForBreakDown(agent);
|
||||
} else {
|
||||
doForHealth(model, agent);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
this.getAgentRepository().Update(this.getPool().values());
|
||||
}
|
||||
|
||||
private void doSubstituteIfRequired() {
|
||||
for (UUID agentRunId : this.getAgentRunIdShouldBeSubstitute()) {
|
||||
RunningAgentInterface runningAgent = this.getAgentRunBlotters()
|
||||
.get(agentRunId);
|
||||
if (runningAgent == null) {
|
||||
// TODO:Actually this is an error
|
||||
continue;
|
||||
}
|
||||
runningAgent.substituteOnBoard();
|
||||
this.getAgentRunIdShouldBeSubstitute().remove(agentRunId);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void calculateHAPoolLoadStatusInMonopolize() {
|
||||
int maxLoad = 0, availableLoad = 0;
|
||||
for (Agent agent : this.getPool().values()) {
|
||||
if (agent.getCurrentEnumStatus() == AgentStatus.BreakDown) {
|
||||
continue;
|
||||
} else if (agent.getCurrentEnumStatus() == AgentStatus.InIdle) {
|
||||
maxLoad += agent.getMaxLoad();
|
||||
availableLoad += agent.getRemainLoad();
|
||||
} else if (agent.getCurrentEnumStatus() == AgentStatus.InRunning) {
|
||||
maxLoad += agent.getMaxLoad();
|
||||
}
|
||||
}
|
||||
this.setMaxAvailableLoad((long) maxLoad);
|
||||
this.setCurrentAvailableLoad(availableLoad);
|
||||
}
|
||||
|
||||
public void addABlotter(RunningAgentInterface runningAgent) {
|
||||
this.getAgentRunBlotters().put(runningAgent.getAgentRunId(),
|
||||
runningAgent);
|
||||
}
|
||||
|
||||
public void removeABlotter(UUID agentRunId) {
|
||||
this.getAgentRunBlotters().remove(agentRunId);
|
||||
}
|
||||
|
||||
public int blotterSize() {
|
||||
return this.getAgentRunBlotters().size();
|
||||
}
|
||||
|
||||
private void doForBreakDown(Agent agent) {
|
||||
agent.setCurrentEnumStatus(AgentStatus.BreakDown);
|
||||
List<UUID> runIDs = queryUnfinishedTest(null, agent.getHostName());
|
||||
if (runIDs == null) {
|
||||
return;
|
||||
}
|
||||
this.getAgentRunIdShouldBeSubstitute().addAll(runIDs);
|
||||
}
|
||||
|
||||
private void doForHealth(ServerStatusModel newModel, Agent agent) {
|
||||
List<UUID> agentUnfinishedRunIds = newModel.getRunningTests();
|
||||
if (agentUnfinishedRunIds == null || agentUnfinishedRunIds.size() == 0) {
|
||||
doForInIdle(agent);
|
||||
return;
|
||||
}
|
||||
doForInRunning(agent, newModel);
|
||||
}
|
||||
|
||||
private void doForInRunning(Agent agent, ServerStatusModel newModel) {
|
||||
agent.updateStatus(AgentStatus.InRunning);
|
||||
this.agentStatusOfPreviousBeatMap.put(agent.getHostName(), newModel);
|
||||
}
|
||||
|
||||
private void doForInIdle(Agent agent) {
|
||||
agent.updateStatus(AgentStatus.InIdle);
|
||||
agent.resetLoad();
|
||||
}
|
||||
|
||||
private List<UUID> queryUnfinishedTest(ServerStatusModel newModel,
|
||||
String hostName) {
|
||||
if (newModel == null) {
|
||||
ServerStatusModel model = this.agentStatusOfPreviousBeatMap
|
||||
.get(hostName);
|
||||
return model == null ? null : model.getRunningTests();
|
||||
}
|
||||
return newModel.getRunningTests();
|
||||
}
|
||||
|
||||
public List<RunningAgentInterface> applyFor(int load) {
|
||||
List<RunningAgentInterface> result = new ArrayList<RunningAgentInterface>();
|
||||
int loadForRunCurrent = -1;
|
||||
RunScenarioResultModel runScenarioResultModel = null;
|
||||
if (load >= this.getCurrentAvailableLoad()) {
|
||||
logger.info("currentAvailableLoad not enough for substitute");
|
||||
return null;
|
||||
}
|
||||
for (Agent agent : this.getPool().values()) {
|
||||
if (allocationFinish(load)) {
|
||||
break;
|
||||
}
|
||||
if (this.queryAgentStatus(agent) == null || agent.isInUse()) {
|
||||
continue;
|
||||
}
|
||||
loadForRunCurrent = getMin(load, agent.getRemainLoad());
|
||||
agent.bookTest(loadForRunCurrent);
|
||||
this.getAgentRepository().update(agent);
|
||||
runScenarioResultModel = this.getAgentMessenger().bookTest(agent,
|
||||
loadForRunCurrent);
|
||||
if (runScenarioResultModel == null
|
||||
|| runScenarioResultModel.getRunId() == null) {
|
||||
logger.error(runScenarioResultModel == null ? "runScenarioResultModel is null"
|
||||
: "runScenarioResultModel.getRunId()"
|
||||
+ runScenarioResultModel.getRunId());
|
||||
continue;
|
||||
}
|
||||
load -= loadForRunCurrent;
|
||||
RunningAgentInterface runningAgentWithouRunningScript = this
|
||||
.getRunningAgentFactory()
|
||||
.buildRunningAgentWithOutIdAndRunningScript(agent,
|
||||
loadForRunCurrent,
|
||||
runScenarioResultModel.getRunId());
|
||||
result.add(runningAgentWithouRunningScript);
|
||||
this.addABlotter(runningAgentWithouRunningScript);
|
||||
}
|
||||
if (!allocationFinish(load)) {
|
||||
logger.error("allocationUnfinished, remain " + load
|
||||
+ " unallocated");
|
||||
rollbackForApply(result);
|
||||
return null;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void rollbackForApply(List<RunningAgentInterface> runningAgents) {
|
||||
for (RunningAgentInterface runningAgent : runningAgents) {
|
||||
runningAgent.stop();
|
||||
this.removeABlotter(runningAgent.getAgentRunId());
|
||||
}
|
||||
}
|
||||
|
||||
private int getMin(int load, int remainLoad) {
|
||||
return load >= remainLoad ? remainLoad : load;
|
||||
}
|
||||
|
||||
private boolean allocationFinish(int load) {
|
||||
return load == 0;
|
||||
}
|
||||
|
||||
public void cleanUpAboutTestPlan(
|
||||
final Collection<? extends RunningScriptInterface> runningScripts) {
|
||||
|
||||
for (RunningScriptInterface runningScript : runningScripts) {
|
||||
for (RunningAgentInterface runningAgent : runningScript
|
||||
.getRunningAgents()) {
|
||||
this.getAgentRunBlotters().remove(runningAgent.getAgentRunId());
|
||||
logger.info("see the agentBlotter after run : "
|
||||
+ this.getAgentRunBlotters().get(
|
||||
runningAgent.getAgentRunId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void checkHeartBeat(Agent agent) {
|
||||
ServerStatusModel model = queryAgentStatus(agent);
|
||||
if (model == null) {
|
||||
doForBreakDown(agent);
|
||||
} else {
|
||||
doForHealth(model, agent);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerStatusModel queryAgentStatus(Agent agent) {
|
||||
return this.getAgentMessenger().getStatus(agent);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class CheckHearBeat implements Callable<ServerStatusModel> {
|
||||
private Agent agent;
|
||||
private AgentMessenger agentMessenger;
|
||||
|
||||
private AgentMessenger getAgentMessenger() {
|
||||
return agentMessenger;
|
||||
}
|
||||
|
||||
public CheckHearBeat(Agent agent, AgentMessenger agentMessenger) {
|
||||
this.agent = agent;
|
||||
this.agentMessenger = agentMessenger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerStatusModel call() throws Exception {
|
||||
// TODO Auto-generated method stub
|
||||
return this.getAgentMessenger().getStatus(agent);
|
||||
}
|
||||
}
|
|
@ -3,7 +3,7 @@
|
|||
<hibernate-configuration>
|
||||
<session-factory>
|
||||
<property name="hibernate.connection.driver_class">com.mysql.jdbc.Driver </property>
|
||||
<property name="hibernate.connection.url"> jdbc:mysql://localhost:3306/bench4q_master
|
||||
<property name="hibernate.connection.url"> jdbc:mysql://localhost:3306/bench4q
|
||||
</property>
|
||||
<property name="hibernate.connection.username">root</property>
|
||||
<property name="hibernate.connection.password">123456 </property>
|
||||
|
|
Loading…
Reference in New Issue