This commit is contained in:
hmm 2014-09-01 08:53:51 +08:00
commit 2b77bb473b
4 changed files with 82 additions and 28 deletions

View File

@ -3,6 +3,7 @@ package org.bench4q.master.infrastructure.communication;
import java.io.File;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Future;
import org.bench4q.master.domain.entity.Agent;
import org.bench4q.share.models.agent.RunScenarioModel;
@ -25,4 +26,6 @@ public interface AgentMessenger {
public StopTestModel stop(Agent agent, UUID runId);
public ServerStatusModel getStatus(Agent agent);
public Future<ServerStatusModel> getStatusAsync(Agent agent);
}

View File

@ -4,6 +4,10 @@ import java.io.File;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.log4j.Logger;
import org.bench4q.master.domain.entity.Agent;
@ -23,6 +27,7 @@ import org.springframework.stereotype.Component;
@Component
public class AgentMessengerImpl implements AgentMessenger {
private HttpRequester httpRequester;
private ExecutorService executorService = Executors.newCachedThreadPool();
private Logger logger = Logger.getLogger(AgentMessengerImpl.class);
private HttpRequester getHttpRequester() {
@ -55,6 +60,17 @@ public class AgentMessengerImpl implements AgentMessenger {
}
}
public Future<RunScenarioResultModel> bookTestAsync(final Agent agent,
final int requiredLoad) {
return this.executorService
.submit(new Callable<RunScenarioResultModel>() {
@Override
public RunScenarioResultModel call() throws Exception {
return bookTest(agent, requiredLoad);
}
});
}
public RunScenarioResultModel submitScenrioWithParams(Agent agent,
UUID agentRunId, List<File> paramFiles,
final RunScenarioModel runScenarioModel) {
@ -79,6 +95,19 @@ public class AgentMessengerImpl implements AgentMessenger {
}
}
public Future<RunScenarioResultModel> submitScenarioWithParamsAsync(
final Agent agent, final UUID agentRunId,
final List<File> paramFiles, final RunScenarioModel runScenarioModel) {
return this.executorService
.submit(new Callable<RunScenarioResultModel>() {
@Override
public RunScenarioResultModel call() throws Exception {
return submitScenrioWithParams(agent, agentRunId,
paramFiles, runScenarioModel);
}
});
}
public RunScenarioResultModel runWithParams(Agent agent, UUID agentRunId) {
HttpResponse httpResponse = null;
try {
@ -100,6 +129,17 @@ public class AgentMessengerImpl implements AgentMessenger {
}
}
public Future<RunScenarioResultModel> runAsync(final Agent agent,
final UUID agentRunId) {
return this.executorService
.submit(new Callable<RunScenarioResultModel>() {
@Override
public RunScenarioResultModel call() throws Exception {
return runWithParams(agent, agentRunId);
}
});
}
// there is bug in here
public TestBriefStatusModel scriptBriefAll(Agent agent, UUID runId) {
HttpResponse httpResponse = null;
@ -125,6 +165,17 @@ public class AgentMessengerImpl implements AgentMessenger {
}
}
public Future<TestBriefStatusModel> scriptBriefAsync(final Agent agent,
final UUID runId) {
return this.executorService
.submit(new Callable<TestBriefStatusModel>() {
@Override
public TestBriefStatusModel call() throws Exception {
return scriptBriefAll(agent, runId);
}
});
}
public StopTestModel stop(Agent agent, UUID runId) {
HttpResponse httpResponse;
try {
@ -146,6 +197,16 @@ public class AgentMessengerImpl implements AgentMessenger {
}
}
public Future<StopTestModel> stopAsync(final Agent agent, final UUID runId) {
return this.executorService.submit(new Callable<StopTestModel>() {
@Override
public StopTestModel call() throws Exception {
return stop(agent, runId);
}
});
}
private String buildBaseUrl(Agent agent) {
return agent.getHostName() + ":" + agent.getPort();
}
@ -165,4 +226,14 @@ public class AgentMessengerImpl implements AgentMessenger {
return null;
}
}
public Future<ServerStatusModel> getStatusAsync(final Agent agent) {
return this.executorService.submit(new Callable<ServerStatusModel>() {
@Override
public ServerStatusModel call() throws Exception {
return getStatus(agent);
}
});
}
}

View File

@ -7,9 +7,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.log4j.Logger;
import org.bench4q.master.domain.RunningAgentInterface;
@ -37,7 +34,6 @@ public class HighAvailablePoolImpl extends CurrentLoadSubject implements
private Map<String, ServerStatusModel> agentStatusOfPreviousBeatMap;
private Map<UUID, RunningAgentInterface> agentRunBlotters;
private List<UUID> agentRunIdShouldBeSubstitute;
private ExecutorService excuExecutorService;
private long maxAvailableLoad;
private int currentAvailableLoad;
static Logger logger = Logger.getLogger(HighAvailablePoolImpl.class);
@ -133,7 +129,6 @@ public class HighAvailablePoolImpl extends CurrentLoadSubject implements
this.setCurrentAvailableLoad(0);
this.setAgentRunIdShouldBeSubstitute(new LinkedList<UUID>());
this.setAgentRepository(agentRepository);
this.excuExecutorService = Executors.newCachedThreadPool();
initialPool();
}
@ -152,8 +147,7 @@ public class HighAvailablePoolImpl extends CurrentLoadSubject implements
private void heartBeatsAndUpdateHAPool() {
Map<Agent, Future<ServerStatusModel>> map = new HashMap<Agent, Future<ServerStatusModel>>();
for (Agent agent : this.getPool().values()) {
HearBeat checkHearBeat = new HearBeat(agent, agentMessenger);
map.put(agent, this.excuExecutorService.submit(checkHearBeat));
map.put(agent, this.getAgentMessenger().getStatusAsync(agent));
}
this.updateAgentPoolByHeart(map);
}
@ -166,7 +160,6 @@ public class HighAvailablePoolImpl extends CurrentLoadSubject implements
ServerStatusModel model;
try {
model = map.get(agent).get();
if (model == null) {
doForBreakDown(agent);
} else {
@ -350,23 +343,3 @@ public class HighAvailablePoolImpl extends CurrentLoadSubject implements
}
}
class HearBeat implements Callable<ServerStatusModel> {
private Agent agent;
private AgentMessenger agentMessenger;
private AgentMessenger getAgentMessenger() {
return agentMessenger;
}
public HearBeat(Agent agent, AgentMessenger agentMessenger) {
this.agent = agent;
this.agentMessenger = agentMessenger;
}
@Override
public ServerStatusModel call() throws Exception {
// TODO Auto-generated method stub
return this.getAgentMessenger().getStatus(agent);
}
}

View File

@ -4,6 +4,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Future;
import org.bench4q.master.domain.entity.Agent;
import org.bench4q.master.infrastructure.communication.AgentMessenger;
@ -17,6 +18,7 @@ import org.bench4q.share.models.agent.TestBriefStatusModel;
import org.bench4q.share.models.agent.statistics.AgentBehaviorsBriefModel;
import org.bench4q.share.models.agent.statistics.AgentBriefStatusModel;
import org.bench4q.share.models.agent.statistics.AgentPagesBriefModel;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
@Component
@ -83,4 +85,9 @@ public class Mock_AgentMessenger implements AgentMessenger {
}
return null;
}
@Override
public Future<ServerStatusModel> getStatusAsync(Agent agent) {
return new AsyncResult<ServerStatusModel>(getStatus(agent));
}
}