diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java index 14e04164e123..888131f6a46c 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java @@ -30,15 +30,17 @@ import io.cdap.cdap.api.security.store.SecureStore; import io.cdap.cdap.app.guice.AuditLogWriterModule; import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule; -import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.conf.SConfiguration; +import io.cdap.cdap.common.discovery.ResolvingDiscoverable; +import io.cdap.cdap.common.discovery.URIScheme; import io.cdap.cdap.common.encryption.guice.UserCredentialAeadEncryptionModule; import io.cdap.cdap.common.guice.ConfigModule; import io.cdap.cdap.common.guice.IOModule; import io.cdap.cdap.common.guice.RemoteAuthenticatorModules; import io.cdap.cdap.common.guice.preview.PreviewDiscoveryRuntimeModule; +import io.cdap.cdap.common.security.HttpsEnabler; import io.cdap.cdap.common.utils.Networks; import io.cdap.cdap.config.guice.ConfigStoreModule; import io.cdap.cdap.data.runtime.DataSetServiceModules; @@ -48,26 +50,30 @@ import io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService; import io.cdap.cdap.data2.metadata.writer.MetadataServiceClient; import io.cdap.cdap.data2.metadata.writer.NoOpMetadataServiceClient; -import io.cdap.cdap.internal.app.preview.PreviewRunnerService; +import io.cdap.cdap.internal.app.preview.PreviewRequestPollerInfoProvider; +import io.cdap.cdap.internal.app.preview.PreviewRunnerHttpHandlerInternal; import io.cdap.cdap.internal.provision.ProvisionerModule; import io.cdap.cdap.logging.appender.LogAppender; import io.cdap.cdap.logging.appender.tms.PreviewTMSLogAppender; import io.cdap.cdap.messaging.guice.MessagingServerRuntimeModule; import io.cdap.cdap.metadata.MetadataReaderWriterModules; import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule; -import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.security.auth.context.AuthenticationContextModules; import io.cdap.cdap.security.guice.CoreSecurityRuntimeModule; import io.cdap.cdap.security.guice.preview.PreviewSecureStoreModule; +import io.cdap.http.ChannelPipelineModifier; +import io.cdap.http.NettyHttpService; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpContentDecompressor; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.TransactionSystemClient; -import org.apache.twill.common.Threads; +import org.apache.twill.common.Cancellable; +import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.discovery.DiscoveryServiceClient; -import org.apache.twill.internal.ServiceListenerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,15 +88,16 @@ public class DefaultPreviewRunnerManager extends AbstractIdleService implements private final CConfiguration previewCConf; private final Configuration previewHConf; private final SConfiguration previewSConf; - private final int maxConcurrentPreviews; private final DiscoveryServiceClient discoveryServiceClient; private final DatasetFramework datasetFramework; private final SecureStore secureStore; private final TransactionSystemClient transactionSystemClient; private final PreviewRunnerModule previewRunnerModule; - private final Set previewRunnerServices; private final LevelDBTableService previewLevelDBTableService; - private final PreviewRunnerServiceFactory previewRunnerServiceFactory; + private final PreviewRequestPollerInfoProvider pollerInfoProvider; + private final DiscoveryService discoveryService; + private NettyHttpService previewRunnerHttpService; + private Cancellable cancelDiscovery; private PreviewRunner runner; @Inject @@ -103,7 +110,7 @@ public class DefaultPreviewRunnerManager extends AbstractIdleService implements TransactionSystemClient transactionSystemClient, PreviewRunnerModule previewRunnerModule, @Named(PreviewConfigModule.PREVIEW_LEVEL_DB) LevelDBTableService previewLevelDBService, - PreviewRunnerServiceFactory previewRunnerServiceFactory) { + PreviewRequestPollerInfoProvider pollerInfoProvider, DiscoveryService discoveryService) { this.previewCConf = previewCConf; this.previewHConf = previewHConf; this.previewSConf = previewSConf; @@ -111,11 +118,10 @@ public class DefaultPreviewRunnerManager extends AbstractIdleService implements this.secureStore = secureStore; this.discoveryServiceClient = discoveryServiceClient; this.transactionSystemClient = transactionSystemClient; - this.maxConcurrentPreviews = previewCConf.getInt(Constants.Preview.POLLER_COUNT); - this.previewRunnerServices = ConcurrentHashMap.newKeySet(); this.previewRunnerModule = previewRunnerModule; this.previewLevelDBTableService = previewLevelDBService; - this.previewRunnerServiceFactory = previewRunnerServiceFactory; + this.pollerInfoProvider = pollerInfoProvider; + this.discoveryService = discoveryService; } @Override @@ -127,19 +133,43 @@ protected void startUp() throws Exception { ((Service) runner).startAndWait(); } - // Create and start the preview poller services. - for (int i = 0; i < maxConcurrentPreviews; i++) { - createPreviewRunnerService().startAndWait(); + NettyHttpService.Builder builder = NettyHttpService.builder( + //TODO(sidhdirenge) : Use cConf for this host address and port. + Constants.Service.PREVIEW_RUNNER).setHost("0.0.0.0").setPort(0) + .setExecThreadPoolSize(previewCConf.getInt(Constants.Preview.EXEC_THREADS)) + .setBossThreadPoolSize(previewCConf.getInt(Constants.Preview.BOSS_THREADS)) + .setWorkerThreadPoolSize(previewCConf.getInt(Constants.Preview.WORKER_THREADS)) + .setChannelPipelineModifier(new ChannelPipelineModifier() { + @Override + public void modify(ChannelPipeline pipeline) { + pipeline.addAfter("compressor", "decompressor", new HttpContentDecompressor()); + } + }).setHttpHandlers( + new PreviewRunnerHttpHandlerInternal(previewCConf, pollerInfoProvider, runner, + this::stopPreview) + ); + + if (previewCConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED)) { + new HttpsEnabler().configureKeyStore(previewCConf, previewSConf).enable(builder); } + this.previewRunnerHttpService = builder.build(); + + LOG.debug("Starting PreviewRunnerHttpService"); + previewRunnerHttpService.start(); + cancelDiscovery = discoveryService.register(ResolvingDiscoverable.of( + URIScheme.createDiscoverable(Constants.Service.PREVIEW_RUNNER, previewRunnerHttpService))); + LOG.debug("Starting PreviewRunnerHttpService has completed"); } @Override protected void shutDown() throws Exception { - // Should stop the polling service, hence individual preview runs, before stopping the top level preview runner. - previewRunnerServices.forEach(this::stopQuietly); if (runner instanceof Service) { stopQuietly((Service) runner); } + if (previewRunnerHttpService != null) { + previewRunnerHttpService.stop(1, 2, TimeUnit.SECONDS); + } + cancelDiscovery.cancel(); } private void stopQuietly(Service service) { @@ -151,20 +181,14 @@ private void stopQuietly(Service service) { } @Override - public void stop(ApplicationId preview) throws Exception { - PreviewRunnerService runnerService = previewRunnerServices.stream() - .filter(r -> r.getPreviewApplication().filter(preview::equals).isPresent()) - .findFirst() - .orElse(null); - - if (runnerService == null) { - throw new NotFoundException( - "Preview run cannot be stopped. Please try stopping again or start new preview run."); - } + public void stop(ProgramId preview) throws Exception { + LOG.info("sidhdirenge - Stop called for preview {}", preview.getParent()); + runner.stopPreview(preview); + } - PreviewRunnerService newRunnerService = createPreviewRunnerService(); - runnerService.stopAndWait(); - newRunnerService.startAndWait(); + private void stopPreview(ProgramId program) { + LOG.info("sidhdirenge - stopPreview called for preview {}", program.getApplication()); + stop(); } /** @@ -223,31 +247,4 @@ public InetAddress providesHostname(CConfiguration cConf) { } ); } - - /** - * Creates a {@link PreviewRunnerService}. It will automatically added to and removed from the - * {@link #previewRunnerServices} set. - */ - private PreviewRunnerService createPreviewRunnerService() { - PreviewRunnerService previewRunnerService = previewRunnerServiceFactory.create(runner); - - previewRunnerService.addListener(new ServiceListenerAdapter() { - - @Override - public void terminated(State from) { - previewRunnerServices.remove(previewRunnerService); - if (previewRunnerServices.isEmpty()) { - try { - stop(); - } catch (Exception e) { - // should not happen - LOG.error("Failed to shutdown the preview runner manager service.", e); - } - } - } - }, Threads.SAME_THREAD_EXECUTOR); - - previewRunnerServices.add(previewRunnerService); - return previewRunnerService; - } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManager.java index 8d99827f203a..38cb19b6eb3b 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManager.java @@ -102,5 +102,6 @@ Map> getData(ApplicationId applicationId, String trace * @param pollerInfo information about the poller * @return {@code PreviewRequest} if such request is available in the queue */ + @Deprecated Optional poll(@Nullable byte[] pollerInfo); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManagerModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManagerModule.java index c4bf1096aafb..e257777e25d3 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManagerModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManagerModule.java @@ -31,12 +31,12 @@ import io.cdap.cdap.gateway.handlers.CommonHandlers; import io.cdap.cdap.gateway.handlers.preview.PreviewErrorClassificationHttpHandler; import io.cdap.cdap.gateway.handlers.preview.PreviewHttpHandler; -import io.cdap.cdap.gateway.handlers.preview.PreviewHttpHandlerInternal; import io.cdap.cdap.internal.app.preview.DefaultPreviewManager; import io.cdap.cdap.internal.app.preview.DefaultPreviewRequestQueue; import io.cdap.cdap.internal.app.preview.DistributedPreviewManager; import io.cdap.cdap.internal.app.preview.DistributedPreviewRunStopper; import io.cdap.cdap.internal.app.preview.PreviewDataCleanupService; +import io.cdap.cdap.internal.app.preview.PreviewRequestPollerService; import io.cdap.cdap.internal.app.preview.PreviewRunStopper; import io.cdap.cdap.internal.app.store.preview.DefaultPreviewStore; import io.cdap.cdap.messaging.server.FetchHandler; @@ -72,6 +72,7 @@ protected void configure() { expose(PreviewRequestQueue.class); bind(PreviewDataCleanupService.class).in(Scopes.SINGLETON); + bind(PreviewRequestPollerService.class).in(Scopes.SINGLETON); if (distributedRunner) { bind(PreviewManager.class).to(DistributedPreviewManager.class).in(Scopes.SINGLETON); @@ -83,7 +84,6 @@ protected void configure() { Multibinder handlerBinder = Multibinder.newSetBinder(binder(), HttpHandler.class); handlerBinder.addBinding().to(PreviewHttpHandler.class); handlerBinder.addBinding().to(PreviewErrorClassificationHttpHandler.class); - handlerBinder.addBinding().to(PreviewHttpHandlerInternal.class); if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) { // Add these handlers only if messaging service endpoint doesn't exist and preview runners need to diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRequestQueue.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRequestQueue.java index e1be35b9a7f7..7ca4dc77615c 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRequestQueue.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRequestQueue.java @@ -37,6 +37,8 @@ public interface PreviewRequestQueue { */ Optional poll(@Nullable byte[] pollerInfo); + Optional poll(); + /** * Add a preview request in the queue. * diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerManagerModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerManagerModule.java index 951a76aa4ee9..aa71260e078d 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerManagerModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerManagerModule.java @@ -19,7 +19,6 @@ import com.google.inject.Module; import com.google.inject.PrivateModule; import com.google.inject.Scopes; -import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.name.Names; import io.cdap.cdap.common.runtime.RuntimeModule; import io.cdap.cdap.data.runtime.DataSetsModules; @@ -29,8 +28,8 @@ import io.cdap.cdap.data2.dataset2.DefaultDatasetDefinitionRegistryFactory; import io.cdap.cdap.internal.app.preview.DirectPreviewRequestFetcher; import io.cdap.cdap.internal.app.preview.PreviewRequestFetcher; +import io.cdap.cdap.internal.app.preview.PreviewRequestPollerInfoProvider; import io.cdap.cdap.internal.app.preview.PreviewRunStopper; -import io.cdap.cdap.internal.app.preview.PreviewRunnerService; import io.cdap.cdap.internal.app.preview.RemotePreviewRequestFetcher; /** @@ -58,16 +57,13 @@ protected void configure() { bind(PreviewRequestFetcher.class).to(DirectPreviewRequestFetcher.class) .in(Scopes.SINGLETON); + bind(PreviewRequestPollerInfoProvider.class).toInstance(() -> new byte[0]); bind(DefaultPreviewRunnerManager.class).in(Scopes.SINGLETON); bind(PreviewRunStopper.class).to(DefaultPreviewRunnerManager.class); expose(PreviewRunStopper.class); bind(PreviewRunnerManager.class).to(DefaultPreviewRunnerManager.class); expose(PreviewRunnerManager.class); - - install(new FactoryModuleBuilder() - .implement(PreviewRunnerService.class, PreviewRunnerService.class) - .build(PreviewRunnerServiceFactory.class)); } }; } @@ -90,10 +86,6 @@ protected void configure() { bind(DefaultPreviewRunnerManager.class).in(Scopes.SINGLETON); bind(PreviewRunnerManager.class).to(DefaultPreviewRunnerManager.class); expose(PreviewRunnerManager.class); - - install(new FactoryModuleBuilder() - .implement(PreviewRunnerService.class, PreviewRunnerService.class) - .build(PreviewRunnerServiceFactory.class)); } }; } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewStatus.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewStatus.java index cea4ceca7be7..8d1c09ede5fe 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewStatus.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewStatus.java @@ -37,7 +37,8 @@ public enum Status { RUN_FAILED(true), KILLED(true), KILLED_BY_TIMER(true), - KILLED_BY_EXCEEDING_MEMORY_LIMIT(true); + KILLED_BY_EXCEEDING_MEMORY_LIMIT(true), + KILLED_BY_INSUFFICIENT_RESOURCES(true); private final boolean endState; diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/preview/PreviewStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/preview/PreviewStore.java index 052b6df957b2..7dce63a58a4e 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/preview/PreviewStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/preview/PreviewStore.java @@ -22,6 +22,7 @@ import io.cdap.cdap.common.ConflictException; import io.cdap.cdap.proto.artifact.AppRequest; import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.proto.id.ProgramRunId; import io.cdap.cdap.proto.security.Principal; import java.util.List; @@ -64,7 +65,11 @@ public interface PreviewStore { * * @param programRunId the program run id to save */ - void setProgramId(ProgramRunId programRunId); + void setProgramRunId(ProgramRunId programRunId); + + void setProgramId(ProgramId programId); + + ProgramId getProgramId(ApplicationId applicationId); /** * Get the program run id associated with the preview run diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/preview/PreviewHttpHandlerInternal.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/preview/PreviewHttpHandlerInternal.java index e35f453f2e7b..4c08028b59b4 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/preview/PreviewHttpHandlerInternal.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/preview/PreviewHttpHandlerInternal.java @@ -36,6 +36,7 @@ /** * Internal {@link HttpHandler} for Preview system. */ +@Deprecated @Singleton @Path(Constants.Gateway.INTERNAL_API_VERSION_3 + "/previews") public class PreviewHttpHandlerInternal extends AbstractHttpHandler { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DefaultPreviewManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DefaultPreviewManager.java index aa7247db8820..db21e4b708e0 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DefaultPreviewManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DefaultPreviewManager.java @@ -142,6 +142,7 @@ public class DefaultPreviewManager extends AbstractIdleService implements Previe private final MessagingService messagingService; private final PreviewDataCleanupService previewDataCleanupService; private final MetricsCollectionService metricsCollectionService; + private final PreviewRequestPollerService previewRequestPollerService; private Injector previewInjector; private PreviewDataSubscriberService dataSubscriberService; private PreviewTMSLogSubscriber logSubscriberService; @@ -161,7 +162,8 @@ public class DefaultPreviewManager extends AbstractIdleService implements Previe PreviewRequestQueue previewRequestQueue, PreviewStore previewStore, PreviewRunStopper previewRunStopper, MessagingService messagingService, PreviewDataCleanupService previewDataCleanupService, - MetricsCollectionService metricsCollectionService) { + MetricsCollectionService metricsCollectionService, + PreviewRequestPollerService previewRequestPollerService) { this.authenticationContext = authenticationContext; this.previewCConf = previewCConf; this.previewHConf = previewHConf; @@ -178,6 +180,7 @@ public class DefaultPreviewManager extends AbstractIdleService implements Previe this.messagingService = messagingService; this.previewDataCleanupService = previewDataCleanupService; this.metricsCollectionService = metricsCollectionService; + this.previewRequestPollerService = previewRequestPollerService; } @Override @@ -196,10 +199,12 @@ protected void startUp() throws Exception { dataSubscriberService = previewInjector.getInstance(PreviewDataSubscriberService.class); dataSubscriberService.startAndWait(); previewDataCleanupService.startAndWait(); + previewRequestPollerService.startAndWait(); } @Override protected void shutDown() throws Exception { + stopQuietly(previewRequestPollerService); stopQuietly(previewDataCleanupService); stopQuietly(dataSubscriberService); stopQuietly(logSubscriberService); @@ -225,6 +230,8 @@ public ApplicationId start(@Name("namespaceId") NamespaceId namespace, AppReques } previewRequestQueue.add(previewRequest); + LOG.info("sidhdirenge - Setting program id {} and appId {}", programId, programId.getParent()); + previewStore.setProgramId(programId); return previewApp; } @@ -264,7 +271,9 @@ public void stopPreview(@Name("applicationId") ApplicationId applicationId) thro status.getSubmitTime(), null, null, null)); return; } - previewRunStopper.stop(applicationId); + ProgramId programId = previewStore.getProgramId(applicationId); + LOG.info("sidhdirenge - Got program Id {} for application {}", programId, applicationId); + previewRunStopper.stop(programId); } @Override diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DefaultPreviewRequestQueue.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DefaultPreviewRequestQueue.java index 674a1e75bcdd..e7b2c98f4421 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DefaultPreviewRequestQueue.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DefaultPreviewRequestQueue.java @@ -104,6 +104,22 @@ public Optional poll(@Nullable byte[] pollerInfo) { } } + @Override + public Optional poll() { + while (true) { + PreviewRequest previewRequest = requestQueue.poll(); + if (previewRequest == null) { + return Optional.empty(); + } + if (!isValid(previewRequest, waitTimeOut)) { + LOG.warn("Preview request wth application id {} is timed out. Ignoring it.", + previewRequest.getProgram().getParent()); + continue; + } + return Optional.of(previewRequest); + } + } + @Override public void add(PreviewRequest previewRequest) { previewStore.add(previewRequest.getProgram().getParent(), diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java index 08be41066108..3595e8aebf46 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java @@ -107,13 +107,12 @@ public class DistributedPreviewManager extends DefaultPreviewManager implements PreviewRunStopper previewRunStopper, MessagingService messagingService, MetricsCollectionService metricsCollectionService, PreviewDataCleanupService previewDataCleanupService, - TwillRunner twillRunner) { + TwillRunner twillRunner, PreviewRequestPollerService previewRequestPollerService) { super(discoveryServiceClient, datasetFramework, transactionSystemClient, accessControllerInstantiator, accessEnforcer, authenticationContext, - previewLevelDbTableService, - previewCconf, previewHconf, previewSconf, previewRequestQueue, previewStore, - previewRunStopper, - messagingService, previewDataCleanupService, metricsCollectionService); + previewLevelDbTableService, previewCconf, previewHconf, previewSconf, previewRequestQueue, + previewStore, previewRunStopper, messagingService, previewDataCleanupService, + metricsCollectionService, previewRequestPollerService); this.cConf = cConf; this.hConf = hConf; diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewRunStopper.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewRunStopper.java index 52ba1e472472..f173a8c78cfd 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewRunStopper.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewRunStopper.java @@ -21,7 +21,7 @@ import io.cdap.cdap.app.store.preview.PreviewStore; import io.cdap.cdap.internal.app.runtime.k8s.PreviewRequestPollerInfo; import io.cdap.cdap.master.spi.twill.ExtendedTwillController; -import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.ProgramId; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.concurrent.Future; @@ -49,8 +49,8 @@ public class DistributedPreviewRunStopper implements PreviewRunStopper { } @Override - public void stop(ApplicationId previewApp) throws Exception { - byte[] info = previewStore.getPreviewRequestPollerInfo(previewApp); + public void stop(ProgramId previewApp) throws Exception { + byte[] info = previewStore.getPreviewRequestPollerInfo(previewApp.getParent()); if (info == null) { // should not happen throw new IllegalStateException( diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewDataSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewDataSubscriberService.java index 1cc348d26130..1e6bcb76d5ae 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewDataSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewDataSubscriberService.java @@ -283,7 +283,7 @@ public void processMessage(PreviewMessage message) { message, t); return; } - previewStore.setProgramId(payload); + previewStore.setProgramRunId(payload); } } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRequestPollerService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRequestPollerService.java new file mode 100644 index 000000000000..750cc97cc806 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRequestPollerService.java @@ -0,0 +1,156 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.preview; + +import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.gson.Gson; +import com.google.inject.Inject; +import io.cdap.cdap.api.retry.RetryableException; +import io.cdap.cdap.api.service.worker.RemoteExecutionException; +import io.cdap.cdap.app.preview.PreviewRequest; +import io.cdap.cdap.app.preview.PreviewRequestQueue; +import io.cdap.cdap.app.preview.PreviewStatus; +import io.cdap.cdap.app.store.preview.PreviewStore; +import io.cdap.cdap.common.app.RunIds; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.http.DefaultHttpRequestConfig; +import io.cdap.cdap.common.internal.remote.RemoteClient; +import io.cdap.cdap.common.internal.remote.RemoteClientFactory; +import io.cdap.cdap.common.service.Retries; +import io.cdap.cdap.common.service.RetryStrategies; +import io.cdap.cdap.common.service.RetryStrategy; +import io.cdap.cdap.proto.BasicThrowable; +import io.cdap.common.http.HttpMethod; +import io.cdap.common.http.HttpRequest; +import io.cdap.common.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import java.net.HttpURLConnection; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.twill.common.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A scheduled service that polls a {@link PreviewRequestQueue} for new preview requests and pushes + * them to an available preview runner. + *

+ * This service takes care of pushing the preview requests to preview runner pods and handling the + * lifecycle and retry logic for each request. It uses a single-threaded executor to ensure that + * only one request is processed at a time. + */ +public class PreviewRequestPollerService extends AbstractScheduledService { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultPreviewManager.class); + + private static final Gson GSON = new Gson(); + private ScheduledExecutorService executor; + private final CConfiguration previewCConf; + private final PreviewRequestQueue previewRequestQueue; + private final RemoteClient previewRunnerClient; + private final RetryStrategy retryStrategy; + private final PreviewStore previewStore; + + @Inject + PreviewRequestPollerService(CConfiguration previewCConf, PreviewRequestQueue previewRequestQueue, + RemoteClientFactory remoteClientFactory, PreviewStore previewStore) { + this.previewCConf = previewCConf; + this.previewRequestQueue = previewRequestQueue; + this.previewStore = previewStore; + this.previewRunnerClient = remoteClientFactory.createRemoteClient( + Constants.Service.PREVIEW_RUNNER, new DefaultHttpRequestConfig(false), + Constants.Gateway.INTERNAL_API_VERSION_3); + this.retryStrategy = RetryStrategies.fromConfiguration(previewCConf, + Constants.Service.PREVIEW_RUNNER + "."); + } + + @Override + protected final ScheduledExecutorService executor() { + //TODO(sidhdirenge) : Check if we can use multiple threads here. + executor = Executors.newSingleThreadScheduledExecutor( + Threads.createDaemonThreadFactory("preview-poller")); + return executor; + } + + @Override + protected Scheduler scheduler() { + long pollDelayMillis = previewCConf.getLong(Constants.Preview.REQUEST_POLL_DELAY_MILLIS); + return Scheduler.newFixedRateSchedule(0, pollDelayMillis, TimeUnit.MILLISECONDS); + } + + @Override + protected void runOneIteration() throws Exception { + PreviewRequest previewRequest = previewRequestQueue.poll().orElse(null); + if (previewRequest == null) { + return; + } + // This try block prevents the service from crashing when a single request fails permanently after retries. + try { + Retries.callWithRetries(() -> { + HttpRequest.Builder requestBuilder = previewRunnerClient.requestBuilder(HttpMethod.POST, + "/" + Constants.Service.PREVIEW_RUNNER + "/run").withBody(GSON.toJson(previewRequest)); + HttpRequest httpRequest = requestBuilder.build(); + HttpResponse httpResponse = previewRunnerClient.execute(httpRequest); + + if (httpResponse.getResponseCode() == HttpResponseStatus.TOO_MANY_REQUESTS.code()) { + // Look for available runner pod. + throw new RetryableException( + String.format("Received response code %s for %s", httpResponse.getResponseCode(), + previewRequest)); + } + + if (httpResponse.getResponseCode() != HttpURLConnection.HTTP_OK) { + // This is a definitive failure, not a transient network issue. + // Throw a RuntimeException to break the retry loop immediately. + BasicThrowable basicThrowable = GSON.fromJson(httpResponse.getResponseBodyAsString(), + BasicThrowable.class); + throw new RuntimeException(RemoteExecutionException.fromBasicThrowable(basicThrowable)); + } + + byte[] pollerInfo = httpResponse.getResponseBody(); + previewStore.setPreviewRequestPollerInfo(previewRequest.getProgram().getParent(), + pollerInfo); + return null; + }, retryStrategy, throwable -> (throwable instanceof RetryableException)); + } catch (Exception e) { + // A single request failed permanently after exhausting all retries. + // Log the error and move on to the next iteration. + if (e instanceof RetryableException) { + //TODO(sidhdirenge) : Check if we need to add the request back to queue. + long submitTimeMillis = RunIds.getTime(previewRequest.getProgram().getApplication(), + TimeUnit.MILLISECONDS); + PreviewStatus status = new PreviewStatus( + PreviewStatus.Status.KILLED_BY_INSUFFICIENT_RESOURCES, submitTimeMillis, + new BasicThrowable(new Exception( + "Preview run failed possibly as no preview runners were available." + + "Please try running preview again.")), null, null); + previewStore.setPreviewStatus(previewRequest.getProgram().getParent(), status); + } else { + LOG.error("Failed to process preview request {} after all retries.", previewRequest, e); + } + } + } + + @Override + protected void shutDown() throws Exception { + if (executor != null) { + executor.shutdownNow(); + } + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunStopper.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunStopper.java index 01b2faea26fd..df13e1ef979e 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunStopper.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunStopper.java @@ -16,7 +16,7 @@ package io.cdap.cdap.internal.app.preview; -import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.ProgramId; /** * Interface to kill the preview runner service. @@ -29,5 +29,5 @@ public interface PreviewRunStopper { * @param preview id of the preview application to be stopped * @throws Exception if any error while stopping */ - void stop(ApplicationId preview) throws Exception; + void stop(ProgramId preview) throws Exception; } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerHttpHandlerInternal.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerHttpHandlerInternal.java new file mode 100644 index 000000000000..a3a15f9a4bf6 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerHttpHandlerInternal.java @@ -0,0 +1,138 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.preview; + +import com.google.gson.Gson; +import com.google.inject.Singleton; +import io.cdap.cdap.app.preview.PreviewRequest; +import io.cdap.cdap.app.preview.PreviewRunner; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.proto.BasicThrowable; +import io.cdap.cdap.proto.id.ProgramId; +import io.cdap.cdap.security.spi.authentication.SecurityRequestContext; +import io.cdap.http.AbstractHttpHandler; +import io.cdap.http.HttpResponder; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.EmptyHttpHeaders; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.core.HttpHeaders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An internal HTTP handler for the preview runner service. + *

+ * This handler receives requests to start a preview run, initiates the pipeline execution, and + * manages the lifecycle of the runner pod based on the completion of the preview. It is responsible + * for handling concurrent requests and signaling the runner to shut down when its work is + * complete. + */ +@Singleton +@Path(Constants.Gateway.INTERNAL_API_VERSION_3 + "/" + Constants.Service.PREVIEW_RUNNER) +public class PreviewRunnerHttpHandlerInternal extends AbstractHttpHandler { + + private static final Logger LOG = LoggerFactory.getLogger(PreviewRunnerHttpHandlerInternal.class); + private static final Gson GSON = new Gson(); + private final AtomicInteger runningRequestCount = new AtomicInteger(0); + private final int concurrentRequestLimit; + private final int containerCount; + private final PreviewRequestPollerInfoProvider pollerInfoProvider; + private final PreviewRunner previewRunner; + private final Consumer previewCompletionConsumer; + + /** + * Constructs the handler with necessary dependencies. + */ + public PreviewRunnerHttpHandlerInternal(CConfiguration cConf, + PreviewRequestPollerInfoProvider pollerInfoProvider, PreviewRunner previewRunner, + Consumer stopper) { + this.concurrentRequestLimit = cConf.getInt(Constants.Preview.CONCURRENT_REQUEST_LIMIT); + this.containerCount = cConf.getInt(Constants.Preview.CONTAINER_COUNT); + this.pollerInfoProvider = pollerInfoProvider; + this.previewRunner = previewRunner; + // Restart the service to clean up and re-claim resources after user code + // execution. + this.previewCompletionConsumer = (previewApp) -> { + final int pendingRequests = runningRequestCount.decrementAndGet(); + // No need to kill the runner if preview execution will be running in the same process as the preview manager. + if (containerCount > 0 && pendingRequests == 0) { + stopper.accept(previewApp); + } + }; + } + + /** + * Handles a POST request to start a preview run. + */ + @POST + @Path("/run") + public void run(FullHttpRequest request, HttpResponder responder) { + if (runningRequestCount.incrementAndGet() > concurrentRequestLimit) { + responder.sendStatus(HttpResponseStatus.TOO_MANY_REQUESTS); + runningRequestCount.decrementAndGet(); + return; + } + PreviewRequest previewRequest = GSON.fromJson( + request.content().toString(StandardCharsets.UTF_8), PreviewRequest.class); + if (previewRequest != null && previewRequest.getPrincipal() != null) { + SecurityRequestContext.setUserId(previewRequest.getPrincipal().getName()); + SecurityRequestContext.setUserCredential(previewRequest.getPrincipal().getFullCredential()); + } + try { + LOG.info("Initiating preview for program {}", previewRequest.getProgram()); + + // This future completes AFTER the pipeline execution is done. + // So that consumer knows when to shut down runner pod. + CompletableFuture future = (CompletableFuture) previewRunner.startPreview( + previewRequest); + future.whenComplete((result, e) -> { + if (e != null) { + // Just log a debug if preview failed since it is expected for an application having execution failure + LOG.error("Pipeline execution failed for preview {}", previewRequest.getProgram(), e); + } + previewCompletionConsumer.accept(previewRequest.getProgram()); + }); + + byte[] pollerInfo = pollerInfoProvider.get(); + // Send an HTTP OK response immediately, but with an empty body or a simple status string. + // The client is now informed that the request was received and the preview started. + responder.sendByteArray(HttpResponseStatus.OK, pollerInfo, EmptyHttpHeaders.INSTANCE); + } catch (Exception e) { + LOG.error("Exception initiating preview for program {}", previewRequest.getProgram(), e); + runningRequestCount.decrementAndGet(); + responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, exceptionToJson(e), + new DefaultHttpHeaders().set(HttpHeaders.CONTENT_TYPE, "application/json")); + } + } + + /** + * Return json representation of an exception. Used to propagate exception across network for + * better surfacing errors and debugging. + */ + private String exceptionToJson(Exception ex) { + BasicThrowable basicThrowable = new BasicThrowable(ex); + return GSON.toJson(basicThrowable); + } +} \ No newline at end of file diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerService.java index 7ff51c9e54e9..57192061ebbe 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/PreviewRunnerService.java @@ -45,6 +45,7 @@ /** * A scheduled service that periodically poll for new preview request and execute it. */ +@Deprecated public class PreviewRunnerService extends AbstractExecutionThreadService { private static final Logger LOG = LoggerFactory.getLogger(PreviewRunnerService.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/preview/DefaultPreviewStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/preview/DefaultPreviewStore.java index 2ae2e5f2074b..c6446235d75b 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/preview/DefaultPreviewStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/preview/DefaultPreviewStore.java @@ -44,6 +44,7 @@ import io.cdap.cdap.proto.id.DatasetId; import io.cdap.cdap.proto.id.EntityId; import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.proto.id.ProgramRunId; import io.cdap.cdap.proto.security.Principal; import java.io.IOException; @@ -71,6 +72,7 @@ public class DefaultPreviewStore implements PreviewStore { private static final byte[] RUN = Bytes.toBytes("r"); private static final byte[] STATUS = Bytes.toBytes("s"); private static final byte[] POLLERINFO = Bytes.toBytes("i"); + private static final byte[] PROGRAM_ID = Bytes.toBytes("pid"); /* * Row storing the preview requests waiting for execution * |------------------------------------|--------------------|-----------------| @@ -175,7 +177,7 @@ public void remove(ApplicationId applicationId) { } @Override - public void setProgramId(ProgramRunId programRunId) { + public void setProgramRunId(ProgramRunId programRunId) { // PreviewStore is a singleton and we have to create gson for each operation since gson is not thread safe. Gson gson = new GsonBuilder().registerTypeAdapter(EntityId.class, new EntityIdTypeAdapter()) .create(); @@ -190,6 +192,41 @@ public void setProgramId(ProgramRunId programRunId) { } } + @Override + public void setProgramId(ProgramId programId) { + + Gson gson = new GsonBuilder().registerTypeAdapter(EntityId.class, new EntityIdTypeAdapter()) + .create(); + MDSKey mdsKey = getPreviewRowKeyBuilder(META_ROW_KEY_PREFIX, + programId.getParent()).build(); + try { + previewTable.putDefaultVersion(mdsKey.getKey(), PROGRAM_ID, + Bytes.toBytes(gson.toJson(programId))); + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to put %s into preview store", programId), + e); + } + } + + @Override + public ProgramId getProgramId(ApplicationId applicationId) { + Gson gson = new GsonBuilder().registerTypeAdapter(EntityId.class, new EntityIdTypeAdapter()) + .create(); + MDSKey mdsKey = getPreviewRowKeyBuilder(META_ROW_KEY_PREFIX, applicationId).build(); + + byte[] programId = null; + try { + programId = previewTable.getDefaultVersion(mdsKey.getKey(), PROGRAM_ID); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to get program id for preview %s", applicationId), e); + } + if (programId != null) { + return gson.fromJson(Bytes.toString(programId), ProgramId.class); + } + return null; + } + @Override public ProgramRunId getProgramRunId(ApplicationId applicationId) { // PreviewStore is a singleton and we have to create gson for each operation since gson is not thread safe. @@ -338,20 +375,20 @@ public void setPreviewRequestPollerInfo(ApplicationId applicationId, @Nullable b setPollerinfo(applicationId, pollerInfo); } removeFromWaitingState(applicationId); - PreviewStatus previewStatus = getPreviewStatus(applicationId); - if (previewStatus == null) { - throw new ConflictException( - String.format("Preview application with id %s does not exist.", applicationId)); - } - if (previewStatus.getStatus() != PreviewStatus.Status.WAITING) { - throw new ConflictException( - String.format("Preview application with id %s does not exist in the " - + "waiting state. Its current state is %s", applicationId, - previewStatus.getStatus().name())); - } - long submitTimeInMillis = RunIds.getTime(applicationId.getApplication(), TimeUnit.SECONDS); - setPreviewStatus(applicationId, - new PreviewStatus(PreviewStatus.Status.INIT, submitTimeInMillis, null, null, null)); +// PreviewStatus previewStatus = getPreviewStatus(applicationId); +// if (previewStatus == null) { +// throw new ConflictException( +// String.format("Preview application with id %s does not exist.", applicationId)); +// } +// if (previewStatus.getStatus() != PreviewStatus.Status.WAITING) { +// throw new ConflictException( +// String.format("Preview application with id %s does not exist in the " +// + "waiting state. Its current state is %s", applicationId, +// previewStatus.getStatus().name())); +// } +// long submitTimeInMillis = RunIds.getTime(applicationId.getApplication(), TimeUnit.SECONDS); +// setPreviewStatus(applicationId, +// new PreviewStatus(PreviewStatus.Status.INIT, submitTimeInMillis, null, null, null)); } private void setPollerinfo(ApplicationId applicationId, byte[] pollerInfo) { diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/preview/DefaultPreviewRequestQueueTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/preview/DefaultPreviewRequestQueueTest.java index da9641eef21a..3e9ca4d04ce6 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/preview/DefaultPreviewRequestQueueTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/preview/DefaultPreviewRequestQueueTest.java @@ -101,7 +101,7 @@ public void remove(ApplicationId applicationId) { } @Override - public void setProgramId(ProgramRunId programRunId) { + public void setProgramRunId(ProgramRunId programRunId) { } @@ -111,6 +111,16 @@ public ProgramRunId getProgramRunId(ApplicationId applicationId) { return null; } + @Override + public void setProgramId(ProgramId programId) { + + } + + @Override + public ProgramId getProgramId(ApplicationId applicationId) { + return null; + } + @Override public void setPreviewStatus(ApplicationId applicationId, PreviewStatus previewStatus) { diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/preview/DefaultPreviewStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/preview/DefaultPreviewStoreTest.java index 59a8c171372f..bbfde0485528 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/preview/DefaultPreviewStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/preview/DefaultPreviewStoreTest.java @@ -139,7 +139,7 @@ public void testPreviewInfo() throws IOException { RunIds.generate().getId()); PreviewStatus status = new PreviewStatus(PreviewStatus.Status.COMPLETED, System.currentTimeMillis(), null, 0L, System.currentTimeMillis()); - store.setProgramId(runId); + store.setProgramRunId(runId); store.setPreviewStatus(applicationId, status); Assert.assertEquals(runId, store.getProgramRunId(applicationId)); @@ -222,8 +222,10 @@ public void testPreviewTTL() throws Exception { store.add(secondApplicationId, testRequest, null); store.add(thirdApplicationId, testRequest, null); - // set poller info so that it gets removed from WAITING state store.setPreviewRequestPollerInfo(firstApplicationId, null); + store.setPreviewStatus(firstApplicationId, + new PreviewStatus(PreviewStatus.Status.INIT, System.currentTimeMillis(), null, 0L, + System.currentTimeMillis())); // put data for the first application store.put(firstApplicationId, "mytracer", "key1", "value1"); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 8a9daf5b3cf4..c23fa6c41b4f 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -157,6 +157,7 @@ public static final class Service { public static final String MASTER_SERVICES_BIND_ADDRESS = "master.services.bind.address"; public static final String MASTER_SERVICES_ANNOUNCE_ADDRESS = "master.services.announce.address"; public static final String PREVIEW_HTTP = "preview"; + public static final String PREVIEW_RUNNER = "preview.runner"; public static final String SECURE_STORE_SERVICE = "secure.store.service"; public static final String LOG_BUFFER_SERVICE = "log.buffer.service"; public static final String REMOTE_AGENT_SERVICE = "remote.agent.service"; @@ -468,6 +469,7 @@ public static final class Preview { public static final String CONTAINER_DISK_SIZE_GB = "preview.runner.container.disk.size.gb"; public static final String CONTAINER_MEMORY_MB = "preview.runner.container.memory.mb"; public static final String CONTAINER_CORES = "preview.runner.container.num.cores"; + public static final String CONCURRENT_REQUEST_LIMIT = "preview.runner.concurrent.request.limit"; public static final String CONTAINER_CPU_MULTIPLIER = "preview.runner.container.cpu.multiplier"; public static final String CONTAINER_MEMORY_MULTIPLIER = "preview.runner.container.memory.multiplier"; diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index b0c350958076..bfe39e5565b3 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -256,7 +256,10 @@ twill.jvm.gc.opts - -XX:+UseG1GC -verbose:gc -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M + -XX:+UseG1GC -verbose:gc -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails + -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 + -XX:GCLogFileSize=1M + (,9) @@ -3842,6 +3845,14 @@ + + preview.runner.concurrent.request.limit + 10 + + Number of max concurrent preview runs on a preview runner pod. + + + preview.runner.internal.router.enabled false @@ -4398,6 +4409,51 @@ + + + + preview.runner.retry.policy.base.delay.ms + 10 + + The base delay between retries in milliseconds + + + + + preview.runner.retry.policy.max.delay.ms + 2000 + + The maximum delay between retries in milliseconds + + + + + + preview.runner.retry.policy.max.retries + 2147483647 + + The maximum number of retries to attempt before aborting + + + + + + preview.runner.retry.policy.max.time.secs + 60 + + The maximum elapsed time in seconds before retries are aborted + + + + + preview.runner.retry.policy.type + exponential.backoff + + The type of retry policy for programs. Allowed options: + "none", "fixed.delay", or "exponential.backoff". + + +