Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ private org.quartz.Scheduler getScheduler(JobStore store,
* {@link
* UnsupportedUGIProvider} will be used.
*/
private static final class UgiProviderProvider implements Provider<UGIProvider> {
public static final class UgiProviderProvider implements Provider<UGIProvider> {

private final Injector injector;
private final CConfiguration cConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,26 +189,36 @@ public ProgramController dispatchProgram(ProgramRunDispatcherContext dispatcherC
throws Exception {
RunId runId = dispatcherContext.getRunId();
LOG.debug("Preparing to dispatch program run: {}", runId);
LOG.debug("dispatcherContext: {}", dispatcherContext.toString());
ProgramDescriptor programDescriptor = dispatcherContext.getProgramDescriptor();
LOG.debug("programDescriptor:{}", programDescriptor.toString());
ProgramOptions options = dispatcherContext.getProgramOptions();
LOG.debug("options:{}", options.toString());
boolean isDistributed = dispatcherContext.isDistributed();
LOG.debug("isDistributed:{}", isDistributed);
ProgramId programId = programDescriptor.getProgramId();
LOG.debug("programId:{}", programId);
ClusterMode clusterMode = ProgramRunners.getClusterMode(options);
LOG.debug("clusterMode:{}", clusterMode);
boolean tetheredRun = options.getArguments().hasOption(ProgramOptionConstants.PEER_NAME);
LOG.debug("tetheredRun:{}", tetheredRun);
ProgramRunnerFactory progRunnerFactory = programRunnerFactory;
if (clusterMode == ClusterMode.ISOLATED && !tetheredRun) {
progRunnerFactory = Optional.ofNullable(remoteProgramRunnerFactory)
.orElseThrow(UnsupportedOperationException::new);
}
ArtifactRepository artifactRepository = noAuthArtifactRepository;
LOG.debug("artifactRepository:{}", artifactRepository);
String peer = options.getArguments().getOption(ProgramOptionConstants.PEER_NAME);
LOG.debug("peer:{}", isDistributed);
if (peer != null) {
RemoteClient client = getRemoteClientForTetheredRun(peer);
RemoteArtifactRepositoryReader artifactRepositoryReader = new RemoteArtifactRepositoryReader(
locationFactory,
client);
artifactRepository = new RemoteArtifactRepository(cConf, artifactRepositoryReader);
}
LOG.debug("artifactRepository:{}", artifactRepository);

// Creates the ProgramRunner based on the cluster mode
ProgramRunner runner = progRunnerFactory.create(programId.getType());
Expand All @@ -219,11 +229,17 @@ public ProgramController dispatchProgram(ProgramRunDispatcherContext dispatcherC

// Get the artifact details and save it into the program options.
ArtifactId artifactId = programDescriptor.getArtifactId();
LOG.debug("artifactId:{}", artifactId.toString());
ArtifactDetail artifactDetail = getArtifactDetail(artifactId, artifactRepository);
LOG.debug("artifactDetail:{}", artifactDetail.toString());
ApplicationSpecification appSpec = programDescriptor.getApplicationSpecification();
LOG.debug("appSpec:{}", appSpec.toString());
ProgramDescriptor newProgramDescriptor = programDescriptor;
ProgramOptions updatedOptions = options;
if (tetheredRun) {
LOG.debug(
"artifactId: {}, programId: {}, options: {}, runId: {}, clusterMode: {}, artifactDetail: {}",
artifactId, programId, options, runId, clusterMode, artifactDetail);
updatedOptions = updateProgramOptions(artifactId, programId, options, runId, clusterMode,
Iterables.getFirst(artifactDetail.getMeta().getClasses().getApps(),
null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static MasterEnvironment create(CConfiguration cConf, String envName)
if (masterEnv == null) {
throw new NotFoundException("Master environment of name " + envName + " does not exist");
}

return masterEnv;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public static final class Master {
*/
public static final class Service {

public static final String EXPORTER = "export";
public static final String IMPORTER = "import";
public static final String APP_FABRIC_HTTP = "appfabric";
public static final String APP_FABRIC_PROCESSOR = "appfabric.processor";
public static final String TRANSACTION = "transaction";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import io.cdap.cdap.proto.security.Principal;
import io.cdap.cdap.security.spi.authentication.AuthenticationContext;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A class which sets internal authenticated headers for the remote client using an {@link
* AuthenticationContext} as the source.
*/
public class DefaultInternalAuthenticator implements InternalAuthenticator {
private static final Logger LOG = LoggerFactory.getLogger(DefaultInternalAuthenticator.class);

private final AuthenticationContext authenticationContext;

Expand All @@ -38,7 +41,9 @@ public DefaultInternalAuthenticator(AuthenticationContext authenticationContext)

@Override
public void applyInternalAuthenticationHeaders(BiConsumer<String, String> headerSetter) {
LOG.info("Inside DefaultInternalAuthenticator applyInternalAuthenticationHeaders");
Principal principal = authenticationContext.getPrincipal();
LOG.info("DefaultInternalAuthenticator applyInternalAuthenticationHeaders principal: {}", principal);
String userID = null;
Credential internalCredentials = null;
if (principal != null) {
Expand All @@ -53,5 +58,6 @@ public void applyInternalAuthenticationHeaders(BiConsumer<String, String> header
if (userID != null) {
headerSetter.accept(Constants.Security.Headers.USER_ID, userID);
}
LOG.info("DefaultInternalAuthenticator applyInternalAuthenticationHeaders headerSetter: {}", headerSetter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import javax.net.ssl.HttpsURLConnection;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Discovers a remote service and resolves URLs to that service.
Expand All @@ -66,6 +68,7 @@ public class RemoteClient {
private final String discoverableServiceName;
private final String basePath;
private final RemoteAuthenticator remoteAuthenticator;
private static final Logger LOG = LoggerFactory.getLogger(RemoteClient.class);

RemoteClient(InternalAuthenticator internalAuthenticator, DiscoveryServiceClient discoveryClient,
String discoverableServiceName, HttpRequestConfig httpRequestConfig, String basePath,
Expand Down Expand Up @@ -266,6 +269,7 @@ public URL resolve(String resource) {
}

URI uri = URIScheme.createURI(discoverable, "%s%s", basePath, resource);

try {
return rewriteUrl(uri.toURL());
} catch (MalformedURLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ public RemoteClient createRemoteClient(String discoverableServiceName,
public RemoteClient createRemoteClient(String discoverableServiceName,
HttpRequestConfig httpRequestConfig, String basePath,
InternalAuthenticator internalAuthenticator) {
LOG.info("Inside createRemoteClient(), discoverableServiceName: {}, httpRequestConfig: {}, basePath:{}, internalAuthenticator:{}", discoverableServiceName, httpRequestConfig, basePath, internalAuthenticator);
basePath = basePath.startsWith("/") ? pathPrefix + basePath : pathPrefix + "/" + basePath;
LOG.info("internalRouterEnabled: {}", internalRouterEnabled);
if (this.internalRouterEnabled) {
return getClientForInternalRouter(discoverableServiceName,
httpRequestConfig, basePath, internalAuthenticator);
Expand All @@ -148,12 +150,13 @@ public RemoteClient createRemoteClient(String discoverableServiceName,
private RemoteClient getClientForInternalRouter(String destinationServiceName,
HttpRequestConfig httpRequestConfig, String basePath,
InternalAuthenticator internalAuthenticator) {
LOG.trace(
LOG.info(
"Creating client for service '{}' which routes through service '{}'.",
destinationServiceName, Service.INTERNAL_ROUTER);
String internalRouterPath = String.format("/%s/router/services/%s%s",
Constants.Gateway.INTERNAL_API_VERSION_3, destinationServiceName,
basePath);
LOG.info("internalRouterPath: {}", internalRouterPath);
return new RemoteClient(internalAuthenticator, discoveryClient,
Service.INTERNAL_ROUTER, httpRequestConfig, internalRouterPath,
remoteAuthenticator);
Expand Down
9 changes: 9 additions & 0 deletions cdap-export-import/Dockerfile.export
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Dockerfile located inside cdap-export-import/

FROM eclipse-temurin:8-jre-focal
WORKDIR /opt/app

# The path is now relative to the cdap-export-import directory
COPY target/cdap-export-import-*.jar app.jar

ENTRYPOINT ["java", "-jar", "app.jar"]
156 changes: 156 additions & 0 deletions cdap-export-import/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2025 Cask Data, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap</artifactId>
<version>6.12.0-SNAPSHOT</version>
</parent>

<artifactId>cdap-export-import</artifactId>
<name>CDAP Export-Import</name>
<packaging>jar</packaging>


<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.32.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.cdap.cdap</groupId>-->
<!-- <artifactId>cdap-storage-spi</artifactId>-->
<!-- <version>6.12.0-SNAPSHOT</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>io.cdap.cdap</groupId>-->
<!-- <artifactId>cdap-proto</artifactId>-->
<!-- <version>6.12.0-SNAPSHOT</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-data-fabric</artifactId>
<version>6.12.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- THIS PLUGIN CREATES THE SELF-CONTAINED, SHADED JAR -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>cdap.shaded.com.google.common</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.api</pattern>
<shadedPattern>cdap.shaded.com.google.api</shadedPattern>
</relocation>
<!-- Add other relocations as needed -->
</relocations>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<!-- <build>-->
<!-- <plugins>-->
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-shade-plugin</artifactId>-->
<!-- <version>3.2.4</version> <executions>-->
<!-- <execution>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>shade</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <transformers>-->
<!-- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">-->
<!-- <mainClass>io.cdap.cdap.ExportJobMain</mainClass>-->
<!-- </transformer>-->
<!-- </transformers>-->
<!-- <filters>-->
<!-- <filter>-->
<!-- <artifact>*:*</artifact>-->
<!-- <excludes>-->
<!-- <exclude>META-INF/*.SF</exclude>-->
<!-- <exclude>META-INF/*.DSA</exclude>-->
<!-- <exclude>META-INF/*.RSA</exclude>-->
<!-- </excludes>-->
<!-- </filter>-->
<!-- </filters>-->
<!-- </configuration>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<!-- </plugins>-->
<!-- </build>-->
</project>
Loading