Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,42 +54,48 @@ public void run() {
logger.info("Starting up...");
HttpClient client = HttpClient.newHttpClient();
try {
boolean controllerInitiatedWsEnabled = Boolean.parseBoolean(
AmazonUtil.getUserDataAsMap().getOrDefault(TankConstants.KEY_CONTROLLER_INITIATED_WS_ENABLED, "false"));
logger.info("Starting up: ControllerBaseUrl={}", controllerBaseUrl);
HttpRequest request = HttpRequest.newBuilder().uri(URI.create(
controllerBaseUrl + SERVICE_RELATIVE_PATH + METHOD_SETTINGS))
.header("Authorization", "bearer "+token).build();
logger.info("Starting up: making call to tank service url to get settings.xml {} {} {}",
controllerBaseUrl, SERVICE_RELATIVE_PATH, METHOD_SUPPORT);
client.send(request, BodyHandlers.ofFile(Paths.get(TANK_AGENT_DIR, "settings.xml")));
logger.info("got settings file...");
// Download Support Files
request = HttpRequest.newBuilder().uri(URI.create(
controllerBaseUrl + SERVICE_RELATIVE_PATH + METHOD_SUPPORT))
.header("Authorization", "bearer "+token).build();
logger.info("Making call to tank service url to get support files {} {} {}",
controllerBaseUrl, SERVICE_RELATIVE_PATH, METHOD_SUPPORT);
int retryCount = 0;
while (true) {
try (ZipInputStream zip = new ZipInputStream(
client.send(request, BodyHandlers.ofInputStream()).body())) {
ZipEntry entry = zip.getNextEntry();
Path agentDirPath = Paths.get(TANK_AGENT_DIR).toAbsolutePath().normalize();
while (entry != null) {
String filename = entry.getName();
logger.info("Got file from controller: {}", filename);
Path targetPath = agentDirPath.resolve(filename).normalize();
if (!targetPath.startsWith(agentDirPath)) // Protect "Zip Slip"
throw new ZipException("Bad zip entry");
Files.write(targetPath, zip.readAllBytes());
entry = zip.getNextEntry();
if (!controllerInitiatedWsEnabled) {
HttpRequest request = HttpRequest.newBuilder().uri(URI.create(
controllerBaseUrl + SERVICE_RELATIVE_PATH + METHOD_SETTINGS))
.header("Authorization", "bearer "+token).build();
logger.info("Starting up: making call to tank service url to get settings.xml {} {} {}",
controllerBaseUrl, SERVICE_RELATIVE_PATH, METHOD_SUPPORT);
client.send(request, BodyHandlers.ofFile(Paths.get(TANK_AGENT_DIR, "settings.xml")));
logger.info("got settings file...");
// Download Support Files
request = HttpRequest.newBuilder().uri(URI.create(
controllerBaseUrl + SERVICE_RELATIVE_PATH + METHOD_SUPPORT))
.header("Authorization", "bearer "+token).build();
logger.info("Making call to tank service url to get support files {} {} {}",
controllerBaseUrl, SERVICE_RELATIVE_PATH, METHOD_SUPPORT);
int retryCount = 0;
while (true) {
try (ZipInputStream zip = new ZipInputStream(
client.send(request, BodyHandlers.ofInputStream()).body())) {
ZipEntry entry = zip.getNextEntry();
Path agentDirPath = Paths.get(TANK_AGENT_DIR).toAbsolutePath().normalize();
while (entry != null) {
String filename = entry.getName();
logger.info("Got file from controller: {}", filename);
Path targetPath = agentDirPath.resolve(filename).normalize();
if (!targetPath.startsWith(agentDirPath)) // Protect "Zip Slip"
throw new ZipException("Bad zip entry");
Files.write(targetPath, zip.readAllBytes());
entry = zip.getNextEntry();
}
break;
} catch (EOFException | ZipException e) {
logger.error("Error unzipping support files : retryCount={} : {}", retryCount, e.getMessage());
if (retryCount < FIBONACCI.length) {
Thread.sleep( FIBONACCI[retryCount++] * 1000 );
} else throw e;
}
break;
} catch (EOFException | ZipException e) {
logger.error("Error unzipping support files : retryCount={} : {}", retryCount, e.getMessage());
if (retryCount < FIBONACCI.length) {
Thread.sleep( FIBONACCI[retryCount++] * 1000 );
} else throw e;
}
} else {
logger.info("Controller-initiated WS mode enabled - skipping settings/support download from controller.");
}
// now start the harness
String controllerArg = " -http=" + controllerBaseUrl;
Expand Down
6 changes: 6 additions & 0 deletions agent/apiharness/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,11 @@
<groupId>org.openjdk.nashorn</groupId>
<artifactId>nashorn-core</artifactId>
</dependency>

<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.6</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private void updateInstanceStatus() {
sendTps(tpsInfo);
}

if (!isLocal) {
if (!isLocal && !APITestHarness.getInstance().isControllerInitiatedWsModeEnabled()) {
setInstanceStatus(newStatus.getInstanceId(), newStatus);
}
APITestHarness.getInstance().checkAgentThreads();
Expand Down Expand Up @@ -162,7 +162,9 @@ public synchronized static void setJobStatus(JobStatus jobStatus) {
stats.getMaxVirtualUsers(),
stats.getCurrentNumberUsers(), status.getStartTime(), endTime);
status.setUserDetails(APITestHarness.getInstance().getUserTracker().getSnapshot());
setInstanceStatus(status.getInstanceId(), status);
if (!APITestHarness.getInstance().isControllerInitiatedWsModeEnabled()) {
setInstanceStatus(status.getInstanceId(), status);
}
} catch (Exception e) {
LOG.error("Error sending status to controller: {}", e.toString(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class APITestHarness {
private TPSMonitor tpsMonitor;
private ResultsReporter resultsReporter;
private String tankHttpClientClass;
private AgentCommandWebSocketServer controllerInitiatedWsServer;

private Date send = new Date();
private static final int interval = 15; // SECONDS
Expand Down Expand Up @@ -226,6 +227,39 @@ private void initializeFromArgs(String[] args) {
}
}

public boolean isControllerInitiatedWsModeEnabled() {
try {
String value = AmazonUtil.getUserDataAsMap().get(TankConstants.KEY_CONTROLLER_INITIATED_WS_ENABLED);
if (StringUtils.isNotBlank(value)) {
return Boolean.parseBoolean(value);
}
} catch (Exception ignored) {
}
return tankConfig.getAgentConfig().isControllerInitiatedWsEnabled();
}

private boolean isControllerInitiatedWsDisableAgentHttpEnabled() {
try {
String value = AmazonUtil.getUserDataAsMap().get(TankConstants.KEY_CONTROLLER_INITIATED_WS_DISABLE_AGENT_HTTP);
if (StringUtils.isNotBlank(value)) {
return Boolean.parseBoolean(value);
}
} catch (Exception ignored) {
}
return tankConfig.getAgentConfig().isControllerInitiatedWsDisableAgentHttp();
}

private String getControllerInitiatedWsScriptPath() {
try {
String value = AmazonUtil.getUserDataAsMap().get(TankConstants.KEY_CONTROLLER_INITIATED_WS_SCRIPT_PATH);
if (StringUtils.isNotBlank(value)) {
return value;
}
} catch (Exception ignored) {
}
return tankConfig.getAgentConfig().getControllerInitiatedWsScriptPath();
}

private String getLocalInstanceId() {
isLocal = true;
try {
Expand Down Expand Up @@ -255,7 +289,12 @@ private static void usage() {
private void startHttp(String baseUrl, String token) {
isLocal = false;
HostInfo hostInfo = new HostInfo();
CommandListener.startHttpServer(tankConfig.getAgentConfig().getAgentPort());
boolean controllerInitiatedWsMode = isControllerInitiatedWsModeEnabled();
boolean controllerInitiatedWsDisableAgentHttp = isControllerInitiatedWsDisableAgentHttpEnabled();

if (!controllerInitiatedWsMode || !controllerInitiatedWsDisableAgentHttp) {
CommandListener.startHttpServer(tankConfig.getAgentConfig().getAgentPort());
}
baseUrl = (baseUrl == null) ? AmazonUtil.getControllerBaseUrl() : baseUrl;
token = (token == null) ? AmazonUtil.getAgentToken() : token;
String instanceUrl = null;
Expand Down Expand Up @@ -291,6 +330,42 @@ private void startHttp(String baseUrl, String token) {
agentRunData.setStopBehavior(AmazonUtil.getStopBehavior());
LogUtil.getLogEvent().setJobId(agentRunData.getJobId());

if (controllerInitiatedWsMode && controllerInitiatedWsDisableAgentHttp) {
try {
if (controllerInitiatedWsServer == null) {
controllerInitiatedWsServer = new AgentCommandWebSocketServer(
tankConfig.getAgentConfig().getAgentPort(),
instanceId,
agentRunData.getJobId(),
capacity);
controllerInitiatedWsServer.start();
}

String scriptPath = getControllerInitiatedWsScriptPath();
if (StringUtils.isNotBlank(scriptPath) && new File(scriptPath).exists()) {
LOG.info(new ObjectMessage(Map.of("Message", "Controller-initiated WS mode loading script from " + scriptPath)));
TestPlanSingleton.getInstance().setTestPlans(scriptPath);
} else if (StringUtils.isNotBlank(testPlans) && AgentUtil.validateTestPlans(testPlans)) {
LOG.info(new ObjectMessage(Map.of("Message", "Controller-initiated WS mode loading script from args " + testPlans)));
TestPlanSingleton.getInstance().setTestPlans(testPlans);
} else {
LOG.warn(new ObjectMessage(Map.of("Message", "Controller-initiated WS mode has no valid local script path configured. Awaiting controller command anyway.")));
}

Thread thread = new Thread(new StartedChecker());
thread.setName("StartedChecker");
thread.setDaemon(false);
thread.start();
return;
} catch (Exception e) {
LOG.error("Error starting controller-initiated WS mode: " + e, e);
System.exit(0);
}
} else if (controllerInitiatedWsMode) {
LOG.warn(new ObjectMessage(Map.of("Message",
"Controller-initiated WS is enabled but HTTP disable flag is false; running legacy HTTP lifecycle path")));
}

AgentData data = new AgentData(agentRunData.getJobId(), instanceId, instanceUrl, capacity,
AmazonUtil.getVMRegion(), AmazonUtil.getZone());
try {
Expand Down Expand Up @@ -340,6 +415,15 @@ private void startHttp(String baseUrl, String token) {
saveDataFile(dfRequest, token);
}
}
// Start WS control channel if enabled
if (tankConfig.getAgentConfig().isCommandWsEnabled()) {
String wsPath = tankConfig.getAgentConfig().getCommandWsPath();
LOG.info(new ObjectMessage(Map.of("Message", "Starting WS control channel to " + baseUrl + wsPath)));
AgentCommandWebSocketClient wsClient = new AgentCommandWebSocketClient(
baseUrl, wsPath, token, instanceId, agentRunData.getJobId());
wsClient.connect();
}

Thread thread = new Thread(new StartedChecker());
thread.setName("StartedChecker");
thread.setDaemon(false);
Expand Down
Loading
Loading