diff --git a/agent/apiharness/src/main/java/com/intuit/tank/harness/APIMonitor.java b/agent/apiharness/src/main/java/com/intuit/tank/harness/APIMonitor.java index 7211a8467..8e413128f 100644 --- a/agent/apiharness/src/main/java/com/intuit/tank/harness/APIMonitor.java +++ b/agent/apiharness/src/main/java/com/intuit/tank/harness/APIMonitor.java @@ -18,6 +18,7 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.time.Duration; import java.util.Date; import com.fasterxml.jackson.core.JsonProcessingException; @@ -144,43 +145,98 @@ public static void setDoMonitor(boolean monitor) { doMonitor = monitor; } - public synchronized static void setJobStatus(JobStatus jobStatus) { + public static void setJobStatus(JobStatus jobStatus) { LOG.debug(LogUtil.getLogMessage("Setting job status to: " + jobStatus)); - if (status != null && status.getJobStatus() != JobStatus.Completed) { + // Capture immutable status snapshot inside lock, send outside to avoid + // blocking the synchronized section on network I/O (terminal retry can take ~6s) + String instanceId; + CloudVmStatus statusToSend; + synchronized (APIMonitor.class) { + if (status == null || status.getJobStatus() == JobStatus.Completed) { + return; + } try { - VMStatus vmStatus = jobStatus.equals(JobStatus.Stopped) ? VMStatus.stopping + VMStatus vmStatus = jobStatus.equals(JobStatus.Stopped) ? VMStatus.stopping : jobStatus.equals(JobStatus.RampPaused) ? VMStatus.rampPaused : jobStatus.equals(JobStatus.Running) ? VMStatus.running : jobStatus.equals(JobStatus.Completed) ? VMStatus.terminated : status.getVmStatus(); WatsAgentStatusResponse stats = APITestHarness.getInstance().getStatus(); - Date endTime = (jobStatus == JobStatus.Completed) ? new Date() : status - .getEndTime(); - status = new CloudVmStatus(status.getInstanceId(), status.getJobId(), status.getSecurityGroup(), + Date endTime = (jobStatus == JobStatus.Completed) ? new Date() : status.getEndTime(); + // Build into local — only assign to static 'status' after full success + CloudVmStatus newStatus = new CloudVmStatus(status.getInstanceId(), status.getJobId(), status.getSecurityGroup(), jobStatus, status.getRole(), status.getVmRegion(), vmStatus, new ValidationStatus(stats.getKills(), stats.getAborts(), stats.getGotos(), stats.getSkips(), stats.getSkipGroups(), stats.getRestarts()), stats.getMaxVirtualUsers(), stats.getCurrentNumberUsers(), status.getStartTime(), endTime); - status.setUserDetails(APITestHarness.getInstance().getUserTracker().getSnapshot()); - setInstanceStatus(status.getInstanceId(), status); + newStatus.setUserDetails(APITestHarness.getInstance().getUserTracker().getSnapshot()); + status = newStatus; + instanceId = status.getInstanceId(); + statusToSend = status; } catch (Exception e) { - LOG.error("Error sending status to controller: {}", e.toString(), e); + LOG.error("Error building status snapshot in setJobStatus: {}", e.toString(), e); + return; } } + // Network send is outside the lock — terminal retry won't block other threads + try { + setInstanceStatus(instanceId, statusToSend); + } catch (Exception e) { + LOG.error("Error sending status to controller: {}", e.toString(), e); + } } - protected static void setInstanceStatus(String instanceId, CloudVmStatus VmStatus) throws URISyntaxException, JsonProcessingException { - String json = objectWriter.writeValueAsString(VmStatus); + protected static void setInstanceStatus(String instanceId, CloudVmStatus vmStatus) throws URISyntaxException, JsonProcessingException { + // Force currentUsers=0 on terminal statuses — agent threads may not have fully exited + boolean isTerminal = vmStatus.getJobStatus() == JobStatus.Completed + || vmStatus.getJobStatus() == JobStatus.Stopped + || vmStatus.getVmStatus() == VMStatus.terminated; + if (isTerminal) { + vmStatus.setCurrentUsers(0); + } + + String json = objectWriter.writeValueAsString(vmStatus); String token = APITestHarness.getInstance().getTankConfig().getAgentConfig().getAgentToken(); HttpRequest request = HttpRequest.newBuilder() .uri(new URI(APITestHarness.getInstance().getTankConfig().getControllerBase() + "/v2/agent/instance/status/" + instanceId)) .header(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON.getMimeType()) .header(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()) .header(HttpHeaders.AUTHORIZATION, "bearer " + token) + .timeout(Duration.ofSeconds(10)) .PUT(HttpRequest.BodyPublishers.ofString(json)) .build(); LOG.debug(LogUtil.getLogMessage("Sending instance status update for instance: " + instanceId + ", Status: " + json)); - client.sendAsync(request, HttpResponse.BodyHandlers.discarding()); + + if (isTerminal) { + // Terminal statuses use synchronous send with retry — losing the final Completed + // report causes the controller to show stale "Running" status indefinitely + for (int attempt = 1; attempt <= 3; attempt++) { + try { + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.discarding()); + if (response.statusCode() == 200 || response.statusCode() == 202 || response.statusCode() == 204) { + LOG.info(LogUtil.getLogMessage("Terminal status delivered on attempt " + attempt)); + return; + } + LOG.warn(LogUtil.getLogMessage("Terminal status delivery got " + response.statusCode() + " on attempt " + attempt)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.warn(LogUtil.getLogMessage("Terminal status delivery interrupted on attempt " + attempt)); + return; + } catch (Exception e) { + LOG.warn(LogUtil.getLogMessage("Terminal status delivery failed on attempt " + attempt + ": " + e.getMessage())); + } + if (attempt < 3) { + try { Thread.sleep(2000); } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + return; + } + } + } + LOG.error(LogUtil.getLogMessage("Failed to deliver terminal status after 3 attempts for instance " + instanceId)); + } else { + // Non-terminal statuses remain async (fire-and-forget is fine for periodic updates) + client.sendAsync(request, HttpResponse.BodyHandlers.discarding()); + } } } diff --git a/rest-mvc/impl/src/main/java/com/intuit/tank/rest/mvc/rest/cloud/JobEventSender.java b/rest-mvc/impl/src/main/java/com/intuit/tank/rest/mvc/rest/cloud/JobEventSender.java index b14625b0f..9c6eed6e5 100644 --- a/rest-mvc/impl/src/main/java/com/intuit/tank/rest/mvc/rest/cloud/JobEventSender.java +++ b/rest-mvc/impl/src/main/java/com/intuit/tank/rest/mvc/rest/cloud/JobEventSender.java @@ -274,6 +274,10 @@ public CloudVmStatus getVmStatus(String instanceId) { } public void setVmStatus(final String instanceId, final CloudVmStatus status) { + // Normalize: completed/terminated agents always have zero users + if (status.getJobStatus() == JobStatus.Completed || status.getVmStatus() == VMStatus.terminated) { + status.setCurrentUsers(0); + } vmTracker.setStatus(status); if (status.getJobStatus() == JobStatus.Completed || status.getVmStatus() == VMStatus.terminated diff --git a/rest-mvc/impl/src/test/java/com/intuit/tank/rest/mvc/rest/cloud/JobEventSenderTest.java b/rest-mvc/impl/src/test/java/com/intuit/tank/rest/mvc/rest/cloud/JobEventSenderTest.java index 17562a19f..f2ef64768 100644 --- a/rest-mvc/impl/src/test/java/com/intuit/tank/rest/mvc/rest/cloud/JobEventSenderTest.java +++ b/rest-mvc/impl/src/test/java/com/intuit/tank/rest/mvc/rest/cloud/JobEventSenderTest.java @@ -1,5 +1,6 @@ package com.intuit.tank.rest.mvc.rest.cloud; +import com.intuit.tank.vm.vmManager.VMTerminator; import com.intuit.tank.vm.vmManager.VMTracker; import com.intuit.tank.vm.vmManager.models.CloudVmStatus; import com.intuit.tank.vm.vmManager.models.CloudVmStatusContainer; @@ -32,6 +33,9 @@ public class JobEventSenderTest { @Mock private VMTracker vmTracker; + @Mock + private VMTerminator terminator; + @InjectMocks private JobEventSender jobEventSender; @@ -263,6 +267,61 @@ void getInstancesForJob_excludesReplacedAndTerminated() throws Exception { assertFalse(instances.contains("i-replaced")); } + // ============ setVmStatus normalization tests (Fix 2) ============ + + @Test + @DisplayName("setVmStatus normalizes currentUsers to 0 for Completed agent before passing to vmTracker") + void setVmStatus_completedAgent_normalizesCurrentUsersToZero() { + // Given: An agent reports Completed but still has currentUsers=10 + CloudVmStatus status = createStatus("i-agent1", JobStatus.Completed, VMStatus.running); + // Manually set currentUsers to simulate stale count + status.setCurrentUsers(10); + + // When + jobEventSender.setVmStatus("i-agent1", status); + + // Then: The status passed to vmTracker.setStatus should have currentUsers=0 + assertEquals(0, status.getCurrentUsers(), + "Completed agent should have currentUsers normalized to 0 before reaching vmTracker"); + verify(vmTracker).setStatus(status); + verify(terminator).terminate("i-agent1"); + } + + @Test + @DisplayName("setVmStatus normalizes currentUsers to 0 for terminated agent") + void setVmStatus_terminatedAgent_normalizesCurrentUsersToZero() { + // Given: An agent reports terminated but still has currentUsers=5 + CloudVmStatus status = createStatus("i-agent1", JobStatus.Completed, VMStatus.terminated); + status.setCurrentUsers(5); + + // When + jobEventSender.setVmStatus("i-agent1", status); + + // Then + assertEquals(0, status.getCurrentUsers(), + "Terminated agent should have currentUsers normalized to 0"); + verify(vmTracker).setStatus(status); + verify(terminator).terminate("i-agent1"); + } + + @Test + @DisplayName("setVmStatus does NOT normalize currentUsers for Running agent") + void setVmStatus_runningAgent_preservesCurrentUsers() { + // Given: A Running agent with 50 currentUsers + CloudVmStatus status = createStatus("i-agent1", JobStatus.Running, VMStatus.running); + status.setCurrentUsers(50); + + // When + jobEventSender.setVmStatus("i-agent1", status); + + // Then: currentUsers should remain 50 + assertEquals(50, status.getCurrentUsers(), + "Running agent should keep its currentUsers unchanged"); + verify(vmTracker).setStatus(status); + } + + // ============ getVmStatus delegation tests ============ + @Test @DisplayName("getVmStatus delegates to vmTracker") void getVmStatus_delegatesToVmTracker() { diff --git a/tank_vmManager/pom.xml b/tank_vmManager/pom.xml index cb5597c7a..6c4162d9f 100644 --- a/tank_vmManager/pom.xml +++ b/tank_vmManager/pom.xml @@ -75,5 +75,11 @@ serializer + + org.mockito + mockito-junit-jupiter + test + + diff --git a/tank_vmManager/src/main/java/com/intuit/tank/vmManager/VMTrackerImpl.java b/tank_vmManager/src/main/java/com/intuit/tank/vmManager/VMTrackerImpl.java index 7c0083d05..e3f05d9e1 100644 --- a/tank_vmManager/src/main/java/com/intuit/tank/vmManager/VMTrackerImpl.java +++ b/tank_vmManager/src/main/java/com/intuit/tank/vmManager/VMTrackerImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -92,6 +93,9 @@ public class VMTrackerImpl implements VMTracker { private Map statusMap = new ConcurrentHashMap(); private Map jobMap = new ConcurrentHashMap(); private Set stoppedJobs = new HashSet(); + // Tracks when we last received ANY report from an instance (set before executor enqueue). + // Used for staleness detection — immune to DiscardOldestPolicy dropping the task. + private Map lastSeenMap = new ConcurrentHashMap<>(); private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, @@ -103,12 +107,41 @@ public class VMTrackerImpl implements VMTracker { }, new ThreadPoolExecutor.DiscardOldestPolicy()); + // Dedicated executor for terminal status updates — small queue + CallerRunsPolicy + // guarantees delivery without silently dropping, and avoids blocking the REST thread + // for extended periods when killInstances loops over many instances. + private static final ThreadPoolExecutor TERMINAL_EXECUTOR = + new ThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue(20), + threadFactoryRunnable -> { + Thread t = Executors.defaultThreadFactory().newThread(threadFactoryRunnable); + t.setDaemon(true); + t.setName("terminal-status-" + t.getId()); + return t; + }, + (r, executor) -> { + // CallerRunsPolicy with logging — runs on caller thread when queue is full + LOG.warn("Terminal executor queue full (size={}), running on caller thread: {}", + executor.getQueue().size(), Thread.currentThread().getName()); + if (!executor.isShutdown()) { + r.run(); + } + }); + + private ScheduledExecutorService stalenessSweeper; + /** * */ @PostConstruct public void init() { devMode = new TankConfig().getStandalone(); + stalenessSweeper = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "staleness-sweeper"); + t.setDaemon(true); + return t; + }); + stalenessSweeper.scheduleAtFixedRate(this::sweepStaleJobs, 30, 30, TimeUnit.SECONDS); } /** @@ -140,62 +173,79 @@ public void publishEvent(JobEvent event) { */ @Override public void setStatus(@Nonnull final CloudVmStatus status) { - Runnable task = () -> setStatusThread(status); - EXECUTOR.execute(task); + // Record ingest time BEFORE enqueue — immune to DiscardOldestPolicy drops. + // Used by staleness detection instead of reportTime (which is set inside the executor). + lastSeenMap.put(status.getInstanceId(), System.currentTimeMillis()); + + boolean isTerminal = status.getJobStatus() == JobStatus.Completed + || status.getJobStatus() == JobStatus.Stopped + || status.getVmStatus() == VMStatus.terminated; + if (isTerminal) { + // Terminal updates use a dedicated executor with CallerRunsPolicy — + // guarantees delivery (never drops), falls back to caller thread if queue is full + TERMINAL_EXECUTOR.execute(() -> setStatusThread(status)); + } else { + EXECUTOR.execute(() -> setStatusThread(status)); + } } private void setStatusThread(@Nonnull final CloudVmStatus status) { - AWSXRay.getGlobalRecorder().beginNoOpSegment(); //initiation call has already returned 204 - synchronized (getCacheSyncObject(status.getJobId())) { - status.setReportTime(new Date()); - CloudVmStatus currentStatus = getStatus(status.getInstanceId()); - - if (shouldUpdateStatus(currentStatus)) { + AWSXRay.getGlobalRecorder().beginNoOpSegment(); + try { + synchronized (getCacheSyncObject(status.getJobId())) { + status.setReportTime(new Date()); + CloudVmStatus currentStatus = getStatus(status.getInstanceId()); + + if (!shouldUpdateStatus(currentStatus, status)) { + LOG.debug(new ObjectMessage(Map.of("Message", + "Skipping full status update for instance " + status.getInstanceId() + + " - current status is terminal: " + (currentStatus != null ? currentStatus.getVmStatus() : "null")))); + return; + } + statusMap.put(status.getInstanceId(), status); if (status.getVmStatus() == VMStatus.running - && (status.getJobStatus() == JobStatus.Completed) - && !isDevMode()) { - AmazonInstance amzInstance = new AmazonInstance(status.getVmRegion()); - amzInstance.killInstances(Collections.singletonList(status.getInstanceId())); + && (status.getJobStatus() == JobStatus.Completed) + && !isDevMode()) { + AmazonInstance amzInstance = new AmazonInstance(status.getVmRegion()); + amzInstance.killInstances(Collections.singletonList(status.getInstanceId())); } - } else { - LOG.debug(new ObjectMessage(Map.of("Message", - "Skipping status update for instance " + status.getInstanceId() + - " - current status is " + (currentStatus != null ? currentStatus.getVmStatus() : "null")))); - } - String jobId = status.getJobId(); - CloudVmStatusContainer cloudVmStatusContainer = jobMap.get(jobId); - if (cloudVmStatusContainer == null) { - cloudVmStatusContainer = new CloudVmStatusContainer(); - cloudVmStatusContainer.setJobId(jobId); - - jobMap.put(jobId, cloudVmStatusContainer); - JobInstance job = jobInstanceDao.get().findById(Integer.parseInt(jobId)); - if (job != null) { - JobQueueStatus newStatus = getQueueStatus(job.getStatus(), status.getJobStatus()); - cloudVmStatusContainer.setStatus(newStatus); - if (newStatus != job.getStatus()) { - job.setStatus(newStatus); - new JobInstanceDao().saveOrUpdate(job); + + String jobId = status.getJobId(); + CloudVmStatusContainer cloudVmStatusContainer = jobMap.get(jobId); + if (cloudVmStatusContainer == null) { + cloudVmStatusContainer = new CloudVmStatusContainer(); + cloudVmStatusContainer.setJobId(jobId); + + jobMap.put(jobId, cloudVmStatusContainer); + JobInstance job = jobInstanceDao.get().findById(Integer.parseInt(jobId)); + if (job != null) { + JobQueueStatus newStatus = getQueueStatus(job.getStatus(), status.getJobStatus()); + cloudVmStatusContainer.setStatus(newStatus); + if (newStatus != job.getStatus()) { + job.setStatus(newStatus); + new JobInstanceDao().saveOrUpdate(job); + } + } else { + JobQueueStatus newStatus = getQueueStatus(cloudVmStatusContainer.getStatus(), status.getJobStatus()); + cloudVmStatusContainer.setStatus(newStatus); } - } else { - JobQueueStatus newStatus = getQueueStatus(cloudVmStatusContainer.getStatus(), status.getJobStatus()); - cloudVmStatusContainer.setStatus(newStatus); } - } - cloudVmStatusContainer.setReportTime(status.getReportTime()); - addStatusToJobContainer(status, cloudVmStatusContainer); - String projectId = getProjectForJobId(jobId); - if (projectId != null) { - ProjectStatusContainer projectStatusContainer = getProjectStatusContainer(projectId); - if (projectStatusContainer == null) { - projectStatusContainer = new ProjectStatusContainer(); - projectContainerMap.put(projectId, projectStatusContainer); + cloudVmStatusContainer.setReportTime(status.getReportTime()); + addStatusToJobContainer(status, cloudVmStatusContainer); + String projectId = getProjectForJobId(jobId); + if (projectId != null) { + ProjectStatusContainer projectStatusContainer = getProjectStatusContainer(projectId); + if (projectStatusContainer == null) { + projectStatusContainer = new ProjectStatusContainer(); + projectContainerMap.put(projectId, projectStatusContainer); + } + projectStatusContainer.addStatusContainer(cloudVmStatusContainer); } - projectStatusContainer.addStatusContainer(cloudVmStatusContainer); } + } finally { + AWSXRay.endSegment(); } - AWSXRay.endSegment(); } private String getProjectForJobId(String jobId) { @@ -228,13 +278,24 @@ private JobQueueStatus getQueueStatus(JobQueueStatus oldStatus, JobStatus jobSta return oldStatus; } - private boolean shouldUpdateStatus(CloudVmStatus currentStatus) { - if (currentStatus != null) { - VMStatus status = currentStatus.getVmStatus(); - return (status != VMStatus.shutting_down - && status != VMStatus.stopped - && status != VMStatus.stopping - && status != VMStatus.terminated); + /** + * Determines whether an incoming status update should be applied. + * Rejects updates when the current status is terminal, UNLESS the incoming + * status is a valid forward transition (e.g., stopping → terminated from killInstances). + */ + private boolean shouldUpdateStatus(CloudVmStatus currentStatus, CloudVmStatus incomingStatus) { + if (currentStatus == null) { + return true; + } + VMStatus currentVm = currentStatus.getVmStatus(); + // Already fully terminated — no further updates + if (currentVm == VMStatus.terminated) { + return false; + } + // For stopping/stopped/shutting_down, only allow forward transition to terminated/Completed + if (currentVm == VMStatus.shutting_down || currentVm == VMStatus.stopped || currentVm == VMStatus.stopping) { + return incomingStatus.getVmStatus() == VMStatus.terminated + || incomingStatus.getJobStatus() == JobStatus.Completed; } return true; } @@ -246,6 +307,7 @@ private boolean shouldUpdateStatus(CloudVmStatus currentStatus) { @Override public void removeStatusForInstance(String instanceId) { statusMap.remove(instanceId); + lastSeenMap.remove(instanceId); } /** @@ -303,6 +365,12 @@ public void stopJob(String id) { **/ private void addStatusToJobContainer(CloudVmStatus status, CloudVmStatusContainer cloudVmStatusContainer) { ControllerLoggingConfig.setupThreadContext(); + // Normalize completed/terminated agents to zero users — agent may report stale currentUsers + // if threads haven't fully exited before the final status report + if (status.getJobStatus() == JobStatus.Completed || status.getVmStatus() == VMStatus.terminated) { + status.setCurrentUsers(0); + status.setUserDetails(Collections.emptyList()); + } cloudVmStatusContainer.getStatuses().remove(status); cloudVmStatusContainer.getStatuses().add(status); cloudVmStatusContainer.calculateUserDetails(); @@ -356,11 +424,51 @@ private void addStatusToJobContainer(CloudVmStatus status, CloudVmStatusContaine return; } + // Staleness detection: if all non-replaced agents are either Completed or stale + // (no report in 3× the configured report interval), treat the job as finished. + // This catches the case where an agent's final Completed report was lost. + if (!isFinished && activeInstanceCount > 0) { + long reportIntervalMs = Math.max(new TankConfig().getAgentConfig().getStatusReportIntervalMilis(15_000), 15_000); + long staleThresholdMs = reportIntervalMs * 3; + Date now = new Date(); + long nowMs = now.getTime(); + boolean allCompletedOrStale = true; + for (CloudVmStatus s : statusesSnapshot) { + if (s.getVmStatus() == VMStatus.replaced) continue; + boolean isCompleted = s.getJobStatus() == JobStatus.Completed; + // Use lastSeenMap (set before executor enqueue) for staleness — immune to + // DiscardOldestPolicy dropping updates, which would leave reportTime stale + Long lastSeen = lastSeenMap.get(s.getInstanceId()); + boolean isStale = lastSeen != null && (nowMs - lastSeen) > staleThresholdMs; + if (!isCompleted && !isStale) { + allCompletedOrStale = false; + break; + } + } + if (allCompletedOrStale) { + LOG.warn("Job {} — all agents are either Completed or stale (no report in {}ms). Treating as finished.", + status.getJobId(), staleThresholdMs); + isFinished = true; + // Force stale agents to Completed in the container + for (CloudVmStatus s : statusesSnapshot) { + if (s.getVmStatus() == VMStatus.replaced) continue; + if (s.getJobStatus() != JobStatus.Completed) { + s.setJobStatus(JobStatus.Completed); + s.setCurrentUsers(0); + s.setUserDetails(Collections.emptyList()); + s.setEndTime(now); + } + } + // Recalculate aggregated user details after force-completing stale agents + cloudVmStatusContainer.calculateUserDetails(); + } + } + LOG.debug(new ObjectMessage(Map.of("Message", - "Status calc complete for job " + status.getJobId() + - " - isFinished=" + isFinished + ", paused=" + paused + + "Status calc complete for job " + status.getJobId() + + " - isFinished=" + isFinished + ", paused=" + paused + ", rampPaused=" + rampPaused + ", stopped=" + stopped + ", running=" + running))); - + if (isFinished) { LOG.info(new ObjectMessage(Map.of("Message","Setting end time on container " + cloudVmStatusContainer.getJobId()))); if (cloudVmStatusContainer.getEndTime() == null) { @@ -410,6 +518,73 @@ private void addStatusToJobContainer(CloudVmStatus status, CloudVmStatusContaine } } + /** + * Periodic sweep over all tracked jobs to detect stale agents when no inbound + * reports arrive (e.g., all agents died simultaneously). For each unfinished job + * (endTime == null), checks if all agents are Completed or stale and forces completion. + */ + private void sweepStaleJobs() { + long reportIntervalMs; + try { + reportIntervalMs = Math.max(new TankConfig().getAgentConfig().getStatusReportIntervalMilis(15_000), 15_000); + } catch (Exception e) { + LOG.error("Error reading config in staleness sweep, using default 15s: " + e.getMessage(), e); + reportIntervalMs = 15_000; + } + long staleThresholdMs = reportIntervalMs * 3; + Date now = new Date(); + long nowMs = now.getTime(); + + for (Map.Entry entry : jobMap.entrySet()) { + String jobId = entry.getKey(); + try { + CloudVmStatusContainer container = entry.getValue(); + if (container.getEndTime() != null) { + continue; // already finished + } + synchronized (getCacheSyncObject(jobId)) { + // Re-check after acquiring lock + if (container.getEndTime() != null) { + continue; + } + Set statuses = container.getStatuses(); + if (statuses.isEmpty()) { + continue; + } + boolean allCompletedOrStale = true; + CloudVmStatus latestStatus = null; + for (CloudVmStatus s : statuses) { + if (s.getVmStatus() == VMStatus.replaced) continue; + if (latestStatus == null) latestStatus = s; + boolean isCompleted = s.getJobStatus() == JobStatus.Completed; + Long lastSeen = lastSeenMap.get(s.getInstanceId()); + boolean isStale = lastSeen != null && (nowMs - lastSeen) > staleThresholdMs; + if (!isCompleted && !isStale) { + allCompletedOrStale = false; + break; + } + } + if (allCompletedOrStale && latestStatus != null) { + LOG.warn("Staleness sweep: job {} — all agents Completed or stale. Triggering finish via addStatusToJobContainer.", + jobId); + for (CloudVmStatus s : statuses) { + if (s.getVmStatus() == VMStatus.replaced) continue; + if (s.getJobStatus() != JobStatus.Completed) { + s.setJobStatus(JobStatus.Completed); + s.setCurrentUsers(0); + s.setUserDetails(Collections.emptyList()); + s.setEndTime(now); + } + } + addStatusToJobContainer(latestStatus, container); + } + } + } catch (Exception e) { + LOG.error("Error in staleness sweep for job " + jobId + ": " + e.getMessage(), e); + } + } + } + private Object getCacheSyncObject(final String id) { locks.putIfAbsent(id, id); return locks.get(id); @@ -418,5 +593,9 @@ private Object getCacheSyncObject(final String id) { @PreDestroy private void destroy() { EXECUTOR.shutdown(); + TERMINAL_EXECUTOR.shutdown(); + if (stalenessSweeper != null) { + stalenessSweeper.shutdown(); + } } } diff --git a/tank_vmManager/src/test/java/com/intuit/tank/vmManager/VMTrackerImplTest.java b/tank_vmManager/src/test/java/com/intuit/tank/vmManager/VMTrackerImplTest.java index 39dd09957..64a11d6a8 100644 --- a/tank_vmManager/src/test/java/com/intuit/tank/vmManager/VMTrackerImplTest.java +++ b/tank_vmManager/src/test/java/com/intuit/tank/vmManager/VMTrackerImplTest.java @@ -14,10 +14,12 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -25,7 +27,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import com.intuit.tank.dao.JobInstanceDao; +import com.intuit.tank.project.JobInstance; +import com.intuit.tank.vm.api.enumerated.JobQueueStatus; +import com.intuit.tank.vm.event.JobEvent; +import jakarta.enterprise.event.Event; +import jakarta.enterprise.inject.Instance; + import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; /** * Unit tests for VMTrackerImpl, focusing on the bug fix for terminated agents @@ -126,19 +136,24 @@ void getProjectStatusContainer_returnsNullForNonExistent() { /** * Use reflection to test the private shouldUpdateStatus method. + * Takes current status and incoming status (transition-aware guard). */ - private boolean invokeShouldUpdateStatus(CloudVmStatus status) throws Exception { - Method method = VMTrackerImpl.class.getDeclaredMethod("shouldUpdateStatus", CloudVmStatus.class); + private boolean invokeShouldUpdateStatus(CloudVmStatus currentStatus, CloudVmStatus incomingStatus) throws Exception { + Method method = VMTrackerImpl.class.getDeclaredMethod("shouldUpdateStatus", CloudVmStatus.class, CloudVmStatus.class); method.setAccessible(true); - return (boolean) method.invoke(vmTracker, status); + return (boolean) method.invoke(vmTracker, currentStatus, incomingStatus); } private CloudVmStatus createStatus(String instanceId, VMStatus vmStatus) { + return createStatusWithVmAndJob(instanceId, vmStatus, JobStatus.Starting); + } + + private CloudVmStatus createStatusWithVmAndJob(String instanceId, VMStatus vmStatus, JobStatus jobStatus) { return new CloudVmStatus( instanceId, "123", "sg-test", - JobStatus.Starting, + jobStatus, VMImageType.AGENT, VMRegion.US_WEST_2, vmStatus, @@ -153,37 +168,74 @@ private CloudVmStatus createStatus(String instanceId, VMStatus vmStatus) { @Test @DisplayName("shouldUpdateStatus returns true for null current status") void shouldUpdateStatus_nullCurrentStatus_returnsTrue() throws Exception { - assertTrue(invokeShouldUpdateStatus(null)); + CloudVmStatus incoming = createStatus("i-123", VMStatus.running); + assertTrue(invokeShouldUpdateStatus(null, incoming)); + } + + @Test + @DisplayName("shouldUpdateStatus rejects non-terminal incoming when current is terminated") + void shouldUpdateStatus_terminated_rejectsNonTerminalIncoming() throws Exception { + CloudVmStatus current = createStatus("i-123", VMStatus.terminated); + CloudVmStatus incoming = createStatus("i-123", VMStatus.running); + assertFalse(invokeShouldUpdateStatus(current, incoming), + "Should reject non-terminal incoming when current is terminated"); + } + + @Test + @DisplayName("shouldUpdateStatus rejects terminated→terminated (idempotent, already final)") + void shouldUpdateStatus_terminated_rejectsTerminatedIncoming() throws Exception { + CloudVmStatus current = createStatusWithVmAndJob("i-123", VMStatus.terminated, JobStatus.Completed); + CloudVmStatus incoming = createStatusWithVmAndJob("i-123", VMStatus.terminated, JobStatus.Completed); + assertFalse(invokeShouldUpdateStatus(current, incoming), + "Should reject terminated→terminated (already in final state)"); } @ParameterizedTest - @EnumSource(value = VMStatus.class, names = {"terminated", "stopped", "stopping", "shutting_down"}) - @DisplayName("shouldUpdateStatus returns false for terminal states") - void shouldUpdateStatus_terminalStates_returnsFalse(VMStatus terminalStatus) throws Exception { - CloudVmStatus status = createStatus("i-123", terminalStatus); - assertFalse(invokeShouldUpdateStatus(status), - "Should reject updates for terminal state: " + terminalStatus); + @EnumSource(value = VMStatus.class, names = {"stopped", "stopping", "shutting_down"}) + @DisplayName("shouldUpdateStatus rejects non-terminal incoming for stopping/stopped/shutting_down") + void shouldUpdateStatus_terminalStates_rejectNonTerminalIncoming(VMStatus terminalStatus) throws Exception { + CloudVmStatus current = createStatus("i-123", terminalStatus); + CloudVmStatus incoming = createStatus("i-123", VMStatus.running); + assertFalse(invokeShouldUpdateStatus(current, incoming), + "Should reject non-terminal incoming for state: " + terminalStatus); + } + + @ParameterizedTest + @EnumSource(value = VMStatus.class, names = {"stopped", "stopping", "shutting_down"}) + @DisplayName("shouldUpdateStatus allows terminated incoming for stopping/stopped/shutting_down (killInstances flow)") + void shouldUpdateStatus_terminalStates_allowTerminatedIncoming(VMStatus terminalStatus) throws Exception { + CloudVmStatus current = createStatus("i-123", terminalStatus); + CloudVmStatus incoming = createStatusWithVmAndJob("i-123", VMStatus.terminated, JobStatus.Completed); + assertTrue(invokeShouldUpdateStatus(current, incoming), + "Should allow terminated incoming for killInstances flow from state: " + terminalStatus); + } + + @ParameterizedTest + @EnumSource(value = VMStatus.class, names = {"stopped", "stopping", "shutting_down"}) + @DisplayName("shouldUpdateStatus allows Completed incoming for stopping/stopped/shutting_down") + void shouldUpdateStatus_terminalStates_allowCompletedIncoming(VMStatus terminalStatus) throws Exception { + CloudVmStatus current = createStatus("i-123", terminalStatus); + CloudVmStatus incoming = createStatusWithVmAndJob("i-123", VMStatus.running, JobStatus.Completed); + assertTrue(invokeShouldUpdateStatus(current, incoming), + "Should allow Completed job status incoming from state: " + terminalStatus); } @ParameterizedTest @EnumSource(value = VMStatus.class, names = {"unknown", "starting", "pending", "ready", "running", "rampPaused", "rebooting"}) @DisplayName("shouldUpdateStatus returns true for active states") void shouldUpdateStatus_activeStates_returnsTrue(VMStatus activeStatus) throws Exception { - CloudVmStatus status = createStatus("i-123", activeStatus); - assertTrue(invokeShouldUpdateStatus(status), + CloudVmStatus current = createStatus("i-123", activeStatus); + CloudVmStatus incoming = createStatus("i-123", VMStatus.running); + assertTrue(invokeShouldUpdateStatus(current, incoming), "Should allow updates for active state: " + activeStatus); } @Test @DisplayName("shouldUpdateStatus allows updates for replaced instances (protection handled in AgentWatchdog)") void shouldUpdateStatus_replacedInstance_allowsUpdate() throws Exception { - // Given: An instance that was replaced by AgentWatchdog CloudVmStatus replacedStatus = createStatus("i-replaced", VMStatus.replaced); - - // When/Then: Updates should be allowed - protection is handled by removing - // instances from tracking lists in AgentWatchdog before marking as replaced, - // NOT by blocking in shouldUpdateStatus (which would break killJobDirectly) - assertTrue(invokeShouldUpdateStatus(replacedStatus), + CloudVmStatus incoming = createStatus("i-replaced", VMStatus.running); + assertTrue(invokeShouldUpdateStatus(replacedStatus, incoming), "Replaced instances should allow updates (e.g., replaced -> terminated for killJobDirectly)"); } @@ -490,6 +542,238 @@ void concurrentRemoveStatusForJob_noException() throws Exception { } } + // ============ currentUsers normalization tests (Fix 2: completed/terminated → 0 users) ============ + + private CloudVmStatus createStatusWithUsers(String instanceId, String jobId, VMStatus vmStatus, + JobStatus jobStatus, int currentUsers) { + CloudVmStatus status = new CloudVmStatus( + instanceId, + jobId, + "sg-test", + jobStatus, + VMImageType.AGENT, + VMRegion.US_WEST_2, + vmStatus, + new ValidationStatus(), + 100, + currentUsers, + new Date(), + null + ); + status.setReportTime(new Date()); + return status; + } + + /** + * Inject mock CDI dependencies into vmTracker so addStatusToJobContainer can run. + * Returns a mock JobInstanceDao for test verification. + */ + @SuppressWarnings("unchecked") + private JobInstanceDao injectMockDependencies() throws Exception { + // Mock jobInstanceDao (Instance) + JobInstanceDao mockDao = mock(JobInstanceDao.class); + Instance mockDaoInstance = mock(Instance.class); + when(mockDaoInstance.get()).thenReturn(mockDao); + Field jobInstanceDaoField = VMTrackerImpl.class.getDeclaredField("jobInstanceDao"); + jobInstanceDaoField.setAccessible(true); + jobInstanceDaoField.set(vmTracker, mockDaoInstance); + + // Mock jobEventProducer (Event) + Event mockEventProducer = mock(Event.class); + Field jobEventField = VMTrackerImpl.class.getDeclaredField("jobEventProducer"); + jobEventField.setAccessible(true); + jobEventField.set(vmTracker, mockEventProducer); + + return mockDao; + } + + /** + * Set lastSeenMap entry for staleness detection tests. + */ + @SuppressWarnings("unchecked") + private void setLastSeen(String instanceId, long timestampMs) throws Exception { + Field lastSeenField = VMTrackerImpl.class.getDeclaredField("lastSeenMap"); + lastSeenField.setAccessible(true); + Map lastSeenMap = (Map) lastSeenField.get(vmTracker); + lastSeenMap.put(instanceId, timestampMs); + } + + /** + * Invoke addStatusToJobContainer via reflection. Requires a pre-existing container in jobMap. + */ + private void invokeAddStatusToJobContainer(CloudVmStatus status, CloudVmStatusContainer container) throws Exception { + Method method = VMTrackerImpl.class.getDeclaredMethod("addStatusToJobContainer", + CloudVmStatus.class, CloudVmStatusContainer.class); + method.setAccessible(true); + method.invoke(vmTracker, status, container); + } + + /** + * Create a container with pre-populated statuses, wire it into the vmTracker's jobMap, + * and inject mock CDI dependencies. Returns the mock JobInstanceDao. + */ + @SuppressWarnings("unchecked") + private JobInstanceDao createAndRegisterContainerWithMocks(String jobId, + CloudVmStatusContainer[] containerOut, CloudVmStatus... statuses) throws Exception { + JobInstanceDao mockDao = injectMockDependencies(); + + // Create a mock JobInstance for the DB lookup + JobInstance mockJob = mock(JobInstance.class); + when(mockJob.getStatus()).thenReturn(JobQueueStatus.Running); + when(mockJob.getId()).thenReturn(Integer.parseInt(jobId)); + when(mockDao.findById(Integer.parseInt(jobId))).thenReturn(mockJob); + when(mockDao.saveOrUpdate(any())).thenReturn(mockJob); + + java.lang.reflect.Field jobMapField = VMTrackerImpl.class.getDeclaredField("jobMap"); + jobMapField.setAccessible(true); + Map jobMap = + (Map) jobMapField.get(vmTracker); + + CloudVmStatusContainer container = new CloudVmStatusContainer(); + container.setJobId(jobId); + container.setStartTime(new Date()); + for (CloudVmStatus s : statuses) { + container.getStatuses().add(s); + } + jobMap.put(jobId, container); + containerOut[0] = container; + return mockDao; + } + + @Test + @DisplayName("addStatusToJobContainer normalizes currentUsers to 0 for Completed agent") + void addStatusToJobContainer_completedAgent_normalizesCurrentUsersToZero() throws Exception { + // Given: An agent reports Completed but still has currentUsers=10 (threads haven't exited) + String jobId = "100"; + CloudVmStatus completedWithUsers = createStatusWithUsers("i-agent1", jobId, + VMStatus.running, JobStatus.Completed, 10); + CloudVmStatusContainer[] containerOut = new CloudVmStatusContainer[1]; + createAndRegisterContainerWithMocks(jobId, containerOut); + + // When: addStatusToJobContainer processes this status + invokeAddStatusToJobContainer(completedWithUsers, containerOut[0]); + + // Then: currentUsers should be normalized to 0 + CloudVmStatus storedStatus = containerOut[0].getStatuses().stream() + .filter(s -> s.getInstanceId().equals("i-agent1")) + .findFirst().orElseThrow(); + assertEquals(0, storedStatus.getCurrentUsers(), + "Completed agent should have currentUsers normalized to 0"); + } + + @Test + @DisplayName("addStatusToJobContainer normalizes currentUsers to 0 for terminated agent") + void addStatusToJobContainer_terminatedAgent_normalizesCurrentUsersToZero() throws Exception { + // Given: An agent reports terminated but still has currentUsers=5 + String jobId = "101"; + CloudVmStatus terminatedWithUsers = createStatusWithUsers("i-agent1", jobId, + VMStatus.terminated, JobStatus.Completed, 5); + CloudVmStatusContainer[] containerOut = new CloudVmStatusContainer[1]; + createAndRegisterContainerWithMocks(jobId, containerOut); + + // When + invokeAddStatusToJobContainer(terminatedWithUsers, containerOut[0]); + + // Then + CloudVmStatus storedStatus = containerOut[0].getStatuses().stream() + .filter(s -> s.getInstanceId().equals("i-agent1")) + .findFirst().orElseThrow(); + assertEquals(0, storedStatus.getCurrentUsers(), + "Terminated agent should have currentUsers normalized to 0"); + } + + @Test + @DisplayName("addStatusToJobContainer does NOT normalize currentUsers for Running agent") + void addStatusToJobContainer_runningAgent_preservesCurrentUsers() throws Exception { + // Given: A Running agent with 50 currentUsers + String jobId = "102"; + CloudVmStatus runningStatus = createStatusWithUsers("i-agent1", jobId, + VMStatus.running, JobStatus.Running, 50); + CloudVmStatusContainer[] containerOut = new CloudVmStatusContainer[1]; + createAndRegisterContainerWithMocks(jobId, containerOut); + + // When + invokeAddStatusToJobContainer(runningStatus, containerOut[0]); + + // Then: currentUsers should remain 50 + CloudVmStatus storedStatus = containerOut[0].getStatuses().stream() + .filter(s -> s.getInstanceId().equals("i-agent1")) + .findFirst().orElseThrow(); + assertEquals(50, storedStatus.getCurrentUsers(), + "Running agent should keep its currentUsers unchanged"); + } + + // ============ Staleness detection tests (Fix 3: auto-complete stale agents) ============ + + @Test + @DisplayName("Stale agent (no report in 45s) with one Completed agent triggers isFinished") + void addStatusToJobContainer_staleAgent_triggersJobFinished() throws Exception { + // Given: Two agents — one Completed, one stale (reportTime 60s ago, still Running) + String jobId = "103"; + CloudVmStatus completedAgent = createStatusWithUsers("i-completed", jobId, + VMStatus.running, JobStatus.Completed, 0); + completedAgent.setReportTime(new Date()); + + CloudVmStatus staleAgent = createStatusWithUsers("i-stale", jobId, + VMStatus.running, JobStatus.Running, 10); + staleAgent.setReportTime(new Date(System.currentTimeMillis() - 120_000)); + + CloudVmStatusContainer[] containerOut = new CloudVmStatusContainer[1]; + createAndRegisterContainerWithMocks(jobId, containerOut, staleAgent); + + // Populate lastSeenMap — staleness uses this instead of reportTime + // Must exceed 3× configured report interval (settings.xml has 30s → threshold=90s) + setLastSeen("i-completed", System.currentTimeMillis()); + setLastSeen("i-stale", System.currentTimeMillis() - 120_000); // 120s ago + + // When: The completed agent reports + invokeAddStatusToJobContainer(completedAgent, containerOut[0]); + + // Then: Stale agent should be force-completed and job should have endTime set + CloudVmStatus staleInContainer = containerOut[0].getStatuses().stream() + .filter(s -> s.getInstanceId().equals("i-stale")) + .findFirst().orElseThrow(); + assertEquals(JobStatus.Completed, staleInContainer.getJobStatus(), + "Stale agent should be force-completed"); + assertEquals(0, staleInContainer.getCurrentUsers(), + "Stale agent should have currentUsers forced to 0"); + assertNotNull(containerOut[0].getEndTime(), + "Job container should have endTime set (job is finished)"); + } + + @Test + @DisplayName("Recently-reporting agent prevents staleness detection from firing") + void addStatusToJobContainer_recentAgent_noStalenessTriggered() throws Exception { + // Given: Two agents — one Completed, one still Running with recent report + String jobId = "104"; + CloudVmStatus completedAgent = createStatusWithUsers("i-completed", jobId, + VMStatus.running, JobStatus.Completed, 0); + completedAgent.setReportTime(new Date()); + + CloudVmStatus recentAgent = createStatusWithUsers("i-recent", jobId, + VMStatus.running, JobStatus.Running, 50); + recentAgent.setReportTime(new Date()); // just reported + + CloudVmStatusContainer[] containerOut = new CloudVmStatusContainer[1]; + createAndRegisterContainerWithMocks(jobId, containerOut, recentAgent); + + // Populate lastSeenMap — recent agent was just seen + setLastSeen("i-completed", System.currentTimeMillis()); + setLastSeen("i-recent", System.currentTimeMillis()); // just seen + + // When: The completed agent reports + invokeAddStatusToJobContainer(completedAgent, containerOut[0]); + + // Then: Recent agent should NOT be force-completed + CloudVmStatus recentInContainer = containerOut[0].getStatuses().stream() + .filter(s -> s.getInstanceId().equals("i-recent")) + .findFirst().orElseThrow(); + assertEquals(JobStatus.Running, recentInContainer.getJobStatus(), + "Recently-reporting agent should stay Running"); + assertNull(containerOut[0].getEndTime(), + "Job should NOT be finished — one agent is still actively running"); + } + // ============ removeStatusForInstance tests ============ @Test