Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@
import io.cdap.cdap.api.security.store.SecureStore;
import io.cdap.cdap.app.guice.AuditLogWriterModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.discovery.ResolvingDiscoverable;
import io.cdap.cdap.common.discovery.URIScheme;
import io.cdap.cdap.common.encryption.guice.UserCredentialAeadEncryptionModule;
import io.cdap.cdap.common.guice.ConfigModule;
import io.cdap.cdap.common.guice.IOModule;
import io.cdap.cdap.common.guice.RemoteAuthenticatorModules;
import io.cdap.cdap.common.guice.preview.PreviewDiscoveryRuntimeModule;
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.common.utils.Networks;
import io.cdap.cdap.config.guice.ConfigStoreModule;
import io.cdap.cdap.data.runtime.DataSetServiceModules;
Expand All @@ -48,26 +50,31 @@
import io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService;
import io.cdap.cdap.data2.metadata.writer.MetadataServiceClient;
import io.cdap.cdap.data2.metadata.writer.NoOpMetadataServiceClient;
import io.cdap.cdap.internal.app.preview.PreviewRunnerService;
import io.cdap.cdap.internal.app.preview.PreviewRequestPollerInfoProvider;
import io.cdap.cdap.internal.app.preview.PreviewRunnerHttpHandlerInternal;
import io.cdap.cdap.internal.provision.ProvisionerModule;
import io.cdap.cdap.logging.appender.LogAppender;
import io.cdap.cdap.logging.appender.tms.PreviewTMSLogAppender;
import io.cdap.cdap.messaging.guice.MessagingServerRuntimeModule;
import io.cdap.cdap.metadata.MetadataReaderWriterModules;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.guice.CoreSecurityRuntimeModule;
import io.cdap.cdap.security.guice.preview.PreviewSecureStoreModule;
import io.cdap.http.ChannelPipelineModifier;
import io.cdap.http.NettyHttpService;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpContentDecompressor;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.common.Threads;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.ServiceListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -79,31 +86,33 @@

private static final Logger LOG = LoggerFactory.getLogger(DefaultPreviewRunnerManager.class);

private final CConfiguration previewCConf;

Check warning on line 89 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'previewCConf' must contain no more than '1' consecutive capital letters.
private final Configuration previewHConf;

Check warning on line 90 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'previewHConf' must contain no more than '1' consecutive capital letters.
private final SConfiguration previewSConf;

Check warning on line 91 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'previewSConf' must contain no more than '1' consecutive capital letters.
private final int maxConcurrentPreviews;
private final DiscoveryServiceClient discoveryServiceClient;
private final DatasetFramework datasetFramework;
private final SecureStore secureStore;
private final TransactionSystemClient transactionSystemClient;
private final PreviewRunnerModule previewRunnerModule;
private final Set<PreviewRunnerService> previewRunnerServices;
private final LevelDBTableService previewLevelDBTableService;

Check warning on line 98 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'previewLevelDBTableService' must contain no more than '1' consecutive capital letters.
private final PreviewRunnerServiceFactory previewRunnerServiceFactory;
private final PreviewRequestPollerInfoProvider pollerInfoProvider;
private final DiscoveryService discoveryService;
private NettyHttpService previewRunnerHttpService;
private Cancellable cancelDiscovery;
private PreviewRunner runner;

@Inject
DefaultPreviewRunnerManager(@Named(PreviewConfigModule.PREVIEW_CCONF) CConfiguration previewCConf,

Check warning on line 106 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'previewCConf' must contain no more than '1' consecutive capital letters.
@Named(PreviewConfigModule.PREVIEW_HCONF) Configuration previewHConf,

Check warning on line 107 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'previewHConf' must contain no more than '1' consecutive capital letters.
@Named(PreviewConfigModule.PREVIEW_SCONF) SConfiguration previewSConf,

Check warning on line 108 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'previewSConf' must contain no more than '1' consecutive capital letters.
SecureStore secureStore,
DiscoveryServiceClient discoveryServiceClient,
@Named(DataSetsModules.BASE_DATASET_FRAMEWORK) DatasetFramework datasetFramework,
TransactionSystemClient transactionSystemClient,
PreviewRunnerModule previewRunnerModule,
@Named(PreviewConfigModule.PREVIEW_LEVEL_DB) LevelDBTableService previewLevelDBService,

Check warning on line 114 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'previewLevelDBService' must contain no more than '1' consecutive capital letters.
PreviewRunnerServiceFactory previewRunnerServiceFactory) {
PreviewRequestPollerInfoProvider pollerInfoProvider, DiscoveryService discoveryService) {
this.previewCConf = previewCConf;
this.previewHConf = previewHConf;
this.previewSConf = previewSConf;
Expand All @@ -112,10 +121,10 @@
this.discoveryServiceClient = discoveryServiceClient;
this.transactionSystemClient = transactionSystemClient;
this.maxConcurrentPreviews = previewCConf.getInt(Constants.Preview.POLLER_COUNT);
this.previewRunnerServices = ConcurrentHashMap.newKeySet();
this.previewRunnerModule = previewRunnerModule;
this.previewLevelDBTableService = previewLevelDBService;
this.previewRunnerServiceFactory = previewRunnerServiceFactory;
this.pollerInfoProvider = pollerInfoProvider;
this.discoveryService = discoveryService;
}

@Override
Expand All @@ -127,19 +136,42 @@
((Service) runner).startAndWait();
}

// Create and start the preview poller services.
for (int i = 0; i < maxConcurrentPreviews; i++) {
createPreviewRunnerService().startAndWait();
NettyHttpService.Builder builder = NettyHttpService.builder(
//TODO(sidhdirenge) : Use cConf for this host address and port.
Constants.Service.PREVIEW_RUNNER).setHost("0.0.0.0").setPort(44317)
.setExecThreadPoolSize(previewCConf.getInt(Constants.Preview.EXEC_THREADS))
.setBossThreadPoolSize(previewCConf.getInt(Constants.Preview.BOSS_THREADS))
.setWorkerThreadPoolSize(previewCConf.getInt(Constants.Preview.WORKER_THREADS))
.setChannelPipelineModifier(new ChannelPipelineModifier() {
@Override
public void modify(ChannelPipeline pipeline) {
pipeline.addAfter("compressor", "decompressor", new HttpContentDecompressor());
}
}).setHttpHandlers(
new PreviewRunnerHttpHandlerInternal(1, pollerInfoProvider, runner, this::stopPreview)
);

if (previewCConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED)) {
new HttpsEnabler().configureKeyStore(previewCConf, previewSConf).enable(builder);
}
this.previewRunnerHttpService = builder.build();

LOG.debug("Starting PreviewRunnerHttpService");
previewRunnerHttpService.start();
cancelDiscovery = discoveryService.register(ResolvingDiscoverable.of(
URIScheme.createDiscoverable(Constants.Service.PREVIEW_RUNNER, previewRunnerHttpService)));
LOG.debug("Starting PreviewRunnerHttpService has completed");
}

@Override
protected void shutDown() throws Exception {
// Should stop the polling service, hence individual preview runs, before stopping the top level preview runner.
previewRunnerServices.forEach(this::stopQuietly);
if (runner instanceof Service) {
stopQuietly((Service) runner);
}
if (previewRunnerHttpService != null) {
previewRunnerHttpService.stop(1, 2, TimeUnit.SECONDS);
}
cancelDiscovery.cancel();
}

private void stopQuietly(Service service) {
Expand All @@ -152,19 +184,27 @@

@Override
public void stop(ApplicationId preview) throws Exception {
PreviewRunnerService runnerService = previewRunnerServices.stream()
.filter(r -> r.getPreviewApplication().filter(preview::equals).isPresent())
.findFirst()
.orElse(null);

if (runnerService == null) {
throw new NotFoundException(
"Preview run cannot be stopped. Please try stopping again or start new preview run.");
}
// PreviewRunnerService runnerService = previewRunnerServices.stream()
// .filter(r -> r.getPreviewApplication().filter(preview::equals).isPresent())
// .findFirst()
// .orElse(null);
//
// if (runnerService == null) {
// throw new NotFoundException(
// "Preview run cannot be stopped. Please try stopping again or start new preview run.");
// }
//
// PreviewRunnerService newRunnerService = createPreviewRunnerService();
// runnerService.stopAndWait();
// newRunnerService.startAndWait();

Check warning on line 199 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/DefaultPreviewRunnerManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.indentation.CommentsIndentationCheck

Comment has incorrect indentation level 0, expected is 4, indentation should be the same level as line 201.
// TODO(sidhdirenge): Stop preview before killing the pod.
LOG.info("Stop called for preview {}", preview.getApplication());
Comment thread Fixed
stop();
}

PreviewRunnerService newRunnerService = createPreviewRunnerService();
runnerService.stopAndWait();
newRunnerService.startAndWait();
private void stopPreview(ProgramId program) {
LOG.info("Stop called for preview {}", program.getApplication());
stop();
}

/**
Expand Down Expand Up @@ -223,31 +263,4 @@
}
);
}

/**
* Creates a {@link PreviewRunnerService}. It will automatically added to and removed from the
* {@link #previewRunnerServices} set.
*/
private PreviewRunnerService createPreviewRunnerService() {
PreviewRunnerService previewRunnerService = previewRunnerServiceFactory.create(runner);

previewRunnerService.addListener(new ServiceListenerAdapter() {

@Override
public void terminated(State from) {
previewRunnerServices.remove(previewRunnerService);
if (previewRunnerServices.isEmpty()) {
try {
stop();
} catch (Exception e) {
// should not happen
LOG.error("Failed to shutdown the preview runner manager service.", e);
}
}
}
}, Threads.SAME_THREAD_EXECUTOR);

previewRunnerServices.add(previewRunnerService);
return previewRunnerService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.cdap.app.preview;

Check warning on line 16 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.EmptyLineSeparatorCheck

'package' should be separated from previous line.

import com.google.gson.JsonElement;
import io.cdap.cdap.api.security.AccessException;
Expand Down Expand Up @@ -102,5 +102,6 @@
* @param pollerInfo information about the poller
* @return {@code PreviewRequest} if such request is available in the queue
*/
@Deprecated
Optional<PreviewRequest> poll(@Nullable byte[] pollerInfo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.cdap.cdap.gateway.handlers.CommonHandlers;
import io.cdap.cdap.gateway.handlers.preview.PreviewErrorClassificationHttpHandler;
import io.cdap.cdap.gateway.handlers.preview.PreviewHttpHandler;
import io.cdap.cdap.gateway.handlers.preview.PreviewHttpHandlerInternal;
import io.cdap.cdap.internal.app.preview.DefaultPreviewManager;
import io.cdap.cdap.internal.app.preview.DefaultPreviewRequestQueue;
import io.cdap.cdap.internal.app.preview.DistributedPreviewManager;
Expand Down Expand Up @@ -83,7 +82,6 @@ protected void configure() {
Multibinder<HttpHandler> handlerBinder = Multibinder.newSetBinder(binder(), HttpHandler.class);
handlerBinder.addBinding().to(PreviewHttpHandler.class);
handlerBinder.addBinding().to(PreviewErrorClassificationHttpHandler.class);
handlerBinder.addBinding().to(PreviewHttpHandlerInternal.class);

if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) {
// Add these handlers only if messaging service endpoint doesn't exist and preview runners need to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public interface PreviewRequestQueue {
*/
Optional<PreviewRequest> poll(@Nullable byte[] pollerInfo);

Optional<PreviewRequest> poll();

/**
* Add a preview request in the queue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.inject.Module;
import com.google.inject.PrivateModule;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
import io.cdap.cdap.common.runtime.RuntimeModule;
import io.cdap.cdap.data.runtime.DataSetsModules;
Expand All @@ -30,7 +29,6 @@
import io.cdap.cdap.internal.app.preview.DirectPreviewRequestFetcher;
import io.cdap.cdap.internal.app.preview.PreviewRequestFetcher;
import io.cdap.cdap.internal.app.preview.PreviewRunStopper;
import io.cdap.cdap.internal.app.preview.PreviewRunnerService;
import io.cdap.cdap.internal.app.preview.RemotePreviewRequestFetcher;

/**
Expand Down Expand Up @@ -64,10 +62,6 @@ protected void configure() {
expose(PreviewRunStopper.class);
bind(PreviewRunnerManager.class).to(DefaultPreviewRunnerManager.class);
expose(PreviewRunnerManager.class);

install(new FactoryModuleBuilder()
.implement(PreviewRunnerService.class, PreviewRunnerService.class)
.build(PreviewRunnerServiceFactory.class));
}
};
}
Expand All @@ -90,10 +84,6 @@ protected void configure() {
bind(DefaultPreviewRunnerManager.class).in(Scopes.SINGLETON);
bind(PreviewRunnerManager.class).to(DefaultPreviewRunnerManager.class);
expose(PreviewRunnerManager.class);

install(new FactoryModuleBuilder()
.implement(PreviewRunnerService.class, PreviewRunnerService.class)
.build(PreviewRunnerServiceFactory.class));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
/**
* Internal {@link HttpHandler} for Preview system.
*/
@Deprecated
@Singleton
@Path(Constants.Gateway.INTERNAL_API_VERSION_3 + "/previews")
public class PreviewHttpHandlerInternal extends AbstractHttpHandler {
Expand All @@ -49,7 +50,7 @@
this.previewManager = previewManager;
}

@POST

Check warning on line 53 in cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/preview/PreviewHttpHandlerInternal.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck

Missing a Javadoc comment.
@Path("/requests/pull")
public void poll(FullHttpRequest request, HttpResponder responder) {
byte[] pollerInfo = Bytes.toBytes(request.content().nioBuffer());
Expand Down
Loading
Loading