Skip to content
Merged
8 changes: 8 additions & 0 deletions src/main/java/io/cryostat/ConfigProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public class ConfigProperties {
"cryostat.discovery.plugins.ping-period";
public static final String DISCOVERY_PLUGINS_MAX_BACKOFF_MULTIPLIER =
"cryostat.discovery.plugins.max-backoff-multiplier";
public static final String DISCOVERY_PLUGINS_PING_WORKER_POOL_SIZE =
"cryostat.discovery.plugins.ping.worker-pool-size";
public static final String DISCOVERY_PLUGINS_PING_DELAY_MS =
"cryostat.discovery.plugins.ping.delay-ms";
public static final String DISCOVERY_PLUGINS_PING_TIMEOUT_MS =
"cryostat.discovery.plugins.ping.timeout-ms";
public static final String DISCOVERY_PLUGINS_PING_STARTUP_GRACE_PERIOD_MS =
"cryostat.discovery.plugins.ping.startup-grace-period-ms";

public static final String CONNECTIONS_TTL = "cryostat.connections.ttl";
public static final String CONNECTIONS_FAILED_BACKOFF = "cryostat.connections.failed-backoff";
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/cryostat/ExceptionMappers.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.nimbusds.jwt.proc.BadJWTException;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.smallrye.faulttolerance.api.RateLimitException;
import io.smallrye.mutiny.TimeoutException;
import jakarta.inject.Inject;
import jakarta.persistence.NoResultException;
Expand Down Expand Up @@ -143,4 +144,10 @@ public RestResponse<Void> mapBulkheadException(BulkheadException ex) {
logger.warn(ex);
return RestResponse.status(HttpResponseStatus.TOO_MANY_REQUESTS.code());
}

@ServerExceptionMapper
public RestResponse<Void> mapRateLimitException(RateLimitException ex) {
logger.warn(ex);
return RestResponse.status(HttpResponseStatus.TOO_MANY_REQUESTS.code());
}
}
9 changes: 9 additions & 0 deletions src/main/java/io/cryostat/credentials/Credentials.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -36,10 +37,12 @@
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.faulttolerance.api.RateLimit;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.security.RolesAllowed;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.persistence.PersistenceException;
import jakarta.transaction.Transactional;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
Expand All @@ -50,6 +53,8 @@
import jakarta.ws.rs.core.UriInfo;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;
import org.eclipse.microprofile.openapi.annotations.Operation;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.RestForm;
Expand Down Expand Up @@ -194,6 +199,10 @@ public CredentialMatchResult get(@RestPath long id) {
}

@Transactional
@Bulkhead
@Timeout
@Retry(retryOn = {SQLException.class, PersistenceException.class})
@RateLimit
@POST
@RolesAllowed("write")
@Operation(
Expand Down
85 changes: 35 additions & 50 deletions src/main/java/io/cryostat/discovery/Discovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.faulttolerance.api.RateLimit;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import io.vertx.mutiny.core.eventbus.EventBus;
Expand Down Expand Up @@ -83,6 +83,8 @@
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;
import org.eclipse.microprofile.openapi.annotations.Operation;
import org.eclipse.microprofile.openapi.annotations.tags.Tag;
import org.jboss.logging.Logger;
Expand Down Expand Up @@ -113,7 +115,6 @@ public class Discovery {
static final String X_FORWARDED_FOR = "X-Forwarded-For";

private static final String JOB_PERIODIC = "discovery.periodic";
private static final String JOB_STARTUP = "discovery.startup";
private static final String PLUGIN_ID_MAP_KEY = "pluginId";
private static final String REFRESH_MAP_KEY = "refresh";

Expand All @@ -138,52 +139,6 @@ public class Discovery {
@Inject PluginCallbackFactory callbackFactory;
@Inject EntityManager entityManager;

void onStart(@Observes StartupEvent evt) {
QuarkusTransaction.requiringNew()
.run(
() -> {
DiscoveryPlugin.<DiscoveryPlugin>findAll().list().stream()
.filter(p -> !p.builtin)
.forEach(
plugin -> {
var dataMap = new JobDataMap();
dataMap.put(PLUGIN_ID_MAP_KEY, plugin.id);
dataMap.put(REFRESH_MAP_KEY, true);
JobDetail jobDetail =
JobBuilder.newJob(RefreshPluginJob.class)
.withIdentity(
plugin.id.toString(),
JOB_STARTUP)
.usingJobData(dataMap)
.build();
var trigger =
TriggerBuilder.newTrigger()
.withIdentity(
jobDetail
.getKey()
.getName(),
jobDetail
.getKey()
.getGroup())
.startNow()
.withSchedule(
SimpleScheduleBuilder
.simpleSchedule()
.withRepeatCount(0))
.build();
try {
if (!scheduler.checkExists(trigger.getKey())) {
scheduler.scheduleJob(jobDetail, trigger);
}
} catch (SchedulerException e) {
logger.warn(
"Failed to schedule plugin prune job",
e);
}
});
});
}

void onStop(@Observes ShutdownEvent evt) throws SchedulerException {
scheduler.shutdown();
}
Expand Down Expand Up @@ -246,6 +201,9 @@ public RestResponse<Credential> checkCredentialExists(@RestForm String script) {
}

@Bulkhead
@Timeout
@Retry(retryOn = {OptimisticLockException.class})
@RateLimit
@Blocking
@POST
@Path("/api/v4/discovery")
Expand Down Expand Up @@ -454,7 +412,7 @@ public PluginRegistration register(@Context RoutingContext ctx, JsonObject body)
throw new BadRequestException(e);
}

isNewPlugin = !scheduler.checkExists(JobKey.jobKey(plugin.id.toString(), JOB_PERIODIC));
isNewPlugin = !scheduler.checkExists(getPeriodicJobKey(plugin));
if (isNewPlugin) {
var dataMap = new JobDataMap();
dataMap.put(PLUGIN_ID_MAP_KEY, plugin.id);
Expand Down Expand Up @@ -499,6 +457,10 @@ public PluginRegistration register(@Context RoutingContext ctx, JsonObject body)
}

@Transactional
@Bulkhead
@Timeout
@Retry(retryOn = {OptimisticLockException.class})
@RateLimit
@POST
@Path("/api/v4/discovery/{id}")
@Consumes(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -527,6 +489,10 @@ public void publish(
}

@Transactional
@Bulkhead
@Timeout
@Retry(retryOn = {OptimisticLockException.class})
@RateLimit
@POST
@Path("/api/v4.2/discovery/{id}")
@Consumes(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -680,7 +646,6 @@ public void deregister(

Set<JobKey> jobKeys = new HashSet<>();
jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.jobGroupEquals(JOB_PERIODIC)));
jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.jobGroupEquals(JOB_STARTUP)));
for (var key : jobKeys) {
if (!Objects.equals(plugin.id.toString(), key.getName())) {
continue;
Expand Down Expand Up @@ -1054,6 +1019,26 @@ static String requireNonBlank(String in, String name) {
return in;
}

/**
* Create a JobKey for a plugin's periodic refresh job.
*
* @param plugin The plugin to create a JobKey for
* @return JobKey for the plugin's periodic refresh job
*/
static JobKey getPeriodicJobKey(DiscoveryPlugin plugin) {
return getPeriodicJobKey(plugin.id);
}

/**
* Create a JobKey for a plugin's periodic refresh job using the plugin ID.
*
* @param pluginId The plugin ID to create a JobKey for
* @return JobKey for the plugin's periodic refresh job
*/
static JobKey getPeriodicJobKey(UUID pluginId) {
return JobKey.jobKey(pluginId.toString(), JOB_PERIODIC);
}

private InetAddress getRemoteAddress(RoutingContext ctx) {
InetAddress addr = null;
if (ctx.request() != null && ctx.request().remoteAddress() != null) {
Expand Down
Loading
Loading