diff --git a/src/main/java/io/cryostat/ConfigProperties.java b/src/main/java/io/cryostat/ConfigProperties.java index 163ac15c0..8c3a59b47 100644 --- a/src/main/java/io/cryostat/ConfigProperties.java +++ b/src/main/java/io/cryostat/ConfigProperties.java @@ -50,6 +50,12 @@ public class ConfigProperties { public static final String CONTAINERS_POLL_PERIOD = "cryostat.discovery.containers.poll-period"; public static final String CONTAINERS_REQUEST_TIMEOUT = "cryostat.discovery.containers.request-timeout"; + public static final String DISCOVERY_PLUGINS_MAX_FAILURES = + "cryostat.discovery.plugins.max-failures"; + public static final String DISCOVERY_PLUGINS_PING_PERIOD = + "cryostat.discovery.plugins.ping-period"; + public static final String DISCOVERY_PLUGINS_MAX_BACKOFF_MULTIPLIER = + "cryostat.discovery.plugins.max-backoff-multiplier"; public static final String CONNECTIONS_TTL = "cryostat.connections.ttl"; public static final String CONNECTIONS_FAILED_BACKOFF = "cryostat.connections.failed-backoff"; diff --git a/src/main/java/io/cryostat/discovery/CustomDiscovery.java b/src/main/java/io/cryostat/discovery/CustomDiscovery.java index 0464af1c7..a99d5d965 100644 --- a/src/main/java/io/cryostat/discovery/CustomDiscovery.java +++ b/src/main/java/io/cryostat/discovery/CustomDiscovery.java @@ -39,6 +39,8 @@ import jakarta.annotation.security.RolesAllowed; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.persistence.LockModeType; import jakarta.transaction.Transactional; import jakarta.ws.rs.BadRequestException; import jakarta.ws.rs.Consumes; @@ -78,6 +80,7 @@ public class CustomDiscovery { @Inject Logger logger; @Inject EventBus bus; + @Inject EntityManager entityManager; @Inject TargetConnectionManager connectionManager; @Inject URIUtil uriUtil; @@ -200,11 +203,16 @@ RestResponse doCreate( DiscoveryNode.target(target, NodeType.BaseNodeType.JVM); target.discoveryNode = node; 
DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); + realm = + entityManager.find( + DiscoveryNode.class, + realm.id, + LockModeType.PESSIMISTIC_WRITE); - realm.children.add(node); node.parent = realm; target.persist(); node.persist(); + realm.children.add(node); realm.persist(); return ResponseBuilder.created( @@ -239,6 +247,7 @@ RestResponse doCreate( public void delete(@RestPath long id) throws URISyntaxException { Target target = Target.find("id", id).singleResult(); DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); + realm = entityManager.find(DiscoveryNode.class, realm.id, LockModeType.PESSIMISTIC_WRITE); boolean withinRealm = realm.children.remove(target.discoveryNode); if (!withinRealm) { throw new BadRequestException(); diff --git a/src/main/java/io/cryostat/discovery/Discovery.java b/src/main/java/io/cryostat/discovery/Discovery.java index a9e23a409..adda8ca3f 100644 --- a/src/main/java/io/cryostat/discovery/Discovery.java +++ b/src/main/java/io/cryostat/discovery/Discovery.java @@ -39,7 +39,6 @@ import io.cryostat.ConfigProperties; import io.cryostat.credentials.Credential; -import io.cryostat.discovery.DiscoveryPlugin.PluginCallback; import io.cryostat.discovery.KubeEndpointSlicesDiscovery.KubeDiscoveryNodeType; import io.cryostat.discovery.NodeType.BaseNodeType; import io.cryostat.targets.TargetConnectionManager; @@ -61,7 +60,10 @@ import jakarta.annotation.security.RolesAllowed; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.persistence.LockModeType; import jakarta.persistence.NoResultException; +import jakarta.persistence.OptimisticLockException; import jakarta.transaction.Transactional; import jakarta.ws.rs.BadRequestException; import jakarta.ws.rs.Consumes; @@ -115,9 +117,12 @@ public class Discovery { private static final String PLUGIN_ID_MAP_KEY = "pluginId"; private static final String REFRESH_MAP_KEY = "refresh"; - 
@ConfigProperty(name = "cryostat.discovery.plugins.ping-period") + @ConfigProperty(name = ConfigProperties.DISCOVERY_PLUGINS_PING_PERIOD) Duration discoveryPingPeriod; + @ConfigProperty(name = ConfigProperties.DISCOVERY_PLUGINS_MAX_FAILURES) + int maxConsecutiveFailures; + @ConfigProperty(name = ConfigProperties.AGENT_TLS_REQUIRED) boolean agentTlsRequired; @@ -130,6 +135,8 @@ public class Discovery { @Inject Scheduler scheduler; @Inject URIUtil uriUtil; @Inject KubeEndpointSlicesDiscovery k8sDiscovery; + @Inject PluginCallbackFactory callbackFactory; + @Inject EntityManager entityManager; void onStart(@Observes StartupEvent evt) { QuarkusTransaction.requiringNew() @@ -347,36 +354,82 @@ public PluginRegistration register(@Context RoutingContext ctx, JsonObject body) throw new InternalServerErrorException(e); } } else { - // check if a plugin record with the same callback already exists. If it does, - // ping it: + // check if a plugin record with the same callback and realm-name already exists. 
If it + // does, ping it: // - if it's still there reject this request as a duplicate // - otherwise delete the previous record and accept this new one as a replacement - QuarkusTransaction.joiningExisting() - .call(() -> DiscoveryPlugin.find("callback", unauthCallback)) - .singleResultOptional() - .ifPresent( - p -> { - try { - var cb = PluginCallback.create(p); - cb.ping(); - throw new DuplicatePluginException( - String.format( - "Plugin with callback %s already exists and is" - + " still reachable", - unauthCallback)); - } catch (Exception e) { - if (!(e instanceof DuplicatePluginException)) { - logger.debug(e); - QuarkusTransaction.joiningExisting().run(p::delete); - } - } - }); - - // new plugin registration + boolean isNewPlugin = false; plugin = QuarkusTransaction.joiningExisting() .call( () -> { + Optional existing = + DiscoveryPlugin.findByCallbackAndRealmName( + unauthCallback, realmName); + + if (existing.isPresent()) { + logger.debugv( + "Reusing existing plugin: {0}", + existing.get().id); + return existing.get(); + } + + // Check for plugin with same callback, different realm + Optional byCallback = + DiscoveryPlugin.find( + "callback", unauthCallback) + .singleResultOptional(); + + if (byCallback.isPresent()) { + DiscoveryPlugin p = byCallback.get(); + try { + var cb = callbackFactory.create(p); + cb.ping(); + // Plugin is reachable but has different realm + throw new DuplicatePluginException( + String.format( + "Plugin with callback %s already" + + " exists and is still" + + " reachable", + unauthCallback)); + } catch (Exception e) { + if (!(e instanceof DuplicatePluginException)) { + // Plugin unreachable, delete and create new + logger.debug(e); + UUID oldPluginId = p.id; + try { + var toDelete = + DiscoveryPlugin + .findById( + oldPluginId); + if (toDelete != null) { + toDelete.delete(); + logger.debugv( + "Deleted unreachable plugin:" + + " {0}", + oldPluginId); + } else { + logger.debugv( + "Plugin already deleted" + + " (concurrent cleanup):" 
+ + " {0}", + oldPluginId); + } + } catch (Exception deleteEx) { + logger.debugv( + deleteEx, + "Failed to delete unreachable" + + " plugin (may already be" + + " deleted): {0}", + oldPluginId); + } + } else { + throw e; + } + } + } + + // Create new plugin DiscoveryPlugin p = new DiscoveryPlugin(); p.callback = callbackUri; p.realm = @@ -390,6 +443,8 @@ public PluginRegistration register(@Context RoutingContext ctx, JsonObject body) p.persist(); universe.children.add(p.realm); universe.persist(); + + logger.debugv("Created new plugin: {0}", p.id); return p; }); @@ -399,26 +454,29 @@ public PluginRegistration register(@Context RoutingContext ctx, JsonObject body) throw new BadRequestException(e); } - var dataMap = new JobDataMap(); - dataMap.put(PLUGIN_ID_MAP_KEY, plugin.id); - dataMap.put(REFRESH_MAP_KEY, true); - JobDetail jobDetail = - JobBuilder.newJob(RefreshPluginJob.class) - .withIdentity(plugin.id.toString(), JOB_PERIODIC) - .usingJobData(dataMap) - .build(); - var trigger = - TriggerBuilder.newTrigger() - .withIdentity( - jobDetail.getKey().getName(), jobDetail.getKey().getGroup()) - .startAt(Date.from(Instant.now().plus(discoveryPingPeriod))) - .withSchedule( - SimpleScheduleBuilder.simpleSchedule() - .repeatForever() - .withIntervalInSeconds( - (int) discoveryPingPeriod.toSeconds())) - .build(); - scheduler.scheduleJob(jobDetail, trigger); + isNewPlugin = !scheduler.checkExists(JobKey.jobKey(plugin.id.toString(), JOB_PERIODIC)); + if (isNewPlugin) { + var dataMap = new JobDataMap(); + dataMap.put(PLUGIN_ID_MAP_KEY, plugin.id); + dataMap.put(REFRESH_MAP_KEY, true); + JobDetail jobDetail = + JobBuilder.newJob(RefreshPluginJob.class) + .withIdentity(plugin.id.toString(), JOB_PERIODIC) + .usingJobData(dataMap) + .build(); + var trigger = + TriggerBuilder.newTrigger() + .withIdentity( + jobDetail.getKey().getName(), jobDetail.getKey().getGroup()) + .startAt(Date.from(Instant.now().plus(discoveryPingPeriod))) + .withSchedule( + 
SimpleScheduleBuilder.simpleSchedule() + .repeatForever() + .withIntervalInSeconds( + (int) discoveryPingPeriod.toSeconds())) + .build(); + scheduler.scheduleJob(jobDetail, trigger); + } } String token; @@ -489,6 +547,9 @@ public void publishWithContext( @RestHeader("Cryostat-Discovery-Authentication") String token, DiscoveryPublication body) { DiscoveryPlugin plugin = DiscoveryPlugin.find("id", id).singleResult(); + DiscoveryNode realm = + entityManager.find( + DiscoveryNode.class, plugin.realm.id, LockModeType.PESSIMISTIC_WRITE); try { jwtValidator.validateJwt(ctx, plugin, token, true); } catch (MalformedURLException @@ -504,12 +565,12 @@ public void publishWithContext( validatePublishedNode(n); } - plugin.realm.children.clear(); + List replacementChildren = new ArrayList<>(); for (var n : body.nodes) { n.target.discoveryNode = n; } - body.fillStrategy.ifPresent( + body.fillStrategy.ifPresentOrElse( algo -> { Map pubCtx = body.context.orElse(Map.of()); switch (algo) { @@ -548,7 +609,6 @@ public void publishWithContext( nsNode.labels = new HashMap<>(); nsNode.children = new ArrayList<>(); nsNode.target = null; - nsNode.parent = plugin.realm; nsNode.children.add(lineage); lineage.parent = nsNode; @@ -563,19 +623,26 @@ public void publishWithContext( } nsNode.persist(); - plugin.realm.children.add(nsNode); + replacementChildren.add(nsNode); break; default: - plugin.realm.children.addAll(body.nodes); + replacementChildren.addAll(body.nodes); for (var n : body.nodes) { - n.parent = plugin.realm; + n.parent = realm; n.persist(); } break; } + }, + () -> { + replacementChildren.addAll(body.nodes); + for (var n : body.nodes) { + n.parent = realm; + n.persist(); + } }); - plugin.realm.persist(); + replaceChildren(realm, replacementChildren); plugin.persist(); } @@ -688,6 +755,20 @@ private void validatePublishedNode(DiscoveryNode currentNode) { } } + private void replaceChildren(DiscoveryNode parent, List replacementChildren) { + List existingChildren = new 
ArrayList<>(parent.children); + parent.children.clear(); + existingChildren.forEach(child -> child.parent = null); + replacementChildren.forEach(child -> child.parent = parent); + parent.children.addAll(replacementChildren); + try { + parent.persist(); + entityManager.flush(); + } catch (OptimisticLockException e) { + throw new BadRequestException("Discovery tree update conflict", e); + } + } + @SuppressFBWarnings("DLS_DEAD_LOCAL_STORE") private DiscoveryNode mergeRealms() { DiscoveryNode universe = DiscoveryNode.getUniverse(); @@ -811,31 +892,79 @@ private static record NodeContext( * pings plugins to ensure they are still alive/reachable and to prompt them to request a fresh * token if their token will be expiring soon. */ - @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") @DisallowConcurrentExecution static class RefreshPluginJob implements Job { @Inject Logger logger; + @ConfigProperty(name = ConfigProperties.DISCOVERY_PLUGINS_MAX_FAILURES) + int maxConsecutiveFailures; + + @ConfigProperty(name = ConfigProperties.DISCOVERY_PLUGINS_PING_PERIOD) + Duration basePingPeriod; + + @ConfigProperty(name = ConfigProperties.DISCOVERY_PLUGINS_MAX_BACKOFF_MULTIPLIER) + int maxBackoffMultiplier; + + @Inject PluginCallbackFactory callbackFactory; + + @Inject EntityManager entityManager; + @Override - @Transactional public void execute(JobExecutionContext context) throws JobExecutionException { - DiscoveryPlugin plugin = null; + boolean refresh = context.getMergedJobDataMap().getBoolean(REFRESH_MAP_KEY); + UUID pluginId = (UUID) context.getMergedJobDataMap().get(PLUGIN_ID_MAP_KEY); + try { - boolean refresh = context.getMergedJobDataMap().getBoolean(REFRESH_MAP_KEY); - plugin = - DiscoveryPlugin.find( - "id", context.getMergedJobDataMap().get(PLUGIN_ID_MAP_KEY)) - .singleResult(); - var cb = PluginCallback.create(plugin); - if (refresh) { - cb.refresh(); - logger.debugv( - "Refreshed discovery plugin: {0} @ {1}", plugin.realm, plugin.callback); - } else { - 
cb.ping(); - logger.debugv( - "Retained discovery plugin: {0} @ {1}", plugin.realm, plugin.callback); - } + QuarkusTransaction.requiringNew() + .run( + () -> { + try { + var p = DiscoveryPlugin.findById(pluginId); + + if (p == null) { + throw new NoResultException( + "Plugin not found: " + pluginId); + } + + if (p.nextPingAt != null + && Instant.now().isBefore(p.nextPingAt)) { + logger.debugv( + "Skipping ping due to backoff: {0} @ {1}", + p.realm.name, p.callback); + return; + } + + var cb = callbackFactory.create(p); + if (refresh) { + cb.refresh(); + logger.debugv( + "Refreshed discovery plugin: {0} @ {1}", + p.realm.name, p.callback); + } else { + cb.ping(); + logger.debugv( + "Retained discovery plugin: {0} @ {1}", + p.realm.name, p.callback); + } + + p.consecutiveFailures = 0; + p.lastSuccessfulPing = Instant.now(); + p.backoffMultiplier = 1; + p.nextPingAt = null; + p.persist(); + + logger.debugv( + "Plugin ping successful - lastSuccessfulPing: {0}," + + " consecutiveFailures reset to 0," + + " backoffMultiplier reset to 1: {1} @" + + " {2}", + p.lastSuccessfulPing, p.realm.name, p.callback); + } catch (NoResultException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } catch (NoResultException e) { logger.debugv( e, @@ -845,10 +974,69 @@ public void execute(JobExecutionContext context) throws JobExecutionException { ex.setUnscheduleFiringTrigger(true); throw ex; } catch (Exception e) { - if (plugin != null) { - logger.debugv( - e, "Pruned discovery plugin: {0} @ {1}", plugin.realm, plugin.callback); - plugin.delete(); + // Unwrap RuntimeException wrappers to get the actual cause + Throwable cause = e; + while (cause instanceof RuntimeException && cause.getCause() != null) { + cause = cause.getCause(); + } + + if (pluginId != null) { + final Throwable finalCause = cause; + QuarkusTransaction.requiringNew() + .run( + () -> { + var p = DiscoveryPlugin.findById(pluginId); + if (p != null) { + p.consecutiveFailures++; + 
p.lastFailedPing = Instant.now(); + p.backoffMultiplier = + Math.min( + p.backoffMultiplier * 2, + maxBackoffMultiplier); + Duration backoffPeriod = + basePingPeriod.multipliedBy( + p.backoffMultiplier); + p.nextPingAt = Instant.now().plus(backoffPeriod); + p.persist(); + + logger.debugv( + "Plugin ping failed - lastFailedPing: {0}," + + " consecutiveFailures: {1}/{2}," + + " backoffMultiplier: {3}, nextPingAt:" + + " {4}: {5} @ {6}", + p.lastFailedPing, + p.consecutiveFailures, + maxConsecutiveFailures, + p.backoffMultiplier, + p.nextPingAt, + p.realm.name, + p.callback); + + if (p.consecutiveFailures >= maxConsecutiveFailures) { + logger.warnv( + "Pruning discovery plugin after {0}" + + " consecutive failures: {1} @" + + " {2}", + p.consecutiveFailures, + p.realm.name, + p.callback); + p.delete(); + } else { + logger.warnv( + finalCause, + "Plugin ping failed ({0}/{1}), backing off" + + " for {2}: {3} @ {4}", + p.consecutiveFailures, + maxConsecutiveFailures, + backoffPeriod, + p.realm.name, + p.callback); + } + } + }); + + var ex = new JobExecutionException(e); + throw ex; } else { var ex = new JobExecutionException(e); ex.setUnscheduleFiringTrigger(true); diff --git a/src/main/java/io/cryostat/discovery/DiscoveryJwtFactory.java b/src/main/java/io/cryostat/discovery/DiscoveryJwtFactory.java index 05cea1afd..de4edf60f 100644 --- a/src/main/java/io/cryostat/discovery/DiscoveryJwtFactory.java +++ b/src/main/java/io/cryostat/discovery/DiscoveryJwtFactory.java @@ -35,6 +35,8 @@ import javax.crypto.KeyGenerator; import javax.crypto.SecretKey; +import io.cryostat.ConfigProperties; + import com.nimbusds.jose.EncryptionMethod; import com.nimbusds.jose.JOSEException; import com.nimbusds.jose.JWEAlgorithm; @@ -71,7 +73,7 @@ public class DiscoveryJwtFactory { static final String DISCOVERY_V4_API_PATH = "/api/v4/discovery/"; static final String DISCOVERY_V4_2_API_PATH = "/api/v4.2/discovery/"; - @ConfigProperty(name = "cryostat.discovery.plugins.ping-period") + 
@ConfigProperty(name = ConfigProperties.DISCOVERY_PLUGINS_PING_PERIOD) Duration discoveryPingPeriod; @ConfigProperty(name = "cryostat.discovery.plugins.jwt.signature.algorithm") diff --git a/src/main/java/io/cryostat/discovery/DiscoveryPlugin.java b/src/main/java/io/cryostat/discovery/DiscoveryPlugin.java index 2a0df0ae7..bc2d67858 100644 --- a/src/main/java/io/cryostat/discovery/DiscoveryPlugin.java +++ b/src/main/java/io/cryostat/discovery/DiscoveryPlugin.java @@ -17,6 +17,8 @@ import java.net.URI; import java.net.URISyntaxException; +import java.time.Instant; +import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; @@ -74,7 +76,12 @@ @NamedQueries({ @NamedQuery( name = "DiscoveryPlugin.getBuiltinRealmIds", - query = "SELECT p.realm.id FROM DiscoveryPlugin p WHERE p.builtin = true") + query = "SELECT p.realm.id FROM DiscoveryPlugin p WHERE p.builtin = true"), + @NamedQuery( + name = "DiscoveryPlugin.findByCallbackAndRealmName", + query = + "SELECT p FROM DiscoveryPlugin p JOIN FETCH p.realm r WHERE p.callback = ?1" + + " AND r.name = ?2") }) public class DiscoveryPlugin extends PanacheEntityBase { @@ -109,10 +116,41 @@ public class DiscoveryPlugin extends PanacheEntityBase { @JsonProperty(access = JsonProperty.Access.READ_ONLY) public boolean builtin; + @Column(nullable = false) + @JsonIgnore + public int consecutiveFailures = 0; + + @Column(nullable = true) + @Convert(converter = InstantConverter.class) + @JsonIgnore + public Instant lastSuccessfulPing; + + @Column(nullable = true) + @Convert(converter = InstantConverter.class) + @JsonIgnore + public Instant lastFailedPing; + + @Column(nullable = false) + @JsonIgnore + public int backoffMultiplier = 1; + + @Column(nullable = true) + @Convert(converter = InstantConverter.class) + @JsonIgnore + public Instant nextPingAt; + + public static Optional findByCallbackAndRealmName( + URI callback, String realmName) { + return DiscoveryPlugin.find( + 
"#DiscoveryPlugin.findByCallbackAndRealmName", callback, realmName) + .singleResultOptional(); + } + @ApplicationScoped static class Listener { @Inject Logger logger; + @Inject PluginCallbackFactory callbackFactory; @PrePersist @Transactional @@ -126,13 +164,22 @@ public void prePersist(DiscoveryPlugin plugin) { if (plugin.credential == null) { var credential = getCredential(plugin); plugin.credential = credential; + credential.discoveryPlugin = plugin; plugin.callback = UriBuilder.fromUri(plugin.callback).userInfo(null).build(); } + if (plugin.nextPingAt != null + || plugin.lastFailedPing != null + || plugin.lastSuccessfulPing != null) { + logger.debugv( + "Skipping prePersist ping for plugin with existing state: {0} @ {1}", + plugin.realm.name, plugin.callback); + return; + } try { logger.debugv( "Testing discovery plugin callback: {0} @ {1}", plugin.realm.name, plugin.callback); - PluginCallback.create(plugin).ping(); + callbackFactory.create(plugin).ping(); logger.debugv( "Registered discovery plugin: {0} @ {1}", plugin.realm.name, plugin.callback); diff --git a/src/main/java/io/cryostat/discovery/InstantConverter.java b/src/main/java/io/cryostat/discovery/InstantConverter.java new file mode 100644 index 000000000..2a017abba --- /dev/null +++ b/src/main/java/io/cryostat/discovery/InstantConverter.java @@ -0,0 +1,39 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.cryostat.discovery; + +import java.time.Instant; + +import jakarta.persistence.AttributeConverter; + +public class InstantConverter implements AttributeConverter { + + @Override + public Long convertToDatabaseColumn(Instant attribute) { + if (attribute == null) { + return null; + } + return attribute.toEpochMilli(); + } + + @Override + public Instant convertToEntityAttribute(Long dbData) { + if (dbData == null) { + return null; + } + return Instant.ofEpochMilli(dbData); + } +} diff --git a/src/main/java/io/cryostat/discovery/KubeEndpointSlicesDiscovery.java b/src/main/java/io/cryostat/discovery/KubeEndpointSlicesDiscovery.java index ebd3ddef9..39bd202be 100644 --- a/src/main/java/io/cryostat/discovery/KubeEndpointSlicesDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeEndpointSlicesDiscovery.java @@ -60,6 +60,8 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.persistence.LockModeType; import jakarta.transaction.Transactional; import jakarta.transaction.Transactional.TxType; import org.apache.commons.lang3.StringUtils; @@ -115,6 +117,8 @@ public class KubeEndpointSlicesDiscovery implements ResourceEventHandler n.name.equals(namespace)) - .orElse( - DiscoveryNode.environment( - namespace, KubeDiscoveryNodeType.NAMESPACE)); + DiscoveryNode.getChild(lockedRealm, n -> n.name.equals(namespace)) + .orElseGet( + () -> { + DiscoveryNode created = + DiscoveryNode.environment( + namespace, KubeDiscoveryNodeType.NAMESPACE); + created.parent = lockedRealm; + return created; + }); if (evt.eventKind == EventKind.FOUND) { persistOwnerChain(nsNode, evt.target, evt.objRef); @@ -402,6 +413,7 @@ public void handleEndpointEvent(EndpointDiscoveryEvent evt) { realm.children.add(nsNode); nsNode.parent = realm; } + entityManager.flush(); realm.persist(); } @@ -718,6 +730,7 @@ private void pruneOwnerChain(DiscoveryNode nsNode, 
Target target) { child = parent; } + entityManager.flush(); nsNode.persist(); target.delete(); } @@ -807,6 +820,7 @@ private void persistNodeChain(DiscoveryNode nsNode, Target target, DiscoveryNode } // Finally persist the namespace node + entityManager.flush(); nsNode.persist(); } diff --git a/src/main/java/io/cryostat/discovery/PluginCallbackFactory.java b/src/main/java/io/cryostat/discovery/PluginCallbackFactory.java new file mode 100644 index 000000000..f49311e1d --- /dev/null +++ b/src/main/java/io/cryostat/discovery/PluginCallbackFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.discovery; + +import java.net.URISyntaxException; + +import io.cryostat.discovery.DiscoveryPlugin.PluginCallback; + +import jakarta.enterprise.context.ApplicationScoped; + +/** + * Factory for creating PluginCallback instances. This abstraction allows tests to inject mock + * callbacks without making real network connections. 
+ */ +@ApplicationScoped +public class PluginCallbackFactory { + + public PluginCallback create(DiscoveryPlugin plugin) throws URISyntaxException { + return PluginCallback.create(plugin); + } +} diff --git a/src/main/java/io/cryostat/monitoring/ConnectionPoolMonitor.java b/src/main/java/io/cryostat/monitoring/ConnectionPoolMonitor.java new file mode 100644 index 000000000..8729370f2 --- /dev/null +++ b/src/main/java/io/cryostat/monitoring/ConnectionPoolMonitor.java @@ -0,0 +1,98 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.monitoring; + +import io.agroal.api.AgroalDataSource; +import io.quarkus.scheduler.Scheduled; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.jboss.logging.Logger; + +/** + * Monitors database connection pool health and logs metrics periodically. + * + *

This monitor tracks active, available, waiting, max used, and leak detection counts to help + * identify connection pool exhaustion and potential leaks. Part of Risk 10 Phase 2 mitigation. + */ +@ApplicationScoped +public class ConnectionPoolMonitor { + + @Inject AgroalDataSource dataSource; + @Inject Logger log; + + /** + * Logs connection pool statistics every 30 seconds. + * + *

Logs at debug level for normal operation. Logs at warn level with full metrics if + * connections are waiting (possible exhaustion) or leaks are detected. + */ + @Scheduled(every = "30s") + void logPoolStats() { + try { + var metrics = dataSource.getMetrics(); + + long active = metrics.activeCount(); + long available = metrics.availableCount(); + long waiting = metrics.awaitingCount(); + long maxUsed = metrics.maxUsedCount(); + long leakDetection = metrics.leakDetectionCount(); + + boolean hasIssues = waiting > 0 || leakDetection > 0; + + if (hasIssues) { + log.warnf( + "Connection pool issue detected: active=%d, available=%d, waiting=%d," + + " maxUsed=%d, leakDetection=%d", + active, available, waiting, maxUsed, leakDetection); + } else { + log.debugf( + "Connection pool: active=%d, available=%d, waiting=%d, maxUsed=%d," + + " leakDetection=%d", + active, available, waiting, maxUsed, leakDetection); + } + } catch (Exception e) { + log.errorf(e, "Failed to retrieve connection pool metrics"); + } + } + + /** + * Gets the current number of active connections. + * + * @return the number of active connections + */ + public long getActiveCount() { + try { + return dataSource.getMetrics().activeCount(); + } catch (Exception e) { + log.errorf(e, "Failed to retrieve active connection count"); + return -1; + } + } + + /** + * Gets the current number of connections waiting for availability. 
+ * + * @return the number of waiting connections + */ + public long getAwaitingCount() { + try { + return dataSource.getMetrics().awaitingCount(); + } catch (Exception e) { + log.errorf(e, "Failed to retrieve awaiting connection count"); + return -1; + } + } +} diff --git a/src/main/java/io/cryostat/targets/ActiveRecordingUpdateJob.java b/src/main/java/io/cryostat/targets/ActiveRecordingUpdateJob.java index da9e4ee8f..9a04980f1 100644 --- a/src/main/java/io/cryostat/targets/ActiveRecordingUpdateJob.java +++ b/src/main/java/io/cryostat/targets/ActiveRecordingUpdateJob.java @@ -19,8 +19,10 @@ import io.cryostat.recordings.RecordingHelper; import jakarta.inject.Inject; +import jakarta.persistence.NoResultException; import jakarta.persistence.PersistenceException; import jakarta.transaction.Transactional; +import org.hibernate.ObjectDeletedException; import org.jboss.logging.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; @@ -49,10 +51,17 @@ public void execute(JobExecutionContext context) throws JobExecutionException { Target target; try { target = Target.getTargetById(recording.target.id); - } catch (PersistenceException e) { - // the target was lost in the meantime, so we can stop worrying about this update + } catch (NoResultException | ObjectDeletedException e) { + // target disappeared in the meantime. No big deal. logger.debug(e); - return; + JobExecutionException ex = new JobExecutionException(e); + ex.setRefireImmediately(false); + ex.setUnscheduleFiringTrigger(true); + throw ex; + } catch (PersistenceException e) { + JobExecutionException ex = new JobExecutionException(e); + ex.setRefireImmediately(false); + throw ex; } // FIXME hacky. This opens a remote connection on each call and updates our database with // the data we find there. 
We should have some remote connection callback (JMX listener, diff --git a/src/main/java/io/cryostat/targets/AgentClient.java b/src/main/java/io/cryostat/targets/AgentClient.java index b1ac86622..a6daaee61 100644 --- a/src/main/java/io/cryostat/targets/AgentClient.java +++ b/src/main/java/io/cryostat/targets/AgentClient.java @@ -117,7 +117,7 @@ Duration getTimeout() { return httpTimeout; } - Uni ping() { + public Uni ping() { return agentRestClient .ping() .invoke(Response::close) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 257895cae..320a6c9cc 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -3,6 +3,16 @@ quarkus.flyway.baseline-at-start=true quarkus.flyway.baseline-on-migrate=true quarkus.flyway.validate-migration-naming=true +quarkus.datasource.metrics.enabled=true +quarkus.datasource.jdbc.min-size=10 +quarkus.datasource.jdbc.max-size=50 +quarkus.datasource.jdbc.acquisition-timeout=PT10S +quarkus.datasource.jdbc.idle-removal-interval=PT5M +quarkus.datasource.jdbc.max-lifetime=PT30M +quarkus.datasource.jdbc.leak-detection-interval=PT2M + +quarkus.transaction-manager.default-transaction-timeout=PT30S + quarkus.hibernate-envers.audit-strategy=org.hibernate.envers.strategy.ValidityAuditStrategy quarkus.quartz.store-type=jdbc-cmt @@ -22,7 +32,9 @@ cryostat.discovery.containers.poll-period=10s cryostat.discovery.containers.request-timeout=2s cryostat.discovery.podman.enabled=false cryostat.discovery.docker.enabled=false -cryostat.discovery.plugins.ping-period=5m +cryostat.discovery.plugins.ping-period=1m +cryostat.discovery.plugins.max-failures=3 +cryostat.discovery.plugins.max-backoff-multiplier=5 cryostat.discovery.plugins.jwt.secret.algorithm=AES cryostat.discovery.plugins.jwt.secret.keysize=256 cryostat.discovery.plugins.jwt.signature.algorithm=HS256 diff --git a/src/main/resources/db/migration/V4.2.0__cryostat.sql 
b/src/main/resources/db/migration/V4.2.0__cryostat.sql index 42abb96aa..2d182380e 100644 --- a/src/main/resources/db/migration/V4.2.0__cryostat.sql +++ b/src/main/resources/db/migration/V4.2.0__cryostat.sql @@ -299,6 +299,11 @@ CREATE TABLE DiscoveryPlugin_AUD ( callback TEXT, credential_id BIGINT, builtin BOOLEAN, + consecutiveFailures INTEGER, + lastSuccessfulPing BIGINT, + lastFailedPing BIGINT, + backoffMultiplier INTEGER, + nextPingAt BIGINT, PRIMARY KEY (id, REV), FOREIGN KEY (REV) REFERENCES REVINFO (REV), FOREIGN KEY (REVEND) REFERENCES REVINFO (REV) @@ -639,4 +644,13 @@ CREATE INDEX IDX_ASYNCPROFILERRECORDING_AUD_REV ON AsyncProfilerRecording_AUD (R CREATE INDEX IDX_ASYNCPROFILERRECORDING_AUD_REVTYPE ON AsyncProfilerRecording_AUD (REVTYPE); CREATE INDEX IDX_ASYNCPROFILERRECORDING_AUD_REVEND ON AsyncProfilerRecording_AUD (REVEND); +ALTER TABLE DiscoveryPlugin ADD COLUMN consecutiveFailures INTEGER NOT NULL DEFAULT 0; +ALTER TABLE DiscoveryPlugin ADD COLUMN lastSuccessfulPing BIGINT; +ALTER TABLE DiscoveryPlugin ADD COLUMN lastFailedPing BIGINT; +ALTER TABLE DiscoveryPlugin ADD COLUMN backoffMultiplier INTEGER NOT NULL DEFAULT 1; +ALTER TABLE DiscoveryPlugin ADD COLUMN nextPingAt BIGINT; + +-- Add unique constraint on DiscoveryNode name for Realm nodes to enforce realm name uniqueness +CREATE UNIQUE INDEX uk_discovery_node_realm_name ON DiscoveryNode (name) WHERE (nodeType = 'Realm'); + COMMIT; diff --git a/src/test/java/io/cryostat/discovery/DiscoveryPluginGracePeriodTest.java b/src/test/java/io/cryostat/discovery/DiscoveryPluginGracePeriodTest.java new file mode 100644 index 000000000..ecb95a302 --- /dev/null +++ b/src/test/java/io/cryostat/discovery/DiscoveryPluginGracePeriodTest.java @@ -0,0 +1,699 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.discovery; + +import static io.restassured.RestAssured.given; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.net.URI; +import java.time.Instant; +import java.util.Map; +import java.util.UUID; + +import io.cryostat.AbstractTransactionalTestBase; +import io.cryostat.ConfigProperties; +import io.cryostat.discovery.DiscoveryPlugin.PluginCallback; +import io.cryostat.targets.AgentClient; + +import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.test.InjectMock; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.http.ContentType; +import jakarta.inject.Inject; +import jakarta.ws.rs.ProcessingException; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; + +@QuarkusTest +public class DiscoveryPluginGracePeriodTest extends AbstractTransactionalTestBase { + + @ConfigProperty(name = ConfigProperties.DISCOVERY_PLUGINS_MAX_FAILURES) + int maxConsecutiveFailures; + + @Inject Discovery.RefreshPluginJob refreshPluginJob; + @Inject AgentClient.Factory agentClientFactory; + + @InjectMock PluginCallbackFactory callbackFactory; + + private static final String PLUGIN_ID_MAP_KEY = "pluginId"; + private static final String REFRESH_MAP_KEY = "refresh"; + + private PluginCallback mockCallback; + + @BeforeEach + public void setupMocks() throws Exception { + mockCallback = 
mock(PluginCallback.class); + Mockito.reset(callbackFactory); + when(callbackFactory.create(any(DiscoveryPlugin.class))).thenReturn(mockCallback); + } + + private UUID createPluginInCommittedTransaction( + long credentialId, String realmName, int consecutiveFailures) { + return QuarkusTransaction.requiringNew() + .call( + () -> { + var realm = new DiscoveryNode(); + realm.name = realmName; + realm.nodeType = NodeType.BaseNodeType.REALM.getKind(); + realm.persist(); + + var plugin = new DiscoveryPlugin(); + plugin.realm = realm; + plugin.callback = + URI.create( + String.format( + "http://storedcredentials:%d@localhost:9999/nonexistent", + credentialId)); + plugin.builtin = false; + plugin.consecutiveFailures = consecutiveFailures; + plugin.persist(); + + return plugin.id; + }); + } + + @Test + public void testConsecutiveFailuresIncrement() throws Exception { + // Create credentials first + var credentialId = + given().log() + .all() + .when() + .formParams( + Map.of( + "username", + "user", + "password", + "pass", + "matchExpression", + "target.connectUrl == 'http://localhost:9999/nonexistent'")) + .contentType(ContentType.URLENC) + .post("/api/v4/credentials") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(201) + .contentType(ContentType.JSON) + .extract() + .jsonPath() + .getLong("id"); + + // Create a plugin in a committed transaction so the job can see it + UUID pluginId = createPluginInCommittedTransaction(credentialId, "test_failure_realm", 0); + + var plugin = DiscoveryPlugin.findById(pluginId); + assertNotNull(plugin); + assertEquals(0, plugin.consecutiveFailures); + assertNull(plugin.lastSuccessfulPing); + + doThrow(new ProcessingException("Connection refused")).when(mockCallback).ping(); + when(callbackFactory.create(any(DiscoveryPlugin.class))).thenReturn(mockCallback); + + // Simulate ping failures + var context = mock(JobExecutionContext.class); + var dataMap = new JobDataMap(); + dataMap.put(PLUGIN_ID_MAP_KEY, pluginId); + 
dataMap.put(REFRESH_MAP_KEY, false); + when(context.getMergedJobDataMap()).thenReturn(dataMap); + + // First failure + try { + refreshPluginJob.execute(context); + } catch (Exception e) { + // Expected to fail + } + + var updatedPlugin1 = + QuarkusTransaction.requiringNew() + .call(() -> DiscoveryPlugin.findById(pluginId)); + assertEquals(1, updatedPlugin1.consecutiveFailures); + assertNull(updatedPlugin1.lastSuccessfulPing); + + // Clear backoff to allow second ping attempt + QuarkusTransaction.requiringNew() + .run( + () -> { + var p = DiscoveryPlugin.findById(pluginId); + if (p != null) { + p.nextPingAt = null; + p.persist(); + } + }); + + // Second failure + try { + refreshPluginJob.execute(context); + } catch (Exception e) { + // Expected to fail + } + + var updatedPlugin2 = + QuarkusTransaction.requiringNew() + .call(() -> DiscoveryPlugin.findById(pluginId)); + assertEquals(2, updatedPlugin2.consecutiveFailures); + assertNull(updatedPlugin2.lastSuccessfulPing); + } + + @Test + public void testPluginDeletedAfterMaxFailures() throws Exception { + // Create credentials first + var credentialId = + given().log() + .all() + .when() + .formParams( + Map.of( + "username", + "user", + "password", + "pass", + "matchExpression", + "target.connectUrl == 'http://localhost:9999/nonexistent'")) + .contentType(ContentType.URLENC) + .post("/api/v4/credentials") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(201) + .contentType(ContentType.JSON) + .extract() + .jsonPath() + .getLong("id"); + + // Create a plugin in a committed transaction + UUID pluginId = + createPluginInCommittedTransaction( + credentialId, "test_max_failures_realm", maxConsecutiveFailures - 1); + + // Mock the callback to throw an exception simulating network failure + doThrow(new ProcessingException("Connection refused")).when(mockCallback).ping(); + when(callbackFactory.create(any(DiscoveryPlugin.class))).thenReturn(mockCallback); + + var context = mock(JobExecutionContext.class); + 
var dataMap = new JobDataMap(); + dataMap.put(PLUGIN_ID_MAP_KEY, pluginId); + dataMap.put(REFRESH_MAP_KEY, false); + when(context.getMergedJobDataMap()).thenReturn(dataMap); + + // This failure should trigger deletion + try { + refreshPluginJob.execute(context); + } catch (Exception e) { + // Expected to fail + } + + var deletedPlugin = DiscoveryPlugin.findById(pluginId); + assertNull(deletedPlugin, "Plugin should be deleted after max consecutive failures"); + } + + @Test + public void testConsecutiveFailuresResetOnSuccess() throws Exception { + // For this test, use real callback creation since we're testing against real endpoint + doCallRealMethod().when(callbackFactory).create(any(DiscoveryPlugin.class)); + + // Create a plugin with a valid callback + var credentialId = + given().log() + .all() + .when() + .formParams( + Map.of( + "username", + "user", + "password", + "pass", + "matchExpression", + "target.connectUrl ==" + + " 'http://localhost:8081/health/liveness'")) + .contentType(ContentType.URLENC) + .post("/api/v4/credentials") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(201) + .contentType(ContentType.JSON) + .extract() + .jsonPath() + .getLong("id"); + + var callback = + String.format( + "http://storedcredentials:%d@localhost:8081/health/liveness", credentialId); + + var registration = + given().log() + .all() + .when() + .body(Map.of("realm", "test_reset_realm", "callback", callback)) + .contentType(ContentType.JSON) + .post("/api/v4/discovery") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(200) + .contentType(ContentType.JSON) + .extract() + .jsonPath(); + + var pluginId = UUID.fromString(registration.getString("id")); + + // Manually set some consecutive failures + QuarkusTransaction.requiringNew() + .run( + () -> { + var plugin = + DiscoveryPlugin.find("id", pluginId) + .firstResult(); + plugin.consecutiveFailures = 2; + plugin.persist(); + }); + + var context = mock(JobExecutionContext.class); + var 
dataMap = new JobDataMap(); + dataMap.put(PLUGIN_ID_MAP_KEY, pluginId); + dataMap.put(REFRESH_MAP_KEY, false); + when(context.getMergedJobDataMap()).thenReturn(dataMap); + + // Execute successful ping + refreshPluginJob.execute(context); + + DiscoveryPlugin updatedPlugin = + QuarkusTransaction.requiringNew() + .call( + () -> + DiscoveryPlugin.find("id", pluginId) + .firstResult()); + + assertEquals(0, updatedPlugin.consecutiveFailures, "Consecutive failures should be reset"); + assertNotNull(updatedPlugin.lastSuccessfulPing, "Last successful ping should be set"); + assertTrue( + updatedPlugin.lastSuccessfulPing.isBefore(Instant.now().plusSeconds(1)), + "Last successful ping should be recent"); + } + + @Test + public void testPluginNotDeletedBeforeMaxFailures() throws Exception { + // Create credentials first + var credentialId = + given().log() + .all() + .when() + .formParams( + Map.of( + "username", + "user", + "password", + "pass", + "matchExpression", + "target.connectUrl == 'http://localhost:9999/nonexistent'")) + .contentType(ContentType.URLENC) + .post("/api/v4/credentials") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(201) + .contentType(ContentType.JSON) + .extract() + .jsonPath() + .getLong("id"); + + // Create a plugin in a committed transaction + UUID pluginId = + createPluginInCommittedTransaction(credentialId, "test_not_deleted_realm", 0); + + // Mock the callback to throw an exception simulating network failure + doThrow(new ProcessingException("Connection refused")).when(mockCallback).ping(); + when(callbackFactory.create(any(DiscoveryPlugin.class))).thenReturn(mockCallback); + + var context = mock(JobExecutionContext.class); + var dataMap = new JobDataMap(); + dataMap.put(PLUGIN_ID_MAP_KEY, pluginId); + dataMap.put(REFRESH_MAP_KEY, false); + when(context.getMergedJobDataMap()).thenReturn(dataMap); + + // Execute failures up to max-1 + for (int i = 0; i < maxConsecutiveFailures - 1; i++) { + try { + 
refreshPluginJob.execute(context); + } catch (Exception e) { + // Expected to fail + } + + // Clear backoff to allow next ping attempt + if (i < maxConsecutiveFailures - 2) { + QuarkusTransaction.requiringNew() + .run( + () -> { + var p = DiscoveryPlugin.findById(pluginId); + if (p != null) { + p.nextPingAt = null; + p.persist(); + } + }); + } + } + + var updatedPlugin = DiscoveryPlugin.findById(pluginId); + + assertNotNull( + updatedPlugin, "Plugin should not be deleted before max consecutive failures"); + assertEquals( + maxConsecutiveFailures - 1, + updatedPlugin.consecutiveFailures, + "Consecutive failures should be tracked"); + } + + @Test + public void testLastFailedPingTracked() throws Exception { + // Create credentials first + var credentialId = + given().log() + .all() + .when() + .formParams( + Map.of( + "username", + "user", + "password", + "pass", + "matchExpression", + "target.connectUrl == 'http://localhost:9999/nonexistent'")) + .contentType(ContentType.URLENC) + .post("/api/v4/credentials") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(201) + .contentType(ContentType.JSON) + .extract() + .jsonPath() + .getLong("id"); + + // Create a plugin in a committed transaction + UUID pluginId = + createPluginInCommittedTransaction(credentialId, "test_failure_tracking_realm", 0); + + // Mock the callback to throw an exception simulating network failure + doThrow(new ProcessingException("Connection refused")).when(mockCallback).ping(); + when(callbackFactory.create(any(DiscoveryPlugin.class))).thenReturn(mockCallback); + + var context = mock(JobExecutionContext.class); + var dataMap = new JobDataMap(); + dataMap.put(PLUGIN_ID_MAP_KEY, pluginId); + dataMap.put(REFRESH_MAP_KEY, false); + when(context.getMergedJobDataMap()).thenReturn(dataMap); + + // Execute failure + try { + refreshPluginJob.execute(context); + } catch (Exception e) { + // Expected to fail + } + + var updatedPlugin = DiscoveryPlugin.findById(pluginId); + 
assertNotNull(updatedPlugin.lastFailedPing, "Last failed ping should be set"); + assertTrue( + updatedPlugin.lastFailedPing.isBefore(Instant.now().plusSeconds(1)), + "Last failed ping should be recent"); + } + + @Test + public void testBackoffMultiplierIncreasesOnFailure() throws Exception { + // Create credentials first + var credentialId = + given().log() + .all() + .when() + .formParams( + Map.of( + "username", + "user", + "password", + "pass", + "matchExpression", + "target.connectUrl == 'http://localhost:9999/nonexistent'")) + .contentType(ContentType.URLENC) + .post("/api/v4/credentials") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(201) + .contentType(ContentType.JSON) + .extract() + .jsonPath() + .getLong("id"); + + // Create a plugin in a committed transaction + UUID pluginId = createPluginInCommittedTransaction(credentialId, "test_backoff_realm", 0); + + var plugin = DiscoveryPlugin.findById(pluginId); + assertEquals(1, plugin.backoffMultiplier); + assertNull(plugin.nextPingAt); + + // Mock the callback to throw an exception simulating network failure + doThrow(new ProcessingException("Connection refused")).when(mockCallback).ping(); + when(callbackFactory.create(any(DiscoveryPlugin.class))).thenReturn(mockCallback); + + var context = mock(JobExecutionContext.class); + var dataMap = new JobDataMap(); + dataMap.put(PLUGIN_ID_MAP_KEY, pluginId); + dataMap.put(REFRESH_MAP_KEY, false); + when(context.getMergedJobDataMap()).thenReturn(dataMap); + + // First failure - backoff should double to 2 + try { + refreshPluginJob.execute(context); + } catch (Exception e) { + // Expected to fail + } + + var updatedPlugin1 = + QuarkusTransaction.requiringNew() + .call(() -> DiscoveryPlugin.findById(pluginId)); + assertEquals(2, updatedPlugin1.backoffMultiplier, "Backoff multiplier should double"); + assertNotNull(updatedPlugin1.nextPingAt, "Next ping time should be set"); + + // Clear backoff to allow second ping attempt + 
QuarkusTransaction.requiringNew() + .run( + () -> { + var p = DiscoveryPlugin.findById(pluginId); + if (p != null) { + p.nextPingAt = null; + p.persist(); + } + }); + + // Second failure - backoff should double to 4 + try { + refreshPluginJob.execute(context); + } catch (Exception e) { + // Expected to fail + } + + var updatedPlugin2 = + QuarkusTransaction.requiringNew() + .call(() -> DiscoveryPlugin.findById(pluginId)); + assertEquals(4, updatedPlugin2.backoffMultiplier, "Backoff multiplier should double again"); + } + + @Test + public void testBackoffSkipsPingWhenNotReady() throws Exception { + // Create credentials first + var credentialId = + given().log() + .all() + .when() + .formParams( + Map.of( + "username", + "user", + "password", + "pass", + "matchExpression", + "target.connectUrl == 'http://localhost:9999/nonexistent'")) + .contentType(ContentType.URLENC) + .post("/api/v4/credentials") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(201) + .contentType(ContentType.JSON) + .extract() + .jsonPath() + .getLong("id"); + + // Create a plugin in a committed transaction with backoff state + UUID pluginId = + QuarkusTransaction.requiringNew() + .call( + () -> { + var realm = new DiscoveryNode(); + realm.name = "test_backoff_skip_realm"; + realm.nodeType = NodeType.BaseNodeType.REALM.getKind(); + realm.persist(); + + var plugin = new DiscoveryPlugin(); + plugin.realm = realm; + plugin.callback = + URI.create( + String.format( + "http://storedcredentials:%d@localhost:9999/nonexistent", + credentialId)); + plugin.builtin = false; + plugin.consecutiveFailures = 1; + plugin.backoffMultiplier = 2; + plugin.nextPingAt = + Instant.now().plusSeconds(3600); // 1 hour in future + plugin.persist(); + + return plugin.id; + }); + + // Mock the callback - it should NOT be called due to backoff + doThrow(new ProcessingException("Should not be called")).when(mockCallback).ping(); + 
when(callbackFactory.create(any(DiscoveryPlugin.class))).thenReturn(mockCallback); + + var context = mock(JobExecutionContext.class); + var dataMap = new JobDataMap(); + dataMap.put(PLUGIN_ID_MAP_KEY, pluginId); + dataMap.put(REFRESH_MAP_KEY, false); + when(context.getMergedJobDataMap()).thenReturn(dataMap); + + // Execute - should skip ping due to backoff + refreshPluginJob.execute(context); + + // Verify ping was never called + verify(mockCallback, never()).ping(); + + var updatedPlugin = DiscoveryPlugin.findById(pluginId); + assertEquals( + 1, + updatedPlugin.consecutiveFailures, + "Consecutive failures should not change when ping is skipped"); + } + + @Test + public void testBackoffResetsOnSuccess() throws Exception { + // For this test, use real callback creation since we're testing against real endpoint + doCallRealMethod().when(callbackFactory).create(any(DiscoveryPlugin.class)); + + // Create a plugin with a valid callback + var credentialId = + given().log() + .all() + .when() + .formParams( + Map.of( + "username", + "user", + "password", + "pass", + "matchExpression", + "target.connectUrl ==" + + " 'http://localhost:8081/health/liveness'")) + .contentType(ContentType.URLENC) + .post("/api/v4/credentials") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(201) + .contentType(ContentType.JSON) + .extract() + .jsonPath() + .getLong("id"); + + var callback = + String.format( + "http://storedcredentials:%d@localhost:8081/health/liveness", credentialId); + + var registration = + given().log() + .all() + .when() + .body(Map.of("realm", "test_backoff_reset_realm", "callback", callback)) + .contentType(ContentType.JSON) + .post("/api/v4/discovery") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(200) + .contentType(ContentType.JSON) + .extract() + .jsonPath(); + + var pluginId = UUID.fromString(registration.getString("id")); + + // Manually set backoff state + QuarkusTransaction.requiringNew() + .run( + () -> { + var plugin = + 
DiscoveryPlugin.find("id", pluginId) + .firstResult(); + plugin.consecutiveFailures = 2; + plugin.backoffMultiplier = 4; + plugin.nextPingAt = Instant.now().minusSeconds(1); // In the past + plugin.persist(); + }); + + var context = mock(JobExecutionContext.class); + var dataMap = new JobDataMap(); + dataMap.put(PLUGIN_ID_MAP_KEY, pluginId); + dataMap.put(REFRESH_MAP_KEY, false); + when(context.getMergedJobDataMap()).thenReturn(dataMap); + + // Execute successful ping + refreshPluginJob.execute(context); + + DiscoveryPlugin updatedPlugin = + QuarkusTransaction.requiringNew() + .call( + () -> + DiscoveryPlugin.find("id", pluginId) + .firstResult()); + + assertEquals(0, updatedPlugin.consecutiveFailures, "Consecutive failures should be reset"); + assertEquals(1, updatedPlugin.backoffMultiplier, "Backoff multiplier should be reset"); + assertNull(updatedPlugin.nextPingAt, "Next ping time should be cleared"); + assertNotNull(updatedPlugin.lastSuccessfulPing, "Last successful ping should be set"); + } +} diff --git a/src/test/java/io/cryostat/discovery/DiscoveryPluginTest.java b/src/test/java/io/cryostat/discovery/DiscoveryPluginTest.java index 6688fc774..503680f75 100644 --- a/src/test/java/io/cryostat/discovery/DiscoveryPluginTest.java +++ b/src/test/java/io/cryostat/discovery/DiscoveryPluginTest.java @@ -474,6 +474,123 @@ void testPublishHierarchicalNodeList() { // subtree } + @Test + void testIdempotentRegistration() { + // store credentials + var credentialId = + given().log() + .all() + .when() + .formParams( + Map.of( + "username", + "user", + "password", + "pass", + "matchExpression", + "target.connectUrl ==" + + " 'http://localhost:8081/health/liveness'")) + .contentType(ContentType.URLENC) + .post("/api/v4/credentials") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(201) + .contentType(ContentType.JSON) + .extract() + .jsonPath() + .getLong("id"); + + // register first time + var realmName = "idempotent_test_realm"; + var callback = + 
String.format( + "http://storedcredentials:%d@localhost:8081/health/liveness", credentialId); + var registration1 = + given().log() + .all() + .when() + .body(Map.of("realm", realmName, "callback", callback)) + .contentType(ContentType.JSON) + .post("/api/v4/discovery") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(200) + .contentType(ContentType.JSON) + .extract() + .jsonPath(); + var pluginId1 = registration1.getString("id"); + var pluginToken1 = registration1.getString("token"); + MatcherAssert.assertThat( + pluginId1, Matchers.is(Matchers.not(Matchers.emptyOrNullString()))); + MatcherAssert.assertThat( + pluginToken1, Matchers.is(Matchers.not(Matchers.emptyOrNullString()))); + + // register second time with same callback and realm - should be idempotent + var registration2 = + given().log() + .all() + .when() + .body(Map.of("realm", realmName, "callback", callback)) + .contentType(ContentType.JSON) + .post("/api/v4/discovery") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(200) + .contentType(ContentType.JSON) + .extract() + .jsonPath(); + var pluginId2 = registration2.getString("id"); + var pluginToken2 = registration2.getString("token"); + + // Should return the same plugin ID (idempotent) + MatcherAssert.assertThat(pluginId2, Matchers.equalTo(pluginId1)); + // Token should be different (refreshed) + MatcherAssert.assertThat(pluginToken2, Matchers.not(Matchers.equalTo(pluginToken1))); + + // register third time - verify still idempotent + var registration3 = + given().log() + .all() + .when() + .body(Map.of("realm", realmName, "callback", callback)) + .contentType(ContentType.JSON) + .post("/api/v4/discovery") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(200) + .contentType(ContentType.JSON) + .extract() + .jsonPath(); + var pluginId3 = registration3.getString("id"); + + MatcherAssert.assertThat(pluginId3, Matchers.equalTo(pluginId1)); + + // cleanup + given().log() + .all() + .when() + 
.header(DISCOVERY_HEADER, pluginToken2) + .delete(String.format("/api/v4/discovery/%s", pluginId1)) + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(204); + } + record Node(String name, String nodeType, Target target, List<Node> children) { Node(String name, String nodeType, Target target) { this(name, nodeType, target, null); diff --git a/src/test/java/io/cryostat/discovery/DiscoveryTreeConsistencyTest.java b/src/test/java/io/cryostat/discovery/DiscoveryTreeConsistencyTest.java new file mode 100644 index 000000000..f79092ae4 --- /dev/null +++ b/src/test/java/io/cryostat/discovery/DiscoveryTreeConsistencyTest.java @@ -0,0 +1,152 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.cryostat.discovery; + +import static org.junit.jupiter.api.Assertions.*; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import io.cryostat.AbstractTransactionalTestBase; +import io.cryostat.targets.Target; + +import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.test.junit.QuarkusTest; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.persistence.LockModeType; +import org.junit.jupiter.api.Test; + +@QuarkusTest +class DiscoveryTreeConsistencyTest extends AbstractTransactionalTestBase { + + @Inject EntityManager entityManager; + + @Test + void shouldReplaceRealmChildrenAtomically() { + UUID pluginId = QuarkusTransaction.requiringNew().call(() -> createPlugin("atomic-realm")); + + QuarkusTransaction.requiringNew() + .run( + () -> { + DiscoveryPlugin plugin = + entityManager.find(DiscoveryPlugin.class, pluginId); + DiscoveryNode realm = plugin.realm; + DiscoveryNode stale = + targetNode( + "service:jmx:rmi:///jndi/rmi://127.0.0.1:9091/jmxrmi"); + realm.children.add(stale); + stale.parent = realm; + entityManager.persist(stale); + entityManager.flush(); + }); + + QuarkusTransaction.requiringNew() + .run( + () -> { + DiscoveryPlugin plugin = + entityManager.find(DiscoveryPlugin.class, pluginId); + DiscoveryNode realm = plugin.realm; + List<DiscoveryNode> replacement = new ArrayList<>(); + replacement.add( + targetNode( + "service:jmx:rmi:///jndi/rmi://127.0.0.2:9091/jmxrmi")); + replacement.add( + targetNode( + "service:jmx:rmi:///jndi/rmi://127.0.0.3:9091/jmxrmi")); + List<DiscoveryNode> staleChildren = new ArrayList<>(realm.children); + realm.children.clear(); + staleChildren.forEach(node -> node.parent = null); + for (DiscoveryNode node : replacement) { + node.parent = realm; + realm.children.add(node); + entityManager.persist(node); + } + entityManager.flush(); + }); + + QuarkusTransaction.requiringNew() + .run( + () -> { + DiscoveryPlugin plugin =
entityManager.find(DiscoveryPlugin.class, pluginId); + DiscoveryNode realm = plugin.realm; + assertEquals(2, realm.children.size()); + assertEquals( + List.of( + "service:jmx:rmi:///jndi/rmi://127.0.0.2:9091/jmxrmi", + "service:jmx:rmi:///jndi/rmi://127.0.0.3:9091/jmxrmi"), + realm.children.stream() + .map(n -> n.target.connectUrl.toString()) + .sorted() + .toList()); + }); + } + + @Test + void shouldAcquirePessimisticWriteLockForRealmMutation() { + UUID pluginId = QuarkusTransaction.requiringNew().call(() -> createPlugin("locked-realm")); + + QuarkusTransaction.requiringNew() + .run( + () -> { + DiscoveryPlugin plugin = + entityManager.find(DiscoveryPlugin.class, pluginId); + DiscoveryNode lockedRealm = + entityManager.find( + DiscoveryNode.class, + plugin.realm.id, + LockModeType.PESSIMISTIC_WRITE); + assertNotNull(lockedRealm); + assertEquals(plugin.realm.id, lockedRealm.id); + }); + } + + private UUID createPlugin(String realmName) { + DiscoveryNode realm = new DiscoveryNode(); + realm.name = realmName; + realm.nodeType = NodeType.BaseNodeType.REALM.getKind(); + realm.persist(); + + DiscoveryPlugin plugin = new DiscoveryPlugin(); + plugin.realm = realm; + plugin.callback = URI.create("http://storedcredentials:1@localhost:8181/callback"); + plugin.credential = null; + plugin.builtin = true; + plugin.persist(); + + return plugin.id; + } + + private DiscoveryNode targetNode(String connectUrl) { + Target target = new Target(); + target.alias = connectUrl; + target.connectUrl = URI.create(connectUrl); + target.jvmId = UUID.randomUUID().toString(); + target.labels = new java.util.HashMap<>(); + target.activeRecordings = new ArrayList<>(); + + DiscoveryNode node = new DiscoveryNode(); + node.name = connectUrl; + node.nodeType = NodeType.BaseNodeType.JVM.getKind(); + node.target = target; + node.children = new ArrayList<>(); + target.discoveryNode = node; + return node; + } +} diff --git a/src/test/java/io/cryostat/monitoring/ConnectionPoolMonitorTest.java 
b/src/test/java/io/cryostat/monitoring/ConnectionPoolMonitorTest.java new file mode 100644 index 000000000..0f0969584 --- /dev/null +++ b/src/test/java/io/cryostat/monitoring/ConnectionPoolMonitorTest.java @@ -0,0 +1,157 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.monitoring; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.agroal.api.AgroalDataSource; +import io.agroal.api.AgroalDataSourceMetrics; +import org.jboss.logging.Logger; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ConnectionPoolMonitorTest { + + ConnectionPoolMonitor monitor; + + @Mock AgroalDataSource dataSource; + @Mock Logger logger; + + @BeforeEach + void setup() { + monitor = new ConnectionPoolMonitor(); + monitor.dataSource = dataSource; + monitor.log = logger; + } + + @Test + void testLogPoolStatsWithNoIssues() { + AgroalDataSourceMetrics metrics = mock(AgroalDataSourceMetrics.class); + when(dataSource.getMetrics()).thenReturn(metrics); + when(metrics.activeCount()).thenReturn(5L); + when(metrics.availableCount()).thenReturn(15L); + when(metrics.awaitingCount()).thenReturn(0L); + 
when(metrics.maxUsedCount()).thenReturn(8L); + when(metrics.leakDetectionCount()).thenReturn(0L); + + monitor.logPoolStats(); + + verify(logger) + .debugf( + "Connection pool: active=%d, available=%d, waiting=%d, maxUsed=%d," + + " leakDetection=%d", + 5L, 15L, 0L, 8L, 0L); + } + + @Test + void testLogPoolStatsWithWaiting() { + AgroalDataSourceMetrics metrics = mock(AgroalDataSourceMetrics.class); + when(dataSource.getMetrics()).thenReturn(metrics); + when(metrics.activeCount()).thenReturn(10L); + when(metrics.availableCount()).thenReturn(0L); + when(metrics.awaitingCount()).thenReturn(3L); + when(metrics.maxUsedCount()).thenReturn(10L); + when(metrics.leakDetectionCount()).thenReturn(0L); + + monitor.logPoolStats(); + + verify(logger) + .warnf( + "Connection pool issue detected: active=%d, available=%d, waiting=%d," + + " maxUsed=%d, leakDetection=%d", + 10L, 0L, 3L, 10L, 0L); + } + + @Test + void testLogPoolStatsWithLeakDetection() { + AgroalDataSourceMetrics metrics = mock(AgroalDataSourceMetrics.class); + when(dataSource.getMetrics()).thenReturn(metrics); + when(metrics.activeCount()).thenReturn(8L); + when(metrics.availableCount()).thenReturn(12L); + when(metrics.awaitingCount()).thenReturn(0L); + when(metrics.maxUsedCount()).thenReturn(10L); + when(metrics.leakDetectionCount()).thenReturn(2L); + + monitor.logPoolStats(); + + verify(logger) + .warnf( + "Connection pool issue detected: active=%d, available=%d, waiting=%d," + + " maxUsed=%d, leakDetection=%d", + 8L, 12L, 0L, 10L, 2L); + } + + @Test + void testLogPoolStatsWithBothIssues() { + AgroalDataSourceMetrics metrics = mock(AgroalDataSourceMetrics.class); + when(dataSource.getMetrics()).thenReturn(metrics); + when(metrics.activeCount()).thenReturn(10L); + when(metrics.availableCount()).thenReturn(0L); + when(metrics.awaitingCount()).thenReturn(5L); + when(metrics.maxUsedCount()).thenReturn(10L); + when(metrics.leakDetectionCount()).thenReturn(3L); + + monitor.logPoolStats(); + + verify(logger) + .warnf( + 
"Connection pool issue detected: active=%d, available=%d, waiting=%d," + + " maxUsed=%d, leakDetection=%d", + 10L, 0L, 5L, 10L, 3L); + } + + @Test + void testLogPoolStatsHandlesException() { + when(dataSource.getMetrics()).thenThrow(new RuntimeException("Test exception")); + + monitor.logPoolStats(); + + verify(logger) + .errorf( + org.mockito.ArgumentMatchers.any(Exception.class), + org.mockito.ArgumentMatchers.eq( + "Failed to retrieve connection pool metrics")); + } + + @Test + void testGetActiveCount() { + AgroalDataSourceMetrics metrics = mock(AgroalDataSourceMetrics.class); + when(dataSource.getMetrics()).thenReturn(metrics); + when(metrics.activeCount()).thenReturn(7L); + + long result = monitor.getActiveCount(); + + Assertions.assertEquals(7L, result); + } + + @Test + void testGetAwaitingCount() { + AgroalDataSourceMetrics metrics = mock(AgroalDataSourceMetrics.class); + when(dataSource.getMetrics()).thenReturn(metrics); + when(metrics.awaitingCount()).thenReturn(2L); + + long result = monitor.getAwaitingCount(); + + Assertions.assertEquals(2L, result); + } +}