diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 12b192cf3b1b..e793e633da09 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -12,7 +12,7 @@ Please see SECURITY.md for reporting security vulnerabilities.
 
 Creating Issues
 ===============
-In order to file bugs or new feature requests, please use http://issues.cask.co.
+In order to file bugs or new feature requests, please use https://cdap.atlassian.net/jira/.
 
 Feature Requests
 ================
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillController.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillController.java
index 906197359e01..1c676e85d1cc 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillController.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillController.java
@@ -140,10 +140,20 @@ public void complete() {
     } catch (Exception e) {
       // If there is exception, use the remote execution controller to try killing the remote process
       try {
-        LOG.debug("Force termination of remote process for program run {}", programRunId);
-        remoteProcessController.kill(RuntimeJobStatus.RUNNING);
+        RuntimeJobStatus currentStatus = remoteProcessController.getStatus();
+        if (currentStatus.isTerminated()) {
+          LOG.debug("Skipping termination of remote process for program run {} as it is already in "
+              + "terminal state {}", programRunId, currentStatus);
+        } else {
+          // Kill in every non-terminal state, not only RUNNING and STARTING, so that a process
+          // reported in any other non-terminal state is not skipped and mislabelled as terminal.
+          LOG.debug("Force termination of remote process for program run {} as it is in state {}",
+              programRunId, currentStatus);
+          remoteProcessController.kill(currentStatus);
+        }
       } catch (Exception ex) {
-        LOG.warn("Failed to terminate remote process for program run {}", programRunId, ex);
+        LOG.warn("Failed to get status or terminate remote process for program run {} "
+            + "during force kill attempt", programRunId, ex);
       }
     }
   }
diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillControllerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillControllerTest.java
new file mode 100644
index 000000000000..196dfe4059d9
--- /dev/null
+++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillControllerTest.java
@@ -0,0 +1,109 @@
+package io.cdap.cdap.internal.app.runtime.distributed.remote;
+
+import io.cdap.cdap.common.conf.CConfiguration;
+import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.proto.ProgramType;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.proto.id.ProgramRunId;
+import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link RemoteExecutionTwillController}.
+ */
+public class RemoteExecutionTwillControllerTest {
+
+  private CConfiguration cConf;
+  private ProgramRunId programRunId;
+  private RemoteProcessController remoteProcessController;
+  private ScheduledExecutorService scheduler;
+  private RemoteExecutionService remoteExecutionService;
+  private CompletableFuture<Void> startupCompletionFuture;
+
+  @Before
+  public void setUp() {
+    cConf = CConfiguration.create();
+    cConf.setLong(Constants.RuntimeMonitor.POLL_TIME_MS, 100);
+    programRunId = new ProgramRunId(NamespaceId.DEFAULT.getNamespace(), "testapp",
+        ProgramType.WORKFLOW, "testworkflow", "testrun");
+    remoteProcessController = Mockito.mock(RemoteProcessController.class);
+    scheduler = Executors.newSingleThreadScheduledExecutor();
+    remoteExecutionService = Mockito.mock(RemoteExecutionService.class);
+    startupCompletionFuture = new CompletableFuture<>();
+  }
+
+  @After
+  public void tearDown() {
+    // Release the scheduler created in setUp() so that it cannot outlive the test.
+    scheduler.shutdownNow();
+  }
+
+  private RemoteExecutionTwillController createController(boolean terminateWithController) {
+    return new RemoteExecutionTwillController(cConf, programRunId, startupCompletionFuture,
+        remoteProcessController, scheduler, remoteExecutionService,
+        terminateWithController);
+  }
+
+  @Test
+  public void testComplete_KillSkippedForTerminalState() throws Exception {
+    RemoteExecutionTwillController controller = createController(true);
+    startupCompletionFuture.complete(null);
+
+    // The first getStatus() call throws, which is intended to drive complete() into its catch
+    // block; the next call reports COMPLETED. Both answers are chained on a single stubbing
+    // because a second when(getStatus()) would invoke the mock and rethrow the stubbed exception.
+    when(remoteProcessController.getStatus())
+        .thenThrow(new RuntimeException("Simulated poll failure"))
+        .thenReturn(RuntimeJobStatus.COMPLETED);
+
+    controller.complete();
+
+    // Verify kill was NOT called because the status was terminal
+    verify(remoteProcessController, never()).kill(any());
+  }
+
+  @Test
+  public void testComplete_KillCalledForRunningState() throws Exception {
+    RemoteExecutionTwillController controller = createController(true);
+    startupCompletionFuture.complete(null);
+
+    // First call throws (simulated poll failure), second call reports RUNNING. The answers are
+    // chained on one stubbing; re-stubbing with a second when(...) would hit the throwing stub.
+    when(remoteProcessController.getStatus())
+        .thenThrow(new RuntimeException("Simulated poll failure"))
+        .thenReturn(RuntimeJobStatus.RUNNING);
+
+    controller.complete();
+
+    // Verify kill WAS called
+    verify(remoteProcessController).kill(RuntimeJobStatus.RUNNING);
+  }
+
+  @Test
+  public void testComplete_StatusCheckFailsInCatch() throws Exception {
+    RemoteExecutionTwillController controller = createController(true);
+    startupCompletionFuture.complete(null);
+
+    // Both the simulated poll and the follow-up status check fail.
+    when(remoteProcessController.getStatus())
+        .thenThrow(new RuntimeException("Simulated poll failure"))
+        .thenThrow(new RuntimeException("Simulated getStatus failure in catch"));
+
+    controller.complete();
+
+    // Verify kill was NOT called because getStatus failed in the catch
+    verify(remoteProcessController, never()).kill(any());
+  }
+}
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/AbstractDataprocProvisioner.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/AbstractDataprocProvisioner.java
index c4573be1934a..ae5b8b31a193 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/AbstractDataprocProvisioner.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/AbstractDataprocProvisioner.java
@@ -127,10 +127,21 @@ public final ClusterStatus deleteClusterWithStatus(ProvisionerContext context, C
         // Status details is specific to dataproc jobs, so it was not added to RuntimeJobDetail spi.
         String statusDetails = ((DataprocRuntimeJobDetail) jobDetail).getJobStatusDetails();
         if (statusDetails != null) {
-          ProgramRunFailureException e = new ProgramRunFailureException(
-              String.format("Dataproc job '%s' status details: %s",
-                  ((DataprocRuntimeJobDetail) jobDetail).getJobId(), statusDetails));
-          LOG.error("Dataproc Job {}", jobDetail.getStatus(), e);
+          String jobId = ((DataprocRuntimeJobDetail) jobDetail).getJobId();
+          // A FAILED status whose details carry this text comes from attempting to cancel a job
+          // that was already DONE; it is not a pipeline failure, so it is only logged at WARN.
+          boolean cancelOnDoneJob = jobDetail.getStatus() == RuntimeJobStatus.FAILED
+              && statusDetails.contains("is not supported in the current state: DONE");
+          if (cancelOnDoneJob) {
+            LOG.warn("Attempted to cancel Dataproc job {} which was already DONE. "
+                + "This is not a pipeline failure. Continuing with deprovisioning.", jobId);
+          } else {
+            // Log only, do not throw: this method is expected to return ClusterStatus.DELETING and an
+            // exception here might halt the deprovisioning process in the caller.
+            ProgramRunFailureException e = new ProgramRunFailureException(
+                String.format("Dataproc job '%s' status details: %s", jobId, statusDetails));
+            LOG.error("Dataproc Job {} failed with error: {}", jobDetail.getStatus(), statusDetails, e);
+          }
         }
       }
     } finally {
diff --git a/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisionerTest.java b/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisionerTest.java
index b508ede63689..9a4c8d05da35 100644
--- a/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisionerTest.java
+++ b/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisionerTest.java
@@ -494,4 +494,52 @@ public void testCommonDataprocLabelsInvalidLabel() {
     Map labels = provisioner.getCommonDataprocLabels(context);
     Assert.assertNull(labels.get("email"));
   }
+
+  /**
+   * Prepares the shared fixtures so that deleteClusterWithStatus sees a FAILED Dataproc job
+   * carrying the given status details, with doDeleteCluster stubbed to do nothing.
+   */
+  private void stubFailedJob(String statusDetails) throws Exception {
+    context.addProperty("accountKey", "testKey");
+    context.addProperty(DataprocConf.PROJECT_ID_KEY, "testProject");
+    context.addProperty("region", "testRegion");
+    context.setErrorCategory(new ErrorCategory(ErrorCategoryEnum.DEPROVISIONING));
+
+    DataprocRuntimeJobManager jobManager = Mockito.mock(DataprocRuntimeJobManager.class);
+    DataprocRuntimeJobDetail jobDetail = Mockito.mock(DataprocRuntimeJobDetail.class);
+
+    Mockito.when(provisioner.getRuntimeJobManager(context)).thenReturn(Optional.of(jobManager));
+    Mockito.when(jobManager.getDetail(context.getProgramRunInfo())).thenReturn(Optional.of(jobDetail));
+    Mockito.when(jobDetail.getStatus()).thenReturn(RuntimeJobStatus.FAILED);
+    Mockito.when(jobDetail.getJobId()).thenReturn("test-job-id");
+    Mockito.when(jobDetail.getJobStatusDetails()).thenReturn(statusDetails);
+
+    Mockito.when(cluster.getName()).thenReturn("testClusterName");
+    DataprocConf conf = DataprocConf.create(provisioner.createContextProperties(context));
+
+    // Simulate doDeleteCluster to avoid actual deletion call
+    Mockito.doNothing().when(provisioner).doDeleteCluster(context, cluster, conf);
+  }
+
+  @Test
+  public void testDeleteClusterWithStatus_CancelOnDoneJob() throws Exception {
+    // Cancel-on-DONE status details, using placeholder resource names only.
+    stubFailedJob("Cancellation for task: Task(test-project/test-cluster-uuid/job-test-job-id) "
+        + "is not supported in the current state: DONE.");
+
+    ClusterStatus status = provisioner.deleteClusterWithStatus(context, cluster);
+
+    // Logs are not asserted here; only that no exception is thrown and the status is DELETING.
+    Assert.assertEquals(ClusterStatus.DELETING, status);
+  }
+
+  @Test
+  public void testDeleteClusterWithStatus_OtherJobFailure() throws Exception {
+    stubFailedJob("Some other fatal error");
+
+    ClusterStatus status = provisioner.deleteClusterWithStatus(context, cluster);
+
+    // Any other failure detail is logged as an error but not rethrown, so the status is DELETING.
+    Assert.assertEquals(ClusterStatus.DELETING, status);
+  }
 }