diff --git a/schema/notifications.yaml b/schema/notifications.yaml index ba7259c17..f62a77c6e 100644 --- a/schema/notifications.yaml +++ b/schema/notifications.yaml @@ -1026,6 +1026,8 @@ components: serviceRef: type: object properties: + id: + type: integer connectUrl: type: object description: Payload of type URI @@ -1045,6 +1047,8 @@ components: cryostat: type: object additionalProperties: true + agent: + type: boolean jvmId: type: string required: diff --git a/src/main/java/io/cryostat/discovery/KubeEndpointSlicesDiscovery.java b/src/main/java/io/cryostat/discovery/KubeEndpointSlicesDiscovery.java index 39bd202be..04ee872d7 100644 --- a/src/main/java/io/cryostat/discovery/KubeEndpointSlicesDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeEndpointSlicesDiscovery.java @@ -61,6 +61,7 @@ import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import jakarta.persistence.EntityManager; +import jakarta.persistence.EntityNotFoundException; import jakarta.persistence.LockModeType; import jakarta.transaction.Transactional; import jakarta.transaction.Transactional.TxType; @@ -352,14 +353,21 @@ public void handleQueryEvent(NamespaceQueryEvent evt) { Set observedTargets = observedTargetsWithHierarchy.keySet(); - Target.compare(persistedTargets) - .to(observedTargets) - .removed() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, t, null, EventKind.LOST))); + var removedTargets = Target.compare(persistedTargets).to(observedTargets).removed(); + + logger.debugv( + "Namespace {0}: Found {1} persisted targets, {2} observed targets, {3}" + + " removed targets", + namespace, + persistedTargets.size(), + observedTargets.size(), + removedTargets.size()); + + removedTargets.forEach( + (t) -> { + logger.debugv("Publishing LOST event for target: {0}", t.connectUrl); + notify(EndpointDiscoveryEvent.from(namespace, t, null, EventKind.LOST)); + }); Target.compare(persistedTargets) .to(observedTargets) @@ -708,31 +716,93 @@ DiscoveryNode buildOwnershipHierarchy(HasMetadata kubeObj, DiscoveryNode node) { } private void pruneOwnerChain(DiscoveryNode nsNode, Target target) { - target = Target.getTargetByConnectUrl(target.connectUrl); + logger.debugv("Pruning owner chain for target: {0}", target.connectUrl); + try { + Target managedTarget = Target.getTargetByConnectUrl(target.connectUrl); - DiscoveryNode child = target.discoveryNode; - while (true) { - DiscoveryNode parent = child.parent; + if (managedTarget == null || managedTarget.id == null) { + logger.debugv("Target already deleted: {0}", target.connectUrl); + return; + } - if (parent == null) { - break; + logger.debugv( + "Found managed target with id {0}, starting pruning from discoveryNode {1}", + managedTarget.id, + managedTarget.discoveryNode != null ? managedTarget.discoveryNode.id : "null"); + + DiscoveryNode endpointNode = managedTarget.discoveryNode; + if (endpointNode == null || endpointNode.id == null) { + logger.debugv("No discoveryNode for target: {0}", target.connectUrl); + managedTarget.delete(); + return; } - parent.children.remove(child); - child.parent = null; - parent.persist(); + endpointNode = entityManager.find(DiscoveryNode.class, endpointNode.id); + logger.debugv( + "Loaded discoveryNode {0} with parent {1}", + endpointNode.id, endpointNode.parent != null ? endpointNode.parent.id : "null"); - if (parent.hasChildren() - || parent.nodeType.equals(KubeDiscoveryNodeType.NAMESPACE.getKind())) { - break; + // Get the parent BEFORE deleting the target, because deletion will cascade + DiscoveryNode child = endpointNode.parent; + + // Delete the target first - this will cascade delete its discoveryNode (the + // Endpoint/EndpointSlice) + managedTarget.delete(); + entityManager.flush(); + + // If there's no parent, we're done + if (child == null) { + logger.debugv("No parent node to prune"); + return; } - child = parent; - } + // Walk up the hierarchy, removing childless nodes from their parents + // Rely on orphan removal to delete them, don't call delete() explicitly + while (child != null) { + DiscoveryNode parent = child.parent; - entityManager.flush(); - nsNode.persist(); - target.delete(); + if (parent == null) { + logger.debugv( + "Reached orphaned node {0} (id={1}), relying on orphan removal", + child.name, child.id); + break; + } + + entityManager.refresh(parent); // Reload from DB with current children + + // Remove child from parent's collection - orphan removal will delete it + parent.children.remove(child); + child.parent = null; + + boolean hasChildren = parent.hasChildren(); + boolean isNamespace = + parent.nodeType.equals(KubeDiscoveryNodeType.NAMESPACE.getKind()); + logger.debugv( + "Parent node {0} (id={1}, type={2}): hasChildren={3}, isNamespace={4}", + parent.name, parent.id, parent.nodeType, hasChildren, isNamespace); + + if (hasChildren || isNamespace) { + logger.debugv("Stopping pruning at parent node {0}", parent.name); + break; + } + + logger.debugv("Continuing to prune parent node {0}", parent.name); + // Remove child from parent's collection - orphan removal will delete it + parent.children.remove(child); + child.parent = null; + + // Move up to check the parent too + child = parent; + } + + entityManager.flush(); + nsNode.persist(); + + } catch (EntityNotFoundException e) { + logger.debugv("Target was deleted during pruning operation: {0}", target.connectUrl, e); + entityManager.flush(); + nsNode.persist(); + } } /** @@ -778,7 +848,7 @@ private DiscoveryNode buildOwnershipHierarchy( podNode.children.add(targetNode); targetNode.parent = podNode; - DiscoveryNode rootNode = buildOwnershipHierarchy(pod.getLeft(), podNode); + DiscoveryNode rootNode = buildOwnershipHierarchyWithDbLookup(pod.getLeft(), podNode); nsNode.children.add(rootNode); rootNode.parent = nsNode; @@ -786,6 +856,62 @@ private DiscoveryNode buildOwnershipHierarchy( return rootNode; } + /** + * Builds the ownership hierarchy using database lookups to reuse existing nodes. + * + * @param kubeObj The Kubernetes object to start from + * @param node The DiscoveryNode representing the Kubernetes object + * @return The root node of the ownership chain (the topmost owner) + */ + private DiscoveryNode buildOwnershipHierarchyWithDbLookup( + HasMetadata kubeObj, DiscoveryNode node) { + Pair current = Pair.of(kubeObj, node); + + while (true) { + Pair owner = getOwnerNodeWithDbLookup(current); + if (owner == null) { + break; + } + + DiscoveryNode ownerNode = owner.getRight(); + DiscoveryNode childNode = current.getRight(); + + if (!ownerNode.children.contains(childNode)) { + ownerNode.children.add(childNode); + } + childNode.parent = ownerNode; + + current = owner; + } + + return current.getRight(); + } + + /** + * Gets the owner node using database lookup to reuse existing nodes. + * + * @param child Pair of Kubernetes object and its DiscoveryNode + * @return Pair of owner Kubernetes object and its DiscoveryNode from DB, or null if no owner + */ + private Pair getOwnerNodeWithDbLookup( + Pair child) { + HasMetadata childRef = child.getLeft(); + if (childRef == null) { + return null; + } + List owners = childRef.getMetadata().getOwnerReferences(); + if (owners.isEmpty()) { + return null; + } + String namespace = childRef.getMetadata().getNamespace(); + OwnerReference owner = + owners.stream() + .filter(o -> KubeDiscoveryNodeType.fromKubernetesKind(o.getKind()) != null) + .findFirst() + .orElse(owners.get(0)); + return queryForNode(namespace, owner.getName(), owner.getKind()); + } + private void persistNodeChain(DiscoveryNode nsNode, Target target, DiscoveryNode targetNode) { List nodeChain = collectNodeChain(targetNode, nsNode); @@ -923,7 +1049,9 @@ private Pair queryForNode( DiscoveryNode.byTypeWithName( nodeType, name, - n -> namespace.equals(n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY)), + n -> + namespace.equals(n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY)) + && isInKubernetesApiRealm(n), n -> { Map labels = new HashMap<>(); if (kubeObj != null && kubeObj.getMetadata().getLabels() != null) { @@ -936,6 +1064,25 @@ private Pair queryForNode( return Pair.of(kubeObj, node); } + /** + * Check if a node belongs to the KubernetesApi Realm by walking up its parent chain. + * + * @param node The node to check + * @return true if the node is in the KubernetesApi Realm, false otherwise + */ + private boolean isInKubernetesApiRealm(DiscoveryNode node) { + DiscoveryNode current = node; + while (current != null) { + if (current.nodeType != null + && current.nodeType.equals(NodeType.BaseNodeType.REALM.getKind()) + && REALM.equals(current.name)) { + return true; + } + current = current.parent; + } + return false; + } + @DisallowConcurrentExecution static class EndpointsResyncJob implements Job { @Inject Logger logger; diff --git a/src/main/java/io/cryostat/expressions/MatchExpressionEvaluator.java b/src/main/java/io/cryostat/expressions/MatchExpressionEvaluator.java index 26959edb5..65fcaf893 100644 --- a/src/main/java/io/cryostat/expressions/MatchExpressionEvaluator.java +++ b/src/main/java/io/cryostat/expressions/MatchExpressionEvaluator.java @@ -177,34 +177,66 @@ public boolean applies(MatchExpression matchExpression, Target target) throws Sc public List getMatchedTargets(MatchExpression matchExpression) { List targets = QuarkusTransaction.joiningExisting() - .call(() -> Target.listAll()) - .parallelStream() + .call( + () -> { + List allTargets = Target.listAll(); + // Force eager loading of lazy associations before detaching + allTargets.forEach(this::eagerLoadAssociations); + return allTargets; + }); + List matched = + targets.parallelStream() .filter( target -> { try { return applies(matchExpression, target); } catch (ScriptException e) { - logger.error( - "Error while processing expression: " - + matchExpression, - e); + logger.warnv( + e, + "Script error evaluating expression for target {0}" + + " ({1}): {2}", + target.id, + target.connectUrl, + matchExpression); + return false; + } catch (Exception e) { + logger.warnv( + e, + "Unexpected error evaluating expression for target" + + " {0} ({1})", + target.id, + target.connectUrl); return false; } }) .collect(Collectors.toList()); - var ids = new HashSet<>(); - var it = targets.iterator(); + // Deduplicate by jvmId + var ids = new HashSet(); + var it = matched.iterator(); while (it.hasNext()) { var t = it.next(); - if (ids.contains(t.jvmId)) { + if (t.jvmId != null && ids.contains(t.jvmId)) { it.remove(); continue; } - ids.add(t.jvmId); + if (t.jvmId != null) { + ids.add(t.jvmId); + } } - return targets; + return matched; + } + + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") + private void eagerLoadAssociations(Target t) { + if (t.annotations != null) { + t.annotations.cryostat(); + t.annotations.platform(); + } + if (t.labels != null) { + t.labels.size(); + } } @Name("io.cryostat.rules.MatchExpressionEvaluator.MatchExpressionApplies") @@ -315,7 +347,13 @@ private String[] getJfrEventTypeIds(SimplifiedTarget st) { .toList() .toArray(new String[0])); } catch (Exception e) { - Log.error(e); + // Log with more context about the failure to help diagnose issues + Log.warnv( + e, + "Failed to get JFR event types for target {0} ({1}). Match expression" + + " evaluation may be incomplete.", + st.jvmId, + st.connectUrl); return new String[0]; } } diff --git a/src/main/java/io/cryostat/reports/AnalysisReportAggregator.java b/src/main/java/io/cryostat/reports/AnalysisReportAggregator.java index 43ac107ae..6cfde690d 100644 --- a/src/main/java/io/cryostat/reports/AnalysisReportAggregator.java +++ b/src/main/java/io/cryostat/reports/AnalysisReportAggregator.java @@ -82,7 +82,6 @@ public class AnalysisReportAggregator { @Inject RecordingHelper recordingHelper; @ConsumeEvent(value = ActiveRecordings.ARCHIVED_RECORDING_CREATED, blocking = true) - @Transactional public void onMessage(ArchivedRecording recording) { var autoanalyze = recording.metadata().labels().get(AUTOANALYZE_LABEL); if (Boolean.parseBoolean(autoanalyze)) { @@ -91,19 +90,19 @@ public void onMessage(ArchivedRecording recording) { .subscribe() .with( entry -> { - if (recording.archivedTime() < entry.timestamp()) { - // cached data is fresher - return; - } - var filename = recording.name(); - logger.tracev( - "Triggering batch report processing for {0}/{1}.", - jvmId, filename); - var request = - new ArchivedReportRequest( - UUID.randomUUID().toString(), - Pair.of(jvmId, filename)); try { + if (recording.archivedTime() < entry.timestamp()) { + // cached data is fresher + return; + } + var filename = recording.name(); + logger.tracev( + "Triggering batch report processing for {0}/{1}.", + jvmId, filename); + var request = + new ArchivedReportRequest( + UUID.randomUUID().toString(), + Pair.of(jvmId, filename)); var future = new CompletableFuture(); bus.>request( LongRunningRequestGenerator @@ -120,7 +119,11 @@ public void onMessage(ArchivedRecording recording) { report.body()))); cache.as(CaffeineCache.class).put(jvmId, future); } catch (Exception e) { - logger.warn(e); + logger.warnv( + e, + "Failed to process archived recording {0}/{1}", + jvmId, + recording.name()); } }); } @@ -129,33 +132,38 @@ public void onMessage(ArchivedRecording recording) { @ConsumeEvent( value = LongRunningRequestGenerator.ACTIVE_REPORT_COMPLETE_ADDRESS, blocking = true) - @Transactional public void onMessage(ActiveReportCompletion evt) { var jvmId = evt.recording().target.jvmId; getOrCreateEntry(jvmId) .subscribe() .with( entry -> { - long now = Instant.now().getEpochSecond(); - if (now < entry.timestamp()) { - // cached data is fresher - return; + try { + long now = Instant.now().getEpochSecond(); + if (now < entry.timestamp()) { + // cached data is fresher + return; + } + cache.as(CaffeineCache.class) + .put( + jvmId, + CompletableFuture.completedFuture( + new Entry( + now, + entry.ownerChain(), + evt.report()))); + } catch (Exception e) { + logger.warnv( + e, + "Failed to update cache for active report for jvmId {0}", + jvmId); } - cache.as(CaffeineCache.class) - .put( - jvmId, - CompletableFuture.completedFuture( - new Entry( - now, - entry.ownerChain(), - evt.report()))); }); } @ConsumeEvent( value = LongRunningRequestGenerator.ARCHIVED_REPORT_COMPLETE_ADDRESS, blocking = true) - @Transactional public void onMessage(ArchivedReportCompletion evt) { var jvmId = evt.jvmId(); var filename = evt.filename(); @@ -163,25 +171,36 @@ public void onMessage(ArchivedReportCompletion evt) { .subscribe() .with( entry -> { - recordingHelper - .getArchivedRecordingInfo(jvmId, filename) - .ifPresent( - archivedRecording -> { - if (archivedRecording.archivedTime() - < entry.timestamp()) { - // cached data is fresher - return; - } - cache.as(CaffeineCache.class) - .put( - jvmId, - CompletableFuture.completedFuture( - new Entry( - archivedRecording - .archivedTime(), - entry.ownerChain(), - evt.report()))); - }); + try { + recordingHelper + .getArchivedRecordingInfo(jvmId, filename) + .ifPresent( + archivedRecording -> { + if (archivedRecording.archivedTime() + < entry.timestamp()) { + // cached data is fresher + return; + } + cache.as(CaffeineCache.class) + .put( + jvmId, + CompletableFuture + .completedFuture( + new Entry( + archivedRecording + .archivedTime(), + entry + .ownerChain(), + evt + .report()))); + }); + } catch (Exception e) { + logger.warnv( + e, + "Failed to update cache for archived report {0}/{1}", + jvmId, + filename); + } }); } diff --git a/src/main/java/io/cryostat/rules/RuleExecutor.java b/src/main/java/io/cryostat/rules/RuleExecutor.java index d256371c3..cd8659d98 100644 --- a/src/main/java/io/cryostat/rules/RuleExecutor.java +++ b/src/main/java/io/cryostat/rules/RuleExecutor.java @@ -38,7 +38,6 @@ import io.cryostat.targets.Target.TargetDiscovery; import io.cryostat.util.EntityExistsException; -import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.Uni; @@ -87,11 +86,26 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { @Transactional Uni onMessage(ActivationAttempt attempt) { logger.tracev( - "Attempting to activate rule \"{0}\" for target {1} -" + " attempt #{2}", + "Attempting to activate rule \"{0}\" for target {1} - attempt #{2}", attempt.ruleId(), attempt.targetId(), attempt.attempts()); try { - Target target = Target.find("id", attempt.targetId()).singleResult(); - Rule rule = Rule.find("id", attempt.ruleId()).singleResult(); + var targetOpt = Target.find("id", attempt.targetId()).firstResultOptional(); + if (targetOpt.isEmpty()) { + logger.warnv( + "Target {0} no longer exists, skipping rule activation attempt", + attempt.targetId()); + return Uni.createFrom().nullItem(); + } + Target target = targetOpt.get(); + + var ruleOpt = Rule.find("id", attempt.ruleId()).firstResultOptional(); + if (ruleOpt.isEmpty()) { + logger.warnv( + "Rule {0} no longer exists, skipping activation attempt", attempt.ruleId()); + return Uni.createFrom().nullItem(); + } + Rule rule = ruleOpt.get(); + Pair pair = recordingHelper.parseEventSpecifier(rule.eventSpecifier); Template template = @@ -125,7 +139,7 @@ Uni onMessage(ActivationAttempt attempt) { scheduleArchival(rule, target, recording); } } catch (Exception e) { - logger.warn(e); + logger.error("Rule execution failed", e); return Uni.createFrom().failure(e); } @@ -184,33 +198,7 @@ public void handleRuleModification(RuleEvent event) { @Transactional public void handleRuleRecordingCleanup(Rule rule) { cancelTasksForRule(rule); - var targets = evaluator.getMatchedTargets(rule.matchExpression); - for (var target : targets) { - QuarkusTransaction.joiningExisting() - .run( - () -> { - try { - var opt = - recordingHelper.getActiveRecording( - target, - r -> - Objects.equals( - r.name, - rule.getRecordingName())); - if (opt.isEmpty()) { - logger.warnv( - "Target {0} did not have expected Automated Rule" - + " recording with name {1}", - target.id, rule.getRecordingName()); - return; - } - var recording = opt.get(); - recordingHelper.stopRecording(recording).await().indefinitely(); - } catch (Exception e) { - logger.warn(e); - } - }); - } + logger.debugv("Cancelled scheduled tasks for rule \"{0}\"", rule.name); } private void cancelTasksForRule(Rule rule) { @@ -254,6 +242,9 @@ private RecordingOptions createRecordingOptions(Rule rule) { private void scheduleArchival(Rule rule, Target target, ActiveRecording recording) throws SchedulerException { + logger.debugv( + "Scheduling archiver job for rule {0} on target {1} recording {2}", + rule.id, target.jvmId, recording.remoteId); JobDetail jobDetail = JobBuilder.newJob(ScheduledArchiveJob.class) .withIdentity( @@ -262,9 +253,13 @@ private void scheduleArchival(Rule rule, Target target, ActiveRecording recordin .usingJobData("ruleName", rule.name) .usingJobData("recording", recording.remoteId) .usingJobData("preservedArchives", rule.preservedArchives) + .usingJobData("retryCount", 0) .build(); if (quartz.checkExists(jobDetail.getKey())) { + logger.debugv( + "Archiver job for rule {0} on target {1} recording {2} already existed", + rule.id, target.jvmId, recording.remoteId); return; } @@ -286,6 +281,9 @@ private void scheduleArchival(Rule rule, Target target, ActiveRecording recordin .build(); try { quartz.scheduleJob(jobDetail, trigger); + logger.debugv( + "Scheduled archiver job for rule {0} on target {1} recording {2}", + rule.id, target.jvmId, recording.remoteId); } catch (SchedulerException e) { logger.errorv( e, diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index fe83ebaa3..dcc21853b 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -42,11 +42,14 @@ import io.quarkus.runtime.StartupEvent; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.ext.web.handler.HttpException; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import jakarta.persistence.NoResultException; import jakarta.transaction.Transactional; +import jakarta.ws.rs.NotFoundException; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import org.projectnessie.cel.tools.ScriptException; @@ -75,7 +78,10 @@ public class RuleService { private final BlockingQueue activations = new PriorityBlockingQueue<>(255, Comparator.comparing(t -> t.attempts.get())); + private final BlockingQueue cleanups = + new PriorityBlockingQueue<>(255, Comparator.comparing(t -> t.attempts.get())); private final ExecutorService activator = Executors.newSingleThreadExecutor(); + private final ExecutorService cleaner = Executors.newSingleThreadExecutor(); private final ExecutorService workers = Executors.newVirtualThreadPerTaskExecutor(); void onStart(@Observes StartupEvent ev) { @@ -105,48 +111,27 @@ void onStart(@Observes StartupEvent ev) { workers.submit(() -> fireAttemptExecution(fAttempt)); } }); - } - - private void fireAttemptExecution(ActivationAttempt fAttempt) { - try { - logger.tracev( - "Attempting to activate rule \"{0}\" for" + " target {1} - attempt #{2}", - fAttempt.ruleId, fAttempt.targetId, fAttempt.attempts); - bus.request(RuleExecutor.class.getName(), fAttempt) - .await() - .atMost(connectionFailedTimeout); - logger.tracev( - "Activated rule \"{0}\" for target {1}", fAttempt.ruleId, fAttempt.targetId); - } catch (Exception e) { - if (fAttempt != null) { - int count = fAttempt.incrementAndGet(); - int delay = (int) Math.pow(2, count); - TimeUnit unit = TimeUnit.SECONDS; - int limit = 5; - if (count < limit) { - logger.debugv( - "Rule \"{0}\" activation attempt" - + " #{1} for target {2} failed," - + " rescheduling in {3}{4} ...", - fAttempt.ruleId, count - 1, fAttempt.targetId, delay, unit); - Infrastructure.getDefaultWorkerPool() - .schedule(() -> activations.add(fAttempt), delay, unit); - } else { - logger.errorv( - "Rule \"{0}\" activation attempt" - + " #{1} failed for target {2}" - + " - limit ({3}) reached! Will" - + " not retry...", - fAttempt.ruleId, count, fAttempt.targetId, limit); - } - } - logger.error(e); - } + cleaner.submit( + () -> { + while (!cleaner.isShutdown()) { + CleanupAttempt attempt = null; + try { + attempt = cleanups.take(); + } catch (InterruptedException ie) { + logger.trace(ie); + break; + } + final CleanupAttempt fAttempt = attempt; + workers.submit(() -> fireCleanupExecution(fAttempt)); + } + }); } void onStop(@Observes ShutdownEvent evt) throws SchedulerException { activator.shutdown(); + cleaner.shutdown(); activations.clear(); + cleanups.clear(); } @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) @@ -197,26 +182,156 @@ public void handleRuleModification(RuleEvent event) { public void handleRuleRecordingCleanup(Rule rule) { var targets = evaluator.getMatchedTargets(rule.matchExpression); for (var target : targets) { - recordingHelper - .getActiveRecording( - target, r -> Objects.equals(r.name, rule.getRecordingName())) - .ifPresent( - recording -> { + CleanupAttempt attempt = new CleanupAttempt(rule, target); + cleanups.add(attempt); + logger.debugv( + "Queued cleanup for recording \"{0}\" on target {1}", + rule.getRecordingName(), target.id); + } + } + + private void fireAttemptExecution(ActivationAttempt fAttempt) { + try { + logger.tracev( + "Attempting to activate rule \"{0}\" for" + " target {1} - attempt #{2}", + fAttempt.ruleId, fAttempt.targetId, fAttempt.attempts); + bus.request(RuleExecutor.class.getName(), fAttempt) + .await() + .atMost(connectionFailedTimeout); + logger.tracev( + "Activated rule \"{0}\" for target {1}", fAttempt.ruleId, fAttempt.targetId); + } catch (Exception e) { + if (fAttempt != null) { + int count = fAttempt.attempts.incrementAndGet(); + int delay = (int) Math.pow(2, count); + TimeUnit unit = TimeUnit.SECONDS; + int limit = 5; + if (count < limit) { + logger.debugv( + "Rule \"{0}\" activation attempt" + + " #{1} for target {2} failed," + + " rescheduling in {3}{4} ...", + fAttempt.ruleId, count - 1, fAttempt.targetId, delay, unit); + Infrastructure.getDefaultWorkerPool() + .schedule(() -> activations.add(fAttempt), delay, unit); + } else { + logger.errorv( + "Rule \"{0}\" activation attempt" + + " #{1} failed for target {2}" + + " - limit ({3}) reached! Will" + + " not retry...", + fAttempt.ruleId, count, fAttempt.targetId, limit); + } + } + logger.error(e); + } + } + + private void fireCleanupExecution(CleanupAttempt fAttempt) { + try { + logger.tracev( + "Attempting to cleanup recording \"{0}\" on target {1} - attempt #{2}", + fAttempt.recordingName, fAttempt.targetId, fAttempt.attempts.get()); + + QuarkusTransaction.requiringNew() + .run( + () -> { + var targetOpt = + Target.find("id", fAttempt.targetId) + .firstResultOptional(); + if (targetOpt.isEmpty()) { + logger.infov( + "Target {0} no longer exists, cleanup complete", + fAttempt.targetId); + return; + } + Target target = targetOpt.get(); + + var recordingOpt = + recordingHelper.getActiveRecording( + target, + r -> + Objects.equals( + r.name, fAttempt.recordingName)); + if (recordingOpt.isEmpty()) { + logger.infov( + "Recording \"{0}\" no longer exists on target {1}," + + " cleanup complete", + fAttempt.recordingName, fAttempt.targetId); + return; + } + + var recording = recordingOpt.get(); try { recordingHelper.stopRecording(recording).await().indefinitely(); - } catch (Exception e) { - logger.warn(e); + logger.infov( + "Successfully stopped recording \"{0}\" on target {1}", + fAttempt.recordingName, fAttempt.targetId); + } catch (Exception stopEx) { + throw new RuntimeException(stopEx); } }); + } catch (Exception e) { + if (fAttempt != null) { + int count = fAttempt.attempts.incrementAndGet(); + int delay = (int) Math.pow(2, count); + TimeUnit unit = TimeUnit.SECONDS; + int limit = 5; + + // Check if this is a permanent failure (target/recording gone) + boolean isPermanentFailure = isPermanentCleanupFailure(e); + + if (isPermanentFailure) { + logger.infov( + "Recording \"{0}\" cleanup on target {1} failed permanently: {2}." + + " Will not retry.", + fAttempt.recordingName, fAttempt.targetId, e.getMessage()); + } else if (count < limit) { + logger.debugv( + "Recording \"{0}\" cleanup attempt #{1} on target {2} failed," + + " rescheduling in {3}{4} ...", + fAttempt.recordingName, count - 1, fAttempt.targetId, delay, unit); + Infrastructure.getDefaultWorkerPool() + .schedule(() -> cleanups.add(fAttempt), delay, unit); + } else { + logger.errorv( + "Recording \"{0}\" cleanup attempt #{1} failed on target {2}" + + " - limit ({3}) reached! Will not retry...", + fAttempt.recordingName, count, fAttempt.targetId, limit); + } + } + if (!isPermanentCleanupFailure(e)) { + logger.warn(e); + } } } + private boolean isPermanentCleanupFailure(Throwable t) { + if (t == null) { + return false; + } + // Target deleted or recording already stopped/deleted + if (t instanceof NoResultException) { + return true; + } + if (t instanceof NotFoundException) { + return true; + } + if (t instanceof HttpException httpEx) { + // 404 = recording not found, 400 = bad request (recording already stopped) + return httpEx.getStatusCode() == 404 || httpEx.getStatusCode() == 400; + } + // Check cause recursively + return isPermanentCleanupFailure(t.getCause()); + } + private void resetActivations(Rule rule) { resetActivations(a -> a.ruleId == rule.id); } private void resetActivations(Target target) { resetActivations(a -> a.targetId == target.id); + resetCleanups(c -> c.targetId == target.id); } private void resetActivations(Predicate p) { @@ -229,6 +344,16 @@ private void resetActivations(Predicate p) { } } + private void resetCleanups(Predicate p) { + Iterator it = cleanups.iterator(); + while (it.hasNext()) { + CleanupAttempt attempt = it.next(); + if (p.test(attempt)) { + it.remove(); + } + } + } + void applyRulesToTarget(Target target) { resetActivations(target); for (var rule : enabledRules()) { @@ -285,6 +410,50 @@ public ActivationAttempt(Rule rule, Target target) { } } + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof ActivationAttempt other)) return false; + return ruleId == other.ruleId && targetId == other.targetId; + } + + @Override + public int hashCode() { + return Objects.hash(ruleId, targetId); + } + } + + @SuppressFBWarnings("EI_EXPOSE_REP") + public record CleanupAttempt( + String ruleName, long targetId, String recordingName, AtomicInteger attempts) { + public CleanupAttempt(Rule rule, Target target) { + this(rule.name, target.id, rule.getRecordingName(), new AtomicInteger(0)); + } + + public CleanupAttempt { + Objects.requireNonNull(ruleName); + Objects.requireNonNull(recordingName); + Objects.requireNonNull(attempts); + if (targetId < 0) { + throw new IllegalArgumentException(); + } + if (attempts.get() < 0) { + throw new IllegalArgumentException(); + } + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof CleanupAttempt other)) return false; + return targetId == other.targetId && Objects.equals(recordingName, other.recordingName); + } + + @Override + public int hashCode() { + return Objects.hash(targetId, recordingName); + } + public int incrementAndGet() { return this.attempts.incrementAndGet(); } diff --git a/src/main/java/io/cryostat/rules/ScheduledArchiveJob.java b/src/main/java/io/cryostat/rules/ScheduledArchiveJob.java index 8732465a8..6e13a1872 100644 --- a/src/main/java/io/cryostat/rules/ScheduledArchiveJob.java +++ b/src/main/java/io/cryostat/rules/ScheduledArchiveJob.java @@ -15,6 +15,7 @@ */ package io.cryostat.rules; +import java.util.Date; import java.util.List; import java.util.Objects; @@ -35,6 +36,10 @@ import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; +import org.quartz.PersistJobDataAfterExecution; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.SimpleTrigger; +import org.quartz.TriggerBuilder; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.S3Object; @@ -47,6 +52,7 @@ * @see io.cryostat.rules.Rule * @see io.cryostat.rules.RuleExecutor */ +@PersistJobDataAfterExecution class ScheduledArchiveJob implements Job { @Inject RecordingHelper recordingHelper; @@ -60,6 +66,7 @@ public void execute(JobExecutionContext ctx) throws JobExecutionException { String jvmId = (String) ctx.getMergedJobDataMap().get("jvmId"); String ruleName = (String) ctx.getMergedJobDataMap().get("ruleName"); int preservedArchives = (int) ctx.getMergedJobDataMap().get("preservedArchives"); + int retryCount = ctx.getMergedJobDataMap().getIntValue("retryCount"); try { List previousRecordings = previousRecordings(jvmId, ruleName); @@ -85,47 +92,112 @@ public void execute(JobExecutionContext ctx) throws JobExecutionException { () -> { long recordingId = (long) ctx.getMergedJobDataMap().get("recording"); + Target target = + Target.getTargetByJvmId(jvmId) + .orElseThrow( + () -> + new NoResultException( + String.format( + "Target %s not" + + " found", + jvmId))); ActiveRecording recording = recordingHelper - .getActiveRecording( - Target.getTargetByJvmId(jvmId) - .orElseThrow(), - recordingId) + .getActiveRecording(target, recordingId) .orElseThrow( - () -> { - JobExecutionException ex = - new JobExecutionException( - String.format( - """ - Target %s did not have recording with remote ID %d - """ - .strip(), - jvmId, - recordingId)); - ex.setUnscheduleFiringTrigger(true); - return ex; - }); + () -> + new NoResultException( + String.format( + "Target %s did not" + + " have" + + " recording" + + " with remote" + + " ID %d", + jvmId, + recordingId))); return recordingHelper.archiveRecording(recording); }); + + if (retryCount > 0) { + ctx.getJobDetail().getJobDataMap().put("retryCount", 0); + logger.debugv( + "Archive job for rule {0} target {1} succeeded after {2} retries", + ruleName, jvmId, retryCount); + } } catch (NoResultException | ObjectDeletedException e) { - // target disappeared in the meantime. No big deal. - logger.debug(e); + // Target or recording disappeared - unschedule immediately + logger.warnv( + e, + "Target or recording no longer exists for rule {0} target {1}, unscheduling" + + " job", + ruleName, + jvmId); JobExecutionException ex = new JobExecutionException(e); ex.setRefireImmediately(false); ex.setUnscheduleFiringTrigger(true); throw ex; } catch (S3Exception e) { + handleRetryableFailure(ctx, e, jvmId, ruleName, retryCount); + } catch (Exception e) { + handleRetryableFailure(ctx, e, jvmId, ruleName, retryCount); + } + } + + private void handleRetryableFailure( + JobExecutionContext ctx, Exception e, String jvmId, String ruleName, int retryCount) + throws JobExecutionException { + int maxRetries = 6; + + if (retryCount >= maxRetries) { + logger.errorv( + e, + "Archive job for rule {0} target {1} failed after {2} retries, unscheduling", + ruleName, + jvmId, + retryCount); JobExecutionException ex = new JobExecutionException(e); - ex.setRefireImmediately(true); + ex.setRefireImmediately(false); + ex.setUnscheduleFiringTrigger(true); throw ex; - } catch (Exception e) { - if (e instanceof JobExecutionException) { - throw e; - } - var jee = new JobExecutionException(e); - jee.setRefireImmediately(false); - throw jee; } + + int backoffSeconds = (int) Math.pow(2, retryCount); + ctx.getJobDetail().getJobDataMap().put("retryCount", retryCount + 1); + + logger.warnv( + e, + "Archive job for rule {0} target {1} failed (attempt {2}/{3}), retrying in {4}s", + ruleName, + jvmId, + retryCount + 1, + maxRetries, + backoffSeconds); + + try { + int intervalSeconds = + (int) (((SimpleTrigger) ctx.getTrigger()).getRepeatInterval() / 1000); + ctx.getScheduler() + .rescheduleJob( + ctx.getTrigger().getKey(), + TriggerBuilder.newTrigger() + .withIdentity(ctx.getTrigger().getKey()) + .startAt( + new Date( + System.currentTimeMillis() + + backoffSeconds * 1000L)) + .withSchedule( + SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds(intervalSeconds) + .repeatForever() + .withMisfireHandlingInstructionNextWithRemainingCount()) + .build()); + } catch (org.quartz.SchedulerException se) { + logger.errorv(se, "Failed to reschedule job with backoff"); + } + + JobExecutionException ex = new JobExecutionException(e); + ex.setRefireImmediately(false); + throw ex; } List previousRecordings(String jvmId, String ruleName) { diff --git a/src/main/java/io/cryostat/security/UserInfoResolver.java b/src/main/java/io/cryostat/security/UserInfoResolver.java index 450734249..214d1eb5b 100644 --- a/src/main/java/io/cryostat/security/UserInfoResolver.java +++ b/src/main/java/io/cryostat/security/UserInfoResolver.java @@ -203,24 +203,27 @@ static String getRemoteAddressAsUsername() { } static String getRemoteAddress(RoutingContext routingContext) { - String forwardedFor = routingContext.request().getHeader("X-Forwarded-For"); - if (StringUtils.isNotBlank(forwardedFor)) { - // X-Forwarded-For can contain multiple IPs, take the first one (original client) - int commaIndex = forwardedFor.indexOf(','); - return commaIndex > 0 - ? forwardedFor.substring(0, commaIndex).trim() - : forwardedFor.trim(); - } + try { + String forwardedFor = routingContext.request().getHeader("X-Forwarded-For"); + if (StringUtils.isNotBlank(forwardedFor)) { + // X-Forwarded-For can contain multiple IPs, take the first one (original client) + int commaIndex = forwardedFor.indexOf(','); + return commaIndex > 0 + ? forwardedFor.substring(0, commaIndex).trim() + : forwardedFor.trim(); + } - String realIp = routingContext.request().getHeader("X-Real-IP"); - if (StringUtils.isNotBlank(realIp)) { - return realIp.trim(); - } + String realIp = routingContext.request().getHeader("X-Real-IP"); + if (StringUtils.isNotBlank(realIp)) { + return realIp.trim(); + } - if (routingContext.request().remoteAddress() != null) { - return routingContext.request().remoteAddress().host(); + if (routingContext.request().remoteAddress() != null) { + return routingContext.request().remoteAddress().host(); + } + } catch (Exception e) { + logger.debugf(e, "Could not retrieve remote address via RoutingContext"); } - return null; } }